Working with multiple partition formats within a Hive table with Spark

Problem statement and why this is interesting

Incoming data usually arrives in a format different from the one we would like for long-term storage. The first step we usually take is to transform the data into a format such as Parquet that can easily be queried by Hive/Impala.

The use case we imagined is ingesting data in Avro format. The users want easy access to the data with Hive or Spark. For performant queries, the historical data needs to be in Parquet format. We don't want two different tables, one for the historical data in Parquet format and one for the incoming data in Avro format. We prefer a single table that can handle all data, no matter the format. This way we can run our conversion process (from Avro to Parquet), say, every night, while the users still have access to all data at all times.

In Hive you can achieve this with a partitioned table, where you can set the format of each partition. Spark unfortunately doesn't implement this. Since our users also use Spark, this was something we had to fix. It was also a nice challenge for a couple of GoDataDriven Fridays, where we could learn more about the internals of Apache Spark.

Setting up a test environment

First we had to identify what we needed to reproduce the problem. We needed the following components:

  • Hive with persistent Hive metastore
  • Hadoop to be able to store and access the files
  • Spark

We're using MacBook Pros, and we had to take the following steps:

Install Hadoop, Hive, Spark and create a local HDFS directory

$ brew install hadoop
$ brew install hive
$ brew install apache-spark
$ mkdir ${HOME}/localhdfs

Run the Hive Metastore in Docker

We want the Hive Metastore to use PostgreSQL so that we can access it from Hive and Spark simultaneously. We found a Docker image, but it wasn't the latest version, so we forked it and upgraded it to the latest version. You can find this Docker image on GitHub (source code is at link). To run this image, use the following commands (note that we exposed port 5432 so we can use it for Hive):

$ docker pull krisgeus/docker-hive-metastore-postgresql:upgrade-2.3.0
$ docker run -p 5432:5432 krisgeus/docker-hive-metastore-postgresql:upgrade-2.3.0

Configuring Hive to use the Hive Metastore

  • Download postgresql-42.2.4.jar from this link
  • Add this jar to the Hive lib directory (in our case the Hive version was 2.3.1)
$ cp postgresql-42.2.4.jar /usr/local/Cellar/hive/<version>/libexec/lib/
  • Create a working directory
$ mkdir ${HOME}/spark-hive-schema
$ cd ${HOME}/spark-hive-schema
  • Create a configuration directory and copy hadoop and hive base configurations
$ mkdir hadoop_conf
$ cp -R /usr/local/Cellar/hadoop/3.0.0/libexec/etc/hadoop/* ${HOME}/spark-hive-schema/hadoop_conf
$ cp -R /usr/local/Cellar/hive/2.3.1/libexec/conf/* ${HOME}/spark-hive-schema/hadoop_conf
$ cp /usr/local/Cellar/hive/2.3.1/libexec/conf/hive-default.xml.template ${HOME}/spark-hive-schema/hadoop_conf/hive-site.xml
  • Change configurations in hive-site.xml so we actually use the Hive Metastore we just started
<property>
    <name>system:java.io.tmpdir</name>
    <value>/tmp/hive/java</value>
</property>
<property>
    <name>system:user.name</name>
    <value>${user.name}</value>
</property>
<property>
    <name>hive.metastore.warehouse.dir</name>
    <value>${user.home}/localhdfs/user/hive/warehouse</value>
    <description>location of default database for the warehouse</description>
</property>
<property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>hive</value>
    <description>Username to use against metastore database</description>
</property>
<property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:postgresql://localhost:5432/metastore</value>
    <description>
        JDBC connect string for a JDBC metastore.
        To use SSL to encrypt/authenticate the connection, provide database-specific SSL flag in the connection URL.
        For example, jdbc:postgresql://myhost/db?ssl=true for postgres database.
    </description>
</property>
<property>
    <name>datanucleus.connectionPoolingType</name>
    <value>NONE</value>
    <description>
        Expects one of [bonecp, dbcp, hikaricp, none].
        Specify connection pool library for datanucleus
    </description>
</property>
<property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>org.postgresql.Driver</value>
    <description>Driver class name for a JDBC metastore</description>
</property>
<property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>hive</value>
    <description>Password to use against metastore database</description>
</property>
  • Make /tmp/hive writable:
$ chmod 777 /tmp/hive
  • In a terminal set paths so we can start HiveServer2, where hadoop_version=3.0.0, hive_version=2.3.1
$ export HADOOP_HOME=/usr/local/Cellar/hadoop/<hadoop_version>/libexec
$ export HIVE_HOME=/usr/local/Cellar/hive/<hive_version>/libexec
$ export HADOOP_CONF_DIR=${HOME}/spark-hive-schema/hadoop_conf
$ export HIVE_CONF_DIR=${HOME}/spark-hive-schema/hadoop_conf
$ hiveserver2
  • In another terminal set the same paths and start beeline, where hadoop_version=3.0.0, hive_version=2.3.1
$ export HADOOP_HOME=/usr/local/Cellar/hadoop/<hadoop_version>/libexec
$ export HIVE_HOME=/usr/local/Cellar/hive/<hive_version>/libexec
$ export HADOOP_CONF_DIR=${HOME}/spark-hive-schema/hadoop_conf
$ export HIVE_CONF_DIR=${HOME}/spark-hive-schema/hadoop_conf
$ beeline -u jdbc:hive2://localhost:10000/default -n hive -p hive

We're all set up...we can now create a table.

Creating a working example in Hive

  • In beeline create a database and a table
CREATE DATABASE test;
USE test;

CREATE EXTERNAL TABLE IF NOT EXISTS events(eventType STRING, city STRING)
PARTITIONED BY(dt STRING)
STORED AS PARQUET;
  • Add two parquet partitions
ALTER TABLE events ADD PARTITION (dt='2018-01-25')
                       PARTITION (dt='2018-01-26');
ALTER TABLE events PARTITION (dt='2018-01-25') SET FILEFORMAT PARQUET;
ALTER TABLE events PARTITION (dt='2018-01-26') SET FILEFORMAT PARQUET;
  • Add a partition where we'll add Avro data
ALTER TABLE events ADD PARTITION (dt='2018-01-27');
ALTER TABLE events PARTITION (dt='2018-01-27') SET FILEFORMAT AVRO;
  • Check the table
SELECT * FROM events;
events.eventtype events.city events.dt
DESCRIBE FORMATTED events PARTITION (dt="2018-01-26");
Column name Data type
# Storage Information
SerDe Library: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
DESCRIBE FORMATTED events PARTITION (dt="2018-01-27");
Column name Data type
# Storage Information
SerDe Library: org.apache.hadoop.hive.serde2.avro.AvroSerDe
InputFormat: org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat

Create some test data

  • Create test data directory
$ mkdir ${HOME}/spark-hive-schema/testdata
  • Generate Avro data and add to table
$ cat ${HOME}/spark-hive-schema/testdata/data.json
{ "eventtype": "avro", "city": "Breukelen" }
{ "eventtype": "avro", "city": "Wilnis" }
{ "eventtype": "avro", "city": "Abcoude" }
{ "eventtype": "avro", "city": "Vinkeveen" }

$ cat ${HOME}/spark-hive-schema/testdata/data.avsc
{
  "type" : "record",
  "name" : "events",
  "namespace" : "com.godatadriven.events",
  "fields" : [ {
    "name" : "eventtype",
    "type" : "string"
  }, {
    "name" : "city",
    "type" : "string"
  }]
}

$ brew install avro-tools
$ cd ${HOME}/spark-hive-schema/testdata
$ avro-tools fromjson --schema-file data.avsc data.json > \
  ${HOME}/localhdfs/user/hive/warehouse/test.db/events/dt\=2018-01-27/data.avro
  • Generate parquet data and add to table
$ cd ${HOME}/spark-hive-schema/testdata
$ spark-shell
> import org.apache.spark.sql.functions.{col, lit}
> spark.read.json("data.json").select(lit("parquet").alias("eventtype"),
    col("city")).write.parquet("data.pq")
> :quit
$ cp ./data.pq/part*.parquet ${HOME}/localhdfs/user/hive/warehouse/test.db/events/dt\=2018-01-26/
  • Insert data into the remaining partition using beeline
INSERT INTO TABLE events PARTITION (dt="2018-01-25")
SELECT 'overwrite', 'Amsterdam';
  • Check that we have data in beeline
SELECT * FROM events;
events.eventtype events.city events.dt
overwrite Amsterdam 2018-01-25
parquet Breukelen 2018-01-26
parquet Wilnis 2018-01-26
parquet Abcoude 2018-01-26
parquet Vinkeveen 2018-01-26
avro Breukelen 2018-01-27
avro Wilnis 2018-01-27
avro Abcoude 2018-01-27
avro Vinkeveen 2018-01-27

Yoo-hoo, we see all the data!

  • Double check that the formats are correct
$ tree ${HOME}/localhdfs/user/hive/warehouse/test.db/events
.../localhdfs/user/hive/warehouse/test.db/events
├── dt=2018-01-25
│   └── 000000_0
├── dt=2018-01-26
│   └── part-00000-1846ef38-ec33-47ae-aa80-3f72ddb50c7d-c000.snappy.parquet
└── dt=2018-01-27
    └── data.avro       

Creating a failing test in Spark

Connect to Spark and make sure we access the Hive Metastore we set up:

$ export HADOOP_HOME=/usr/local/Cellar/hadoop/<hadoop_version>/libexec
$ export HIVE_HOME=/usr/local/Cellar/hive/<hive_version>/libexec
$ export HADOOP_CONF_DIR=${HOME}/spark-hive-schema/hadoop_conf
$ export HIVE_CONF_DIR=${HOME}/spark-hive-schema/hadoop_conf

$ spark-shell --driver-class-path /usr/local/Cellar/hive/2.3.1/libexec/lib/postgresql-42.2.4.jar \
--jars /usr/local/Cellar/hive/2.3.1/libexec/lib/postgresql-42.2.4.jar \
--conf spark.executor.extraClassPath=/usr/local/Cellar/hive/2.3.1/libexec/lib/postgresql-42.2.4.jar \
--conf spark.hadoop.javax.jdo.option.ConnectionURL=jdbc:postgresql://localhost:5432/metastore \
--conf spark.hadoop.javax.jdo.option.ConnectionUserName=hive \
--conf spark.hadoop.javax.jdo.option.ConnectionPassword=hive \
--conf spark.hadoop.javax.jdo.option.ConnectionDriverName=org.postgresql.Driver \
--conf spark.hadoop.hive.metastore.schema.verification=true \
--conf spark.hadoop.hive.metastore.schema.verification.record.version=true \
--conf spark.sql.hive.metastore.version=2.1.0 \
--conf spark.sql.hive.metastore.jars=maven
> spark.sql("select * from test.events").show()
...
java.lang.RuntimeException: 
    file:...localhdfs/user/hive/warehouse/test.db/events/dt=2018-01-27/data.avro 
    is not a Parquet file. expected magic number at tail [80, 65, 82, 49] 
    but found [-126, 61, 76, 121]
...
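
The byte values in the error above are easier to read once you know that [80, 65, 82, 49] is the ASCII encoding of PAR1, the marker a Parquet file ends with. The following is a minimal sketch of such a tail check, not the code Spark or the Parquet library actually runs; the object and method names are made up for illustration.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import java.util.Arrays

object ParquetMagicCheck {
  // "PAR1" in ASCII is [80, 65, 82, 49], the tail bytes named in the error message.
  val ParquetMagic: Array[Byte] = "PAR1".getBytes(StandardCharsets.US_ASCII)

  // Hypothetical helper: true if the last four bytes of the file are "PAR1".
  // It reads the whole file for simplicity, so only use it on small test files.
  def hasParquetMagicTail(path: Path): Boolean = {
    val bytes = Files.readAllBytes(path)
    bytes.length >= ParquetMagic.length &&
      Arrays.equals(bytes.takeRight(ParquetMagic.length), ParquetMagic)
  }
}
```

Run against the data.avro file from the dt=2018-01-27 partition, a check like this would return false, which is the situation Spark's Parquet reader is complaining about.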

So it doesn't work. Let's see if we can check out the Apache Spark code base and create a failing unit test.

First we forked the Apache Spark project, checked it out, and made sure we had sbt installed. We also figured out how to run a single unit test suite.

$ git clone https://github.com/krisgeus/spark.git
$ cd spark
$ ./build/sbt "hive/testOnly *HiveSQLViewSuite"

Writing a test

First we need to create a table and change the format of a given partition. The final test can be found at: MultiFormatTableSuite.scala. We've implemented the following steps:

  • create a table with partitions
  • create a table based on Avro data which is actually located at a partition of the previously created table. Insert some data in this table.
  • create a table based on Parquet data which is actually located at another partition of the previously created table. Insert some data in this table.
  • try to read the data from the original table with partitions

Let's try to run the test:

$ ./build/sbt "hive/testOnly *MultiFormatTableSuite"
...
- create hive table with multi format partitions *** FAILED *** (4 seconds, 265 milliseconds)
[info]   org.apache.spark.sql.catalyst.parser.ParseException: 
Operation not allowed: ALTER TABLE SET FILEFORMAT(line 2, pos 0)
[info] 
[info] == SQL ==
[info] 
[info] ALTER TABLE ext_multiformat_partition_table
[info] ^^^
[info] PARTITION (dt='2018-01-26') SET FILEFORMAT PARQUET
...

So Spark doesn't support changing the file format of a partition. Before we venture into fixing this problem, let's understand how execution plans work in Spark.

Understanding execution plans

The best explanation that we found was on the Databricks site, in the article Deep Dive into Spark SQL’s Catalyst Optimizer. Here is an excerpt in case you don't want to read the whole article:

At the core of Spark SQL is the Catalyst optimizer, which leverages advanced programming language features (e.g. Scala’s pattern matching and quasiquotes) in a novel way to build an extensible query optimizer.

We use Catalyst’s general tree transformation framework in four phases, as shown below: (1) analyzing a logical plan to resolve references, (2) logical plan optimization, (3) physical planning, and (4) code generation to compile parts of the query to Java bytecode. In the physical planning phase, Catalyst may generate multiple plans and compare them based on cost. All other phases are purely rule-based. Each phase uses different types of tree nodes; Catalyst includes libraries of nodes for expressions, data types, and logical and physical operators.

Spark plans

Spark SQL begins with a relation to be computed, either from an abstract syntax tree (AST) returned by a SQL parser, or from a DataFrame object constructed using the API. Spark SQL uses Catalyst rules and a Catalog object that tracks the tables in all data sources to resolve these attributes. It starts by building an “unresolved logical plan” tree with unbound attributes and data types, then applies rules that do the following:

  • Looking up relations by name from the catalog.
  • Mapping named attributes, such as col, to the input provided given operator’s children.
  • Determining which attributes refer to the same value to give them a unique ID (which later allows optimization of expressions such as col = col).
  • Propagating and coercing types through expressions: for example, we cannot know the return type of 1 + col until we have resolved col and possibly casted its subexpressions to a compatible types.

The logical optimization phase applies standard rule-based optimizations to the logical plan. These include constant folding, predicate pushdown, projection pruning, null propagation, Boolean expression simplification, and other rules.

In the physical planning phase, Spark SQL takes a logical plan and generates one or more physical plans, using physical operators that match the Spark execution engine. It then selects a plan using a cost model. At the moment, cost-based optimization is only used to select join algorithms: for relations that are known to be small, Spark SQL uses a broadcast join, using a peer-to-peer broadcast facility available in Spark. The framework supports broader use of cost-based optimization, however, as costs can be estimated recursively for a whole tree using a rule. We thus intend to implement richer cost-based optimization in the future. The physical planner also performs rule-based physical optimizations, such as pipelining projections or filters into one Spark map operation. In addition, it can push operations from the logical plan into data sources that support predicate or projection pushdown. We will describe the API for these data sources in a later section.

The final phase of query optimization involves generating Java bytecode to run on each machine.
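
To make the tree-transformation idea from the excerpt concrete, here is a toy sketch of a rule applied with pattern matching. It uses a tiny made-up expression tree rather than Spark's real Expression classes, so Literal, Attribute, Add and transformUp below are simplified stand-ins, and the rule shown is constant folding, one of the logical optimizations named above.

```scala
object ToyCatalyst {
  // A toy expression tree; these are not Spark's actual Expression classes.
  sealed trait Expr
  case class Literal(value: Int) extends Expr
  case class Attribute(name: String) extends Expr
  case class Add(left: Expr, right: Expr) extends Expr

  // Apply a rewrite rule bottom-up: children first, then the node itself.
  def transformUp(expr: Expr)(rule: PartialFunction[Expr, Expr]): Expr = {
    val withNewChildren = expr match {
      case Add(l, r) => Add(transformUp(l)(rule), transformUp(r)(rule))
      case leaf      => leaf
    }
    // Nodes the rule does not match are returned unchanged.
    rule.applyOrElse(withNewChildren, (e: Expr) => e)
  }

  // Constant folding: replace an addition of two literals by a single literal.
  val constantFolding: PartialFunction[Expr, Expr] = {
    case Add(Literal(a), Literal(b)) => Literal(a + b)
  }
}
```

Applying constantFolding to Add(Attribute("x"), Add(Literal(1), Literal(2))) rewrites it to Add(Attribute("x"), Literal(3)): the inner addition is folded, while the outer one stays because one of its operands is not a literal.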

Support setting the format for a partition in a Hive table with Spark

First we had to discover that Spark uses ANTLR to generate its SQL parser. ANTLR (ANother Tool for Language Recognition) generates a parser from a grammar, and the parse trees that parser produces can be walked. The grammar for Spark is specified in SqlBase.g4.

So we need to support SET FILEFORMAT, optionally combined with a partition spec, and thus we had to add the following alternative to SqlBase.g4:

| ALTER TABLE tableIdentifier (partitionSpec)?
  SET FILEFORMAT fileFormat                          #setTableFormat

This adds support for setting the file format not only of a partition but also of the table itself. We don't need the latter for our current case, but it might come in handy some other time.

The AstBuilder in Spark SQL processes the ANTLR ParseTree to obtain a logical plan. Since we're working with Spark SQL, we had to modify SparkSqlParser, which creates a SparkSqlAstBuilder, which in turn extends AstBuilder. In the SparkSqlAstBuilder we had to create a new function to interpret the new grammar rule and add the requested step to the logical plan.

/**
 * Create an [[AlterTableFormatCommand]] command.
 *
 * For example:
 * {{{
 *   ALTER TABLE table [PARTITION spec] SET FILEFORMAT format;
 * }}}
 */
override def visitSetTableFormat(ctx: SetTableFormatContext): LogicalPlan = withOrigin(ctx) {
  val format = (ctx.fileFormat) match {
    // Expected format: INPUTFORMAT input_format OUTPUTFORMAT output_format
    case (c: TableFileFormatContext) =>
      visitTableFileFormat(c)
    // Expected format: SEQUENCEFILE | TEXTFILE | RCFILE | ORC | PARQUET | AVRO
    case (c: GenericFileFormatContext) =>
      visitGenericFileFormat(c)
    case _ =>
      throw new ParseException("Expected STORED AS ", ctx)
  }
  AlterTableFormatCommand(
    visitTableIdentifier(ctx.tableIdentifier),
    format,
    // TODO a partition spec is allowed to have optional values. This is currently violated.
    Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))
}

But we're still not done, because we also need a definition for the new command. These definitions live in ddl.scala and are based on the ones described in the Apache Hive Language Manual.

So what should this command do? It should make sure that the serde, input format and output format are set properly on the partition level (or on the table level when no partition is given).

/**
 * A command that sets the format of a table/view/partition.
 *
 * The syntax of this command is:
 * {{{
 *   ALTER TABLE table [PARTITION spec] SET FILEFORMAT format;
 * }}}
 */
case class AlterTableFormatCommand(
    tableName: TableIdentifier,
    format: CatalogStorageFormat,
    partSpec: Option[TablePartitionSpec])
  extends RunnableCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val catalog = sparkSession.sessionState.catalog
    val table = catalog.getTableMetadata(tableName)
    DDLUtils.verifyAlterTableType(catalog, table, isView = false)
    // For datasource tables, disallow setting serde or specifying partition
    if (partSpec.isDefined && DDLUtils.isDatasourceTable(table)) {
      throw new AnalysisException("Operation not allowed: ALTER TABLE SET FILEFORMAT " +
        "for a specific partition is not supported " +
        "for tables created with the datasource API")
    }
    if (partSpec.isEmpty) {
      val newTable = table.withNewStorage(
        serde = format.serde.orElse(table.storage.serde),
        inputFormat = format.inputFormat.orElse(table.storage.inputFormat),
        outputFormat = format.outputFormat.orElse(table.storage.outputFormat),
        properties = table.storage.properties ++ format.properties)
      catalog.alterTable(newTable)
    } else {
      val spec = partSpec.get
      val part = catalog.getPartition(table.identifier, spec)
      val newPart = part.copy(storage = part.storage.copy(
        serde = format.serde.orElse(part.storage.serde),
        inputFormat = format.inputFormat.orElse(table.storage.inputFormat),
        outputFormat = format.outputFormat.orElse(table.storage.outputFormat),
        properties = part.storage.properties ++ format.properties))
      catalog.alterPartitions(table.identifier, Seq(newPart))
    }
    Seq.empty[Row]
  }

}
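
The core of the command is the orElse fallback: every field present in the requested format wins, and every missing field keeps its current value. Here is a minimal sketch of that merge on a toy Storage case class, a reduced stand-in for CatalogStorageFormat; the object name and the merge helper are assumptions for illustration, while the class names are the ones shown by DESCRIBE FORMATTED earlier.

```scala
object StorageMerge {
  // Toy stand-in for Spark's CatalogStorageFormat, reduced to the fields used here.
  case class Storage(
      serde: Option[String],
      inputFormat: Option[String],
      outputFormat: Option[String],
      properties: Map[String, String] = Map.empty)

  // Fields set in `requested` win; unset fields keep the current value.
  def merge(current: Storage, requested: Storage): Storage =
    Storage(
      serde = requested.serde.orElse(current.serde),
      inputFormat = requested.inputFormat.orElse(current.inputFormat),
      outputFormat = requested.outputFormat.orElse(current.outputFormat),
      properties = current.properties ++ requested.properties)

  val parquet: Storage = Storage(
    Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"),
    Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
    Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"))

  val avro: Storage = Storage(
    Some("org.apache.hadoop.hive.serde2.avro.AvroSerDe"),
    Some("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"),
    Some("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat"))
}
```

With these values, StorageMerge.merge(parquet, avro) yields the full Avro storage description, whereas a request that only sets the input and output format would leave the existing serde in place.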

Now we have a passing unit test in which we set the file format for a partition.

Surprise: execution plan differences...

We were playing around and we accidentally changed the format of the partitioned table to Avro, so we had an Avro table with a Parquet partition in it...and IT WORKED!! We could read all the data...but wait, what?!!? So an Avro table with a Parquet partition works, but a Parquet table with an Avro partition doesn't?

What's the difference? Let's see the execution plans:

  • execution plan for the Parquet table with Avro partitions
[ {
  "class" : "org.apache.spark.sql.execution.ProjectExec",
  "num-children" : 1,
  "projectList" : [ [ {
    "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference",
    ...
    "qualifier" : "ext_parquet_partition_table"
  } ], [ {
    "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference",
    ...
    "qualifier" : "ext_parquet_partition_table"
  } ] ],
  "child" : 0
}, {
  "class" : "org.apache.spark.sql.execution.FileSourceScanExec",
  "num-children" : 0,
  "relation" : null,
  "output" : [ [ {
    "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference",
    ...
  } ], [ {
    "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference",
    ...
  } ], [ {
    "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference",
    ...
  } ] ],
  "requiredSchema" : {
    "type" : "struct",
    "fields" : [ {
      "name" : "key",
      "type" : "integer",
      "nullable" : true,
      "metadata" : { }
    }, {
      "name" : "value",
      "type" : "string",
      "nullable" : true,
      "metadata" : { }
    } ]
  },
  "partitionFilters" : [ ],
  "dataFilters" : [ ],
  "tableIdentifier" : {
    "product-class" : "org.apache.spark.sql.catalyst.TableIdentifier",
    "table" : "ext_parquet_partition_table",
    "database" : "default"
  }
} ]
  • execution plan for the Avro table with Parquet partitions
[ {
  "class" : "org.apache.spark.sql.hive.execution.HiveTableScanExec",
  "num-children" : 0,
  "requestedAttributes" : [ [ {
    "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference",
    ...
    "qualifier" : "ext_avro_partition_table"
  } ], [ {
    "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference",
    ...
    "qualifier" : "ext_avro_partition_table"
  } ] ],
  "relation" : [ {
    "class" : "org.apache.spark.sql.catalyst.catalog.HiveTableRelation",
    ...
    "tableMeta" : {
      "product-class" : "org.apache.spark.sql.catalyst.catalog.CatalogTable",
      "identifier" : {
        "product-class" : "org.apache.spark.sql.catalyst.TableIdentifier",
        "table" : "ext_avro_partition_table",
        "database" : "default"
      },
      "tableType" : {
        "product-class" : "org.apache.spark.sql.catalyst.catalog.CatalogTableType",
        "name" : "EXTERNAL"
      },
      "storage" : {
        "product-class" : "org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat",
        "locationUri" : null,
        "inputFormat" : "org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat",
        "outputFormat" : "org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat",
        "serde" : "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
        "compressed" : false,
        "properties" : null
      },
      "schema" : {
        "type" : "struct",
        "fields" : [ {
          "name" : "key",
          "type" : "integer",
          "nullable" : true,
          "metadata" : { }
        }, {
          "name" : "value",
          "type" : "string",
          "nullable" : true,
          "metadata" : { }
        }, {
          "name" : "dt",
          "type" : "string",
          "nullable" : true,
          "metadata" : { }
        } ]
      },
      "provider" : "hive",
      "partitionColumnNames" : "[dt]",
      "owner" : "secret",
      "createTime" : 1532699365000,
      "lastAccessTime" : 0,
      "createVersion" : "2.4.0-SNAPSHOT",
      "properties" : null,
      "stats" : null,
      "unsupportedFeatures" : [ ],
      "tracksPartitionsInCatalog" : true,
      "schemaPreservesCase" : true,
      "ignoredProperties" : null,
      "hasMultiFormatPartitions" : false
    },
    "dataCols" : [ [ {
      "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference",
      "num-children" : 0,
      "name" : "key",
      "dataType" : "integer",
      "nullable" : true,
      "metadata" : { },
      "exprId" : {
        "product-class" : "org.apache.spark.sql.catalyst.expressions.ExprId",
        "id" : 25,
        "jvmId" : "5988f5b1-0966-49ca-a6de-2485d5582464"
      }
    } ], [ {
      "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference",
      "num-children" : 0,
      "name" : "value",
      "dataType" : "string",
      "nullable" : true,
      "metadata" : { },
      "exprId" : {
        "product-class" : "org.apache.spark.sql.catalyst.expressions.ExprId",
        "id" : 26,
        "jvmId" : "5988f5b1-0966-49ca-a6de-2485d5582464"
      }
    } ] ],
    "partitionCols" : [ [ {
      "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference",
      "num-children" : 0,
      "name" : "dt",
      "dataType" : "string",
      "nullable" : true,
      "metadata" : { },
      "exprId" : {
        "product-class" : "org.apache.spark.sql.catalyst.expressions.ExprId",
        "id" : 27,
        "jvmId" : "5988f5b1-0966-49ca-a6de-2485d5582464"
      }
    } ] ]
  } ],
  "partitionPruningPred" : [ ],
  "sparkSession" : null
} ]

So how could we make the Parquet table take the HiveTableScanExec route instead of the FileSourceScanExec route, and thus make the Parquet execution plan similar to the Avro execution plan?

Finding the magic setting

We went digging in the code again and discovered the following method in HiveStrategies.scala:

/**
 * Relation conversion from metastore relations to data source relations for better performance
 *
 * - When writing to non-partitioned Hive-serde Parquet/Orc tables
 * - When scanning Hive-serde Parquet/ORC tables
 *
 * This rule must be run before all other DDL post-hoc resolution rules, i.e.
 * `PreprocessTableCreation`, `PreprocessTableInsertion`, `DataSourceAnalysis` and `HiveAnalysis`.
 */
case class RelationConversions(
    conf: SQLConf,
    sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] {
  private def isConvertible(relation: HiveTableRelation): Boolean = {
    val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
    serde.contains("parquet") && conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) ||
      serde.contains("orc") && conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
  }
...

Looking at this code we decided to set HiveUtils.CONVERT_METASTORE_PARQUET.key to false, so that Spark does not convert the table to a data source relation and instead reads it through the Hive serde path, which is the route the working Avro table took.

We simulated this by adding the following line to our unit test:

withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false")

With this setting, the test passed. We decided to implement an extra check to avoid optimising the execution when a partition has a different file format than the main table.

Implement multi format partitions support without disabling optimizations manually

We decided to add a property, hasMultiFormatPartitions, to the CatalogTable, which reflects whether the table has partitions with different formats. This had to be done in HiveClientImpl.scala.

The following line did the trick:

hasMultiFormatPartitions = shim.getAllPartitions(client, h).map(_.getInputFormatClass).distinct.size > 1

Of course we also had to add this to the catalog's interface.scala and then we could use this in HiveStrategies.scala to change the previously mentioned method:

/**
 * Relation conversion from metastore relations to data source relations for better performance
 *
 * - When writing to non-partitioned Hive-serde Parquet/Orc tables
 * - When scanning Hive-serde Parquet/ORC tables
 *
 * This rule must be run before all other DDL post-hoc resolution rules, i.e.
 * `PreprocessTableCreation`, `PreprocessTableInsertion`, `DataSourceAnalysis` and `HiveAnalysis`.
 */
case class RelationConversions(
    conf: SQLConf,
    sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] {
  private def isConvertible(relation: HiveTableRelation): Boolean = {
    val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
    val hasMultiFormatPartitions = relation.tableMeta.hasMultiFormatPartitions
    serde.contains("parquet") && conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) &&
      (!hasMultiFormatPartitions) ||
      serde.contains("orc") && conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
  }
...
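
The detection and the decision can be tried out in isolation. The following is a self-contained sketch on toy metadata classes, not the Spark code itself: Partition, Table and the two boolean flags (standing in for CONVERT_METASTORE_PARQUET and CONVERT_METASTORE_ORC) are assumptions made for illustration, and the parentheses spell out how the boolean expression above is grouped.

```scala
import java.util.Locale

object MultiFormatConversion {
  // Toy stand-ins for the metastore metadata used by the rule above.
  case class Partition(spec: String, inputFormat: String)
  case class Table(serde: Option[String], partitions: Seq[Partition]) {
    // Same idea as the one-liner in HiveClientImpl: more than one distinct input format.
    def hasMultiFormatPartitions: Boolean =
      partitions.map(_.inputFormat).distinct.size > 1
  }

  // Mirrors isConvertible with explicit parentheses. The two flags play the
  // role of CONVERT_METASTORE_PARQUET and CONVERT_METASTORE_ORC.
  def isConvertible(
      table: Table,
      convertParquet: Boolean = true,
      convertOrc: Boolean = true): Boolean = {
    val serde = table.serde.getOrElse("").toLowerCase(Locale.ROOT)
    (serde.contains("parquet") && convertParquet && !table.hasMultiFormatPartitions) ||
      (serde.contains("orc") && convertOrc)
  }
}
```

Because && binds more tightly than ||, the extra check guards only the Parquet branch: a Parquet table with an Avro partition is no longer converted, while an ORC table with mixed partitions would still be.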

With these changes our tests also succeeded.

All this work has been contributed back to the community in this Apache Spark pull request. Based on the last comments on our pull request, it doesn't look very promising that this will be merged. Still, we learned a lot about Apache Spark and its internals.
