Hive on Spark: Join Design Master


Purpose and Prerequisites

The purpose of this document is to summarize the findings of the research on the different joins and to describe a unified design for implementing them in Spark.  It identifies the optimization processors that will be involved and their responsibilities.

It is not the purpose of this document to go into depth on the design of the various join implementations in Spark, such as the common-join (HIVE-7384), or the optimized join variants like mapjoin (HIVE-7613), skew-join (HIVE-8406) or SMB mapjoin (HIVE-8202).  It will be helpful to refer to the design documents attached to those JIRAs for details before reading this document, as they also contain background on how the joins are implemented in MapReduce, along with comparisons.  Lastly, it will also be helpful to read the overall Hive on Spark design doc before reading this document.

MapReduce Summary

This section summarizes plan-generation of different joins of Hive on MapReduce, which will serve as a model for Spark.  We aim to support most of these join optimizations.  Priority will be for the automatically-optimized joins, followed by those that need user input, such as hints and metadata.

Over the years, many join optimizations beyond the common-join have been introduced to Hive via Processors (partial transformations of the operator-tree or work-tree).  The following diagram (Figure 1) shows the relationships between the different Processors, each of which does a small part in transforming an operator-tree from common-join to one of the optimized join work-trees (mapjoin, bucket mapjoin, SMB mapjoin, or skewjoin).

Processors are represented by boxes in the following diagram:  They are split into three types:

Each processor box shows the triggering condition, which is either a Hive configuration property or the presence of a certain operator in the tree.  So, you can see how to trigger a particular join by following its path through the processors and making sure that all the configurations are set and that the required operator has been created by previous processors.  There are further conditions for each transform listed at the top (e.g., table size) that are not explained in this document; refer to the documents of the individual joins for those.
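The triggering idea can be illustrated with a minimal sketch.  It is not Hive code: the `Plan` and `Processor` classes, the flag names (`mapjoin_hint`, `bucketmapjoin_enabled`) and the operator names are hypothetical stand-ins, and the extra conditions such as table size are left out.  It only shows that a processor fires when its flag is set and the operator it expects has already been created by an earlier processor.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Plan:
    """Toy stand-in for an operator tree: just a list of operator names."""
    operators: List[str] = field(default_factory=lambda: ["CommonJoin"])


@dataclass
class Processor:
    name: str
    flag: str       # hypothetical configuration flag that must be enabled
    consumes: str   # operator that must already be present in the plan
    produces: str   # operator that replaces the consumed one

    def apply(self, plan: Plan, conf: Dict[str, bool]) -> Plan:
        # Fall out of the pre-req checks: leave the plan unchanged.
        if not conf.get(self.flag, False) or self.consumes not in plan.operators:
            return plan
        rewritten = [self.produces if op == self.consumes else op
                     for op in plan.operators]
        return Plan(rewritten)


def run_processors(processors: List[Processor], conf: Dict[str, bool]) -> Plan:
    """Start from a common-join plan and let each processor try its transform."""
    plan = Plan()
    for processor in processors:
        plan = processor.apply(plan, conf)
    return plan


# Hypothetical two-step path, loosely modelled on the bucket map join path.
BUCKET_PATH = [
    Processor("MapJoinProcessor", "mapjoin_hint", "CommonJoin", "MapJoin"),
    Processor("BucketMapJoinProcessor", "bucketmapjoin_enabled",
              "MapJoin", "BucketMapJoin"),
]
```

With both flags set, the plan ends up holding a `BucketMapJoin` operator.  With only `bucketmapjoin_enabled` set, the second processor never fires because no earlier processor created the `MapJoin` operator, and the plan stays a common-join.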

Figure 1. Join Processors for Hive on MapReduce

The input arrow at top before any Processor is always an operator-plan for common-join, which is shown as follows.  In other words, this is the original join plan if none of the optimizer processors are activated.

The ‘exit’ arrows of the join paths shown in Figure 1 are the various optimized join variants.  There are of course other exit paths, in which the plan remains unchanged as a common-join upon falling out of any processor’s pre-req checks, but they are not shown to simplify the diagram.


Descriptions of some important Processors follow.  There is not enough space to describe them all, so a few key ones are chosen.

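(The before-and-after operator-tree diagrams for the individual Processors, including “MapJoinFactory MapJoin processing” and “MapJoinFactory SMB MapJoin processing”, are not reproduced here.)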

A brief summary of all the processor-paths of possible join plans shown in Figure 1:

  1.  Skewjoin (compile-time)

    1. SkewJoinOptimizer: From a common-join operator tree, creates two join operator-trees connected by union operator.  These will represent a join with skew key, and a join without it.

    2. One or both reduce-side joins might be converted to mapjoin by CommonJoinResolver; see auto-mapjoin for more details.

  2. Skewjoin (runtime)

    1. SkewJoinResolver:  Create conditional work after the original common-join work, which is a list of mapjoin works.  These will handle the skew keys.

    2. MapJoinResolver: Final preparation for mapjoin works as described.

  3. Auto-mapjoin

    1. CommonJoinResolver:  Convert common-join operator tree to mapjoin operator-tree, with big/small table(s) identified on the Mapjoin operator, as described.

    2. MapJoinResolver: Final preparation for mapjoin works as described.

  4. Map join query with hints

    1. MapJoinProcessor:  Convert common-join operator tree to mapjoin operator-tree, with big/small table(s) identified on the Mapjoin operator, as described.

    2. MapJoinFactory: Adds localWork pointing to small tables in mapjoin work, as described.

    3. MapJoinResolver:  Final preparation for mapjoin works as described.

  5. Bucket map join query with hints.

    1. MapJoinProcessor:  Convert common-join operator tree to mapjoin operator-tree, with big/small table(s) identified on the Mapjoin operator, as described.

    2. BucketMapJoinProcessor:  Add bucketing information to MapJoin op.

    3. MapJoinFactory: Adds localWork pointing to small tables in mapjoin work, as described.

    4. MapJoinResolver:  Final preparation for mapjoin works as described.

  6. SMB join query with hints

    1. MapJoinProcessor:  Convert common-join operator tree to mapjoin operator-tree, with big/small table(s) identified on the Mapjoin operator, as described.

    2. SortedBucketMapJoinProc:  Convert mapjoin operator-tree to SMBMapJoin operator-tree.  Add DummyOp to small-tables.

    3. MapJoinFactory:  Adds localWork pointing to small tables in SMBMapjoin work, as described.

    4. May be converted back to MapJoin (see #8 for details).

  7. Auto-SMB join

    1. SortedMergeBucketMapJoinProc: Convert mapjoin operator-tree to SMBMapJoin operator-tree.  Add DummyOp to small-tables.

    2. MapJoinFactory:  Adds localWork pointing to small tables in SMBMapjoin work, as described.

    3. May be converted to MapJoin (see #8 for details).

  8.  SMB join that converts to mapjoin

    1. SMBJoin operator-tree constructed as mentioned in #6, #7 above.

    2. SortedMergeJoinResolver:  For each possible big-table candidate, create a mapjoin work.  These will have LocalWork data structures to keep track of small-tables.  Create ConditionalWork with all of these mapjoin works (with the original SMBJoin work as the backup task of each one), and the original SMBJoin work as the last option.

    3. MapJoinResolver: For each mapjoin work created, final preparation as described.
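As an illustration of path 1 (compile-time skewjoin), the sketch below splits one join into two joins connected by a union: one for the rows with skewed keys and one for the rest.  It is a toy model over in-memory lists, not the SkewJoinOptimizer itself.  The function names are hypothetical, and the set of skewed keys is assumed to be known in advance.

```python
from collections import defaultdict


def simple_join(left, right):
    """Inner equi-join of two lists of (key, value) rows."""
    index = defaultdict(list)
    for key, value in right:
        index[key].append(value)
    return [(key, lv, rv) for key, lv in left for rv in index.get(key, [])]


def compile_time_skew_join(left, right, skew_keys):
    """Join as two separate joins connected by a union (assumed skew_keys)."""
    skewed = set(skew_keys)
    # Branch 1: the join restricted to rows with a skewed key.
    skew_branch = simple_join(
        [row for row in left if row[0] in skewed],
        [row for row in right if row[0] in skewed],
    )
    # Branch 2: the join over all remaining rows.
    rest_branch = simple_join(
        [row for row in left if row[0] not in skewed],
        [row for row in right if row[0] not in skewed],
    )
    # Union of the two branches gives the same rows as the single join.
    return skew_branch + rest_branch
```

Because the two branches are independent joins, each one can be planned separately, which is what allows one or both of them to be converted to mapjoin later.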

Tez Comparison

Hive on Tez is still evolving.  It currently disables all logical-optimizer processors and uses a processor called “ConvertJoinMapJoin” located in the work-generation phase.  This processor utilizes stats annotated on the operator-tree to decide which join to use.  It will directly create plans for the following joins:

These look different from MapReduce plans, and are based on the Tez physical feature “broadcast-edge”.  See the JIRAs for those joins for more details.

Spark MapJoin

 

For most of the joins for Hive on Spark, the overall execution will be similar to MR for the first cut.  Thus, a similar work-tree as in MR will be generated, though encapsulated in SparkWork(s) instead of MapRedWork(s).

One difference is implementation of mapjoin, which is worth spending some time discussing.  Recall the mapjoin work-tree in MapReduce:

  1. Run the MapredLocalWork containing small-table(s)’ operator-tree, ending it with a HashTableSink op that dumps to file.  This is made into a distributed cache.

  2. Run the MapWork for the big table, which will populate small-table hashmap from the distributed cache file using HashTableDummy’s loader.
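The two steps can be sketched in a few lines.  This is only a toy model: a pickled Python dict stands in for the hashmap file, a temporary directory stands in for the distributed cache, and the function names are hypothetical rather than Hive APIs.

```python
import os
import pickle
import tempfile
from collections import defaultdict


def dump_small_table(rows, path):
    """Step 1: build the small-table hash table and dump it to a file."""
    table = defaultdict(list)
    for key, value in rows:
        table[key].append(value)
    with open(path, "wb") as f:
        pickle.dump(dict(table), f)


def map_side_join(big_rows, path):
    """Step 2: load the hash table from the file, then probe it per big-table row."""
    with open(path, "rb") as f:
        table = pickle.load(f)
    for key, value in big_rows:
        for small_value in table.get(key, []):
            yield key, value, small_value


def demo_mapjoin():
    small = [(1, "a"), (2, "b")]
    big = [(1, "x"), (1, "y"), (3, "z")]
    with tempfile.TemporaryDirectory() as cache_dir:
        path = os.path.join(cache_dir, "small_table.hashtable")
        dump_small_table(small, path)
        return list(map_side_join(big, path))
```

No shuffle of the big table is needed: each big-table row is matched against the in-memory table on the map side.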

Spark mapjoin can either take advantage of faster Spark functionality such as broadcast-variable, or use something similar to the distributed cache.  The reasons for choosing the MR-style distributed cache are discussed in the “small-table broadcasting” document in HIVE-7613, though broadcast-variable support might be added in the future.  Here is the plan that we want.

 

  1. Run the small-table SparkWorks on the Spark cluster, which dump to a hashmap file (this is the main difference from MR, as the small-table work is distributed).

  2. Run the SparkWork for the big table on the Spark cluster.  Mappers will look up the small-table hashmap from the file using HashTableDummy’s loader.

For bucket map-join, each bucket of each small table goes to a separate file, and each mapper of big-table loads the specific bucket-file(s) of corresponding buckets for each small table.
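A minimal sketch of the bucketed variant follows.  It assumes, for simplicity, that the big table and the small table use the same number of buckets and the same bucketing function, so each big-table bucket needs exactly one small-table bucket file.  The file naming and the function names are hypothetical.

```python
import os
import pickle
import tempfile
from collections import defaultdict


def bucket_of(key, num_buckets):
    """Assumed bucketing function shared by both tables."""
    return hash(key) % num_buckets


def dump_bucketed(rows, directory, table_name, num_buckets):
    """Write one hash-table file per bucket of a small table."""
    buckets = [defaultdict(list) for _ in range(num_buckets)]
    for key, value in rows:
        buckets[bucket_of(key, num_buckets)][key].append(value)
    paths = []
    for b, table in enumerate(buckets):
        path = os.path.join(directory, f"{table_name}-bucket-{b}.hashtable")
        with open(path, "wb") as f:
            pickle.dump(dict(table), f)
        paths.append(path)
    return paths


def bucket_map_join(big_rows, small_rows, num_buckets):
    """Each simulated mapper handles one big-table bucket and loads only
    the matching small-table bucket file."""
    results = []
    with tempfile.TemporaryDirectory() as directory:
        paths = dump_bucketed(small_rows, directory, "small", num_buckets)
        for b in range(num_buckets):
            big_bucket = [r for r in big_rows if bucket_of(r[0], num_buckets) == b]
            with open(paths[b], "rb") as f:
                table = pickle.load(f)
            for key, value in big_bucket:
                for small_value in table.get(key, []):
                    results.append((key, value, small_value))
    return results
```

The benefit over a plain mapjoin is that no mapper ever holds the whole small table in memory, only the buckets that can match its own rows.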

Spark Join Design

Let’s redraw the processor diagram for Hive on Spark.  There are several other points to note in this section:

There are also some minor differences (improvements) over original MapReduce, beyond the one mentioned in Spark MapJoin section.

Figure 2: Join Processors for Hive on Spark


Again, we first explore some of the interesting processors:

 

 

 

(The before-and-after diagrams for these processors are not reproduced here.)

 

The following summarizes each join plan’s processor path in Figure 2.

  1. Compile-time skewjoin without mapjoin: Logical optimizer completely re-used from MapReduce.

    1. SkewJoinOptimizer: This logical-optimizer processor is reused, to create two join plans out of one connected by union.

    2. Follows auto-conversion to MapJoin path.

  2. SMB MapJoin (with hints): again, logical optimizers are mostly similar to those in MapReduce.

    1. SparkMapJoinProcessor/BucketMapJoinOptimizer/SparkSMBJoinHintOptimizer: Almost identical to MapReduce versions, these transform the operator tree to include SMBMapJoinOp with big/small table(s) identified.

    2. GenSparkWork:  Generate the SparkWork, which is an SMBMapJoin operator-tree rooted at big-table TS.

    3. SparkSortMergeJoinFactory: Attach Localwork data structure pointing to small tables in the SMBMapJoin work as described.

  3. SMB MapJoin (without hints): again, logical optimizers are mostly similar to those in MapReduce

    1. SparkSortMergeJoinOptimizer: Almost identical to MapReduce version, this transforms the common-join operator tree to SMB mapjoin operator-tree, with big/small table(s) identified on SMBMapJoin operator, as described.

    2. GenSparkWork:  Generate the SparkWork, which is an SMBMapJoin operator-tree rooted at big-table TS.

    3. SparkSortMergeJoinFactory: Attach Localwork data structure pointing to small tables in the SMBMapJoin work as described.

  4. Auto-mapjoin:  Mostly a rewrite, unable to reuse the MapReduce processors.

    1. SparkMapJoinOptimizer:  Based on stats, converts a common-join operator tree to mapjoin operator-tree, with big/small table(s) identified in MapJoinOp, as described.

    2. GenSparkWork:  Generate the SparkWork, which has MapJoin operator-trees rooted at various table TS’s.

    3. SparkMapJoinResolver: Create two SparkWorks to achieve the mapjoin, as described.

  5. Mapjoin via hints: again, logical optimizers are mostly similar to those in MapReduce

    1. SparkMapJoinProcessor: Almost identical to MapReduce version, this transforms the common-join operator tree to mapjoin operator-tree, with big/small table(s) identified in MapJoinOp, as described.

    2. GenSparkWork:   Generate the SparkWork, which has MapJoin operator-trees rooted at various table TS’s.

    3. SparkMapJoinResolver: Create two SparkWorks to achieve the mapjoin, as described.

  6. SMB joins converted to mapjoin:

    1. Unlike in MapReduce, this route is avoided.  If the conditions are met, the join is sent directly to SparkMapJoinOptimizer and SparkMapJoinResolver, just like a normal auto-mapjoin.

  7. Skew join (runtime):

    1. SparkSkewJoinResolver:  Takes a SparkWork with a common join and turns it into a conditional work.  It then adds additional SparkWorks with mapjoin operator-trees as backups in the conditional work.  These will handle the skew keys.

    2. SparkMapJoinResolver:  For each backup SparkWork with mapjoin, create two SparkWorks to achieve the mapjoin, as described.

  8. Auto-bucket join

    1. SparkMapJoinOptimizer:  Extra logic here beyond auto-mapjoin conversion to support auto-bucket mapjoin conversion.

    2. SparkMapJoinResolver:  Extra logic here beyond auto-mapjoin conversion to support auto-bucket mapjoin conversion.
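The stats-based decision in the auto-mapjoin path can be pictured with the following sketch.  It is an assumption-laden simplification, not the SparkMapJoinOptimizer logic: it treats the largest table as the big-table candidate and converts only if the remaining tables together fit under a size limit.  The function name and the `small_table_limit` parameter are hypothetical, and real conditions such as join type are ignored.

```python
from typing import Dict, Optional


def choose_big_table(table_sizes: Dict[str, int],
                     small_table_limit: int) -> Optional[str]:
    """Return the big-table name if the join could become a mapjoin, else None.

    table_sizes maps table name to its estimated size in bytes (from stats).
    """
    if len(table_sizes) < 2:
        return None
    # Candidate big table: the largest input, which is streamed by the mappers.
    big = max(table_sizes, key=table_sizes.get)
    # Every other table must be held in memory as a hash table.
    small_total = sum(size for name, size in table_sizes.items() if name != big)
    return big if small_total <= small_table_limit else None
```

For example, with sizes `{"orders": 10_000_000, "dim": 1_000}` and a limit of `10_000`, the sketch picks `orders` as the big table.  With two tables of several megabytes each, it returns `None` and the join would stay a common-join.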
