Hive on Spark: Join Design Master 

Purpose

...

 

The purpose of this document is to summarize the findings of all the research on the different joins and to describe a unified design for attacking the problem in Spark.  It identifies the optimization processors that will be involved and their responsibilities.

...

It is not the purpose of this document to go into implementation depth for the various joins, such as the common-join (HIVE-7384), or the optimized join variants like mapjoin (HIVE-7613), skew-join (HIVE-8406), or SMB mapjoin (HIVE-8202).  It will be helpful to read the design documents attached to those JIRAs for details before reading this document.

MapReduce Summary

...

 

This section summarizes the plan-generation of the different joins of Hive on MapReduce, which will serve as a model for Spark.  We aim to support most of these join optimizations.  Priority goes to the automatically-optimized joins, followed by those that need user input, such as hints and metadata.

...

Each processor box shows the triggering condition: either a Hive configuration property, or the presence of a certain operator in the tree.  So, you can see how to trigger a particular join by following its path through the processors and making sure all the configuration properties are set and the given operator has been created by previous processors.  There are further conditions for each transform, listed at the top (e.g., table size), that are not explained by this document; refer to the documents of the individual joins for those.  A configuration sketch follows.
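As a concrete illustration, the sketch below sets the main gating properties through HiveConf, as a client might before running a query.  The property names are real Hive settings; the threshold value shown is Hive's default and is only illustrative.

import org.apache.hadoop.hive.conf.HiveConf;

public class JoinTriggerConf {
    public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Auto-convert common joins to mapjoins when a table is small enough.
        conf.set("hive.auto.convert.join", "true");
        // Size threshold (bytes) under which a table counts as "small" (default shown).
        conf.set("hive.mapjoin.smalltable.filesize", "25000000");
        // Enable bucket mapjoin for bucketed tables.
        conf.set("hive.optimize.bucketmapjoin", "true");
        // Enable sort-merge bucket (SMB) mapjoin on top of bucket mapjoin.
        conf.set("hive.optimize.bucketmapjoin.sortedmerge", "true");
        // Auto-convert eligible joins to sort-merge joins.
        conf.set("hive.auto.convert.sortmerge.join", "true");
        // Handle skewed join keys with a follow-up job over the skewed values.
        conf.set("hive.optimize.skewjoin", "true");
    }
}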



 

Figure 1. Join Processors for Hive on MapReduce

...

  • CommonJoinResolver:  This handles auto-conversion of joins to mapjoins and follows a separate path from hint-based mapjoin conversions.  It takes a working common-join work-tree, already generated from the common-join operator-tree, and creates an alternative work-tree.  The new work-tree consists of a MapWork rooted at the big table; pointers to the small table(s) are retained in the new work via the LocalWork data structure.  A self-contained sketch of the result follows the diagram below.

 

(Diagram: common-join work-tree before and after conversion)
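For illustration, here is a minimal, self-contained Java sketch of the shape of the result.  The types are simplified stand-ins, not Hive's actual MapWork and local-work classes, and the table names are made up.

import java.util.ArrayList;
import java.util.List;

class LocalWork {                  // stand-in for Hive's local-work structure
    final List<String> smallTableAliases = new ArrayList<>();
}

class MapWork {                    // stand-in for Hive's MapWork
    String bigTableAlias;
    LocalWork localWork;           // pointers to the small table(s)
}

public class CommonJoinResolverSketch {
    // Build the alternative work-tree for an auto-converted mapjoin.
    static MapWork convert(String bigTable, List<String> smallTables) {
        MapWork mapWork = new MapWork();
        mapWork.bigTableAlias = bigTable;    // rooted at the big table
        mapWork.localWork = new LocalWork();
        mapWork.localWork.smallTableAliases.addAll(smallTables);
        return mapWork;
    }

    public static void main(String[] args) {
        MapWork w = convert("orders", List.of("customers"));
        System.out.println(w.bigTableAlias + " + small tables " + w.localWork.smallTableAliases);
    }
}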

 

  • MapJoinResolver:  Here, the two mapjoin paths (hint-based and non-hint) unite again.  The last step for both results is to make them ready for physical execution on the MR cluster, described in detail in the section “Spark MapJoin” below.  The MapJoinResolver separates the single work into two works: first a local MapRedWork dealing with the small tables, ending with a HashTableSink that writes the hashmap files; then a MapRedWork dealing with the big table, loading the small-table hashmap files via HashTableDummyOp.  A simplified sketch of this split follows.
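The sketch below models that split with simplified stand-in types (not Hive's); it conveys only the contents and ordering of the two works.

import java.util.List;

public class MapJoinResolverSketch {
    record Stage(String description) {}

    // The single work is split into a local stage plus a map stage.
    static List<Stage> split(String bigTable, List<String> smallTables) {
        Stage local = new Stage("local MapRedWork: scan " + smallTables
                + ", end in HashTableSink, write hashmap files");
        Stage mapred = new Stage("MapRedWork: scan " + bigTable
                + ", HashTableDummyOp loads the hashmap files for the mapjoin");
        return List.of(local, mapred);   // executed in this order
    }

    public static void main(String[] args) {
        split("orders", List.of("customers"))
                .forEach(s -> System.out.println(s.description()));
    }
}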

...

  • Hive on Spark supports automatic bucket mapjoin, which is not supported in MapReduce.  This is done with extra logic in SparkMapJoinOptimizer and SparkMapJoinResolver (see the configuration sketch after this list).

  • Hive on Spark’s SMB-to-mapjoin conversion path is simplified: it converts directly to a mapjoin when eligible.
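As an illustration of the first point, a hedged configuration sketch: with these settings, and with both sides bucketed on the join key, Hive on Spark can pick a bucket mapjoin automatically.  The property names are real Hive settings; whether the conversion actually fires still depends on stats and table layout.

import org.apache.hadoop.hive.conf.HiveConf;

public class SparkBucketMapJoinConf {
    public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        conf.set("hive.execution.engine", "spark");  // run Hive on Spark
        conf.set("hive.auto.convert.join", "true");  // stats-driven mapjoin conversion
        // With both sides bucketed on the join key, SparkMapJoinOptimizer can
        // choose a bucket mapjoin without a MAPJOIN hint.
        conf.set("hive.optimize.bucketmapjoin", "true");
    }
}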

 

 

Figure 2: Join Processors for Hive on Spark

...

  • SparkSortMergeMapJoinFactory:  This takes a MapWork whose operator-tree already contains an SMBMapJoin operator with the big/small table(s) identified, and creates a LocalWork pointing at the small tables (the same shape as the sketch shown earlier for CommonJoinResolver).

(Diagram: MapWork before and after LocalWork creation)

  • SparkMapJoinProcessor:  Like MapJoinProcessor, this logical-optimization processor handles the initial conversion of a common join to a mapjoin for further processing, when the user has given hints to identify the small tables.  The final operator-tree is slightly different than with MapReduce: the ReduceSinks of the small-table branch(es) remain attached.  A hint example follows.
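For example (table and column names made up), a MAPJOIN hint marks the small table explicitly; with hints honored (hive.ignore.mapjoin.hint=false), this is the path SparkMapJoinProcessor handles.

public class MapJoinHintExample {
    public static void main(String[] args) {
        // The MAPJOIN hint names "dim" as the small (hash) table.
        String query =
            "SELECT /*+ MAPJOIN(dim) */ f.id, d.name " +
            "FROM fact f JOIN dim d ON f.dim_id = d.id";
        System.out.println(query);
    }
}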

...

  • SparkMapJoinOptimizer: Similar to MapReduce’s MapJoinProcessor and Tez’s ConvertJoinMapJoin, this transforms a common-join operator-tree into a mapjoin operator-tree by identifying the big and small tables via stats.  Like Tez’s processor, it removes the ReduceSinks from the big-table branch but keeps them for the small tables.  A sketch of the size-based decision follows the diagram below.

(Diagram: operator-tree before and after mapjoin conversion)
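A self-contained sketch of the kind of decision involved, with hypothetical names and deliberately simplified logic (the real processor works on operator-trees and statistics objects): pick the largest input as the big table, and convert only if the remaining inputs fit under the small-table threshold.

import java.util.Map;

public class MapJoinDecisionSketch {
    // Returns the big-table alias if a mapjoin conversion is allowed, else null.
    static String chooseBigTable(Map<String, Long> inputSizes, long smallTableThreshold) {
        String big = null;
        long bigSize = -1;
        for (Map.Entry<String, Long> e : inputSizes.entrySet()) {
            if (e.getValue() > bigSize) { big = e.getKey(); bigSize = e.getValue(); }
        }
        long othersTotal = 0;
        for (Map.Entry<String, Long> e : inputSizes.entrySet()) {
            if (!e.getKey().equals(big)) othersTotal += e.getValue();
        }
        // Convert only if all the small tables together fit in memory.
        return othersTotal <= smallTableThreshold ? big : null;
    }

    public static void main(String[] args) {
        Map<String, Long> sizes = Map.of("fact", 10_000_000_000L, "dim", 5_000_000L);
        System.out.println(chooseBigTable(sizes, 25_000_000L));  // prints "fact"
    }
}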


  • GenSparkWork/SparkReduceSinkMapJoinProc: During the work-generation phase, a combination of these processors draws the appropriate work boundaries around the mapjoin operator-tree.  SparkReduceSinkMapJoinProc also transforms the small-table ReduceSinks into HashTableSinks, as sketched after the diagram below.

(Diagram: work boundaries before and after work-generation)
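A minimal sketch of the ReduceSink-to-HashTableSink rewrite on a small-table branch, using stand-in operator types rather than Hive's.

import java.util.ArrayList;
import java.util.List;

abstract class Op { final List<Op> children = new ArrayList<>(); }
class ReduceSinkOp extends Op {}
class HashTableSinkOp extends Op {}

public class ReduceSinkRewriteSketch {
    // Replace every ReduceSink on this branch with a HashTableSink, so the
    // branch builds a hash table instead of shuffling its rows.
    static Op rewrite(Op node) {
        if (node instanceof ReduceSinkOp) {
            HashTableSinkOp hts = new HashTableSinkOp();
            hts.children.addAll(node.children);
            return hts;
        }
        for (int i = 0; i < node.children.size(); i++) {
            node.children.set(i, rewrite(node.children.get(i)));
        }
        return node;
    }

    public static void main(String[] args) {
        Op branch = new ReduceSinkOp();
        System.out.println(rewrite(branch).getClass().getSimpleName());  // HashTableSinkOp
    }
}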


  • SparkMapJoinResolver: Again, the various mapjoin paths (hint-based and automatic) converge at this final processor before execution on the Spark cluster.  It takes a single SparkWork with a MapJoin operator and the big/small table(s) identified, and splits it into dependent SparkWorks, with the small-table SparkWork(s) being parents of the big-table SparkWork.  This is sent to the Spark cluster to run the mapjoin as described in the section “Spark MapJoin”.  The LocalWork data structure is created within the dependent (big-table) SparkWork to contain the HashTableDummy operators that load the small-table hashmap files.  A sketch of the resulting dependency follows the diagram below.

 

(Diagram: SparkWork before and after the split into dependent SparkWorks)
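Finally, a sketch of the resulting dependency graph with simplified stand-in types (not Hive's SparkWork): the small-table work(s) run first and publish their hash tables; the big-table work depends on them.

import java.util.ArrayList;
import java.util.List;

public class SparkMapJoinResolverSketch {
    static class Work {
        final String name;
        final List<Work> parents = new ArrayList<>();  // must finish first
        Work(String name) { this.name = name; }
    }

    public static void main(String[] args) {
        Work small = new Work("small-table SparkWork (builds hash tables)");
        Work big = new Work("big-table SparkWork (LocalWork + HashTableDummy loads them)");
        big.parents.add(small);
        System.out.println("'" + big.name + "' depends on '" + big.parents.get(0).name + "'");
    }
}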

 

Below is a summary of each join plan’s processor path through Figure 2.

...