
...

Purpose and Prerequisites

 

The purpose of this document is to summarize the findings of all the research on the different joins and to describe a unified design to attack the problem in Spark.  It identifies the optimization processors that will be involved and their responsibilities.

It is not the purpose of this document to go in depth into the design of the various join implementations in Spark, such as the common-join (HIVE-7384), or the optimized join variants like mapjoin (HIVE-7613), skew-join (HIVE-8406) or SMB mapjoin (HIVE-8202).  It will be helpful to refer to the design documents attached to those JIRAs for details before reading this document, as they also contain some background on how the joins are implemented in MapReduce, along with comparisons.  Lastly, it will also be helpful to read the overall Hive on Spark design doc before reading this document.

MapReduce Summary

...

This section summarizes plan generation for the different joins in Hive on MapReduce, which will serve as a model for Spark.  We aim to support most of these join optimizations.  Priority will be given to the automatically-optimized joins, followed by those that need user input, such as hints and metadata.

Over the years, many join optimizations have been introduced to Hive beyond the common-join, via processors (each performing a partial transformation of the operator-tree or work-tree).  The following diagram (Figure 1) shows the relationships between the different processors, each of which does a small part in transforming an operator-tree from a common-join to one of the optimized join work-trees (mapjoin, bucket mapjoin, SMB mapjoin, or skewjoin).

Processors are represented by boxes in the following diagram.  They are split into three types (a minimal interface sketch follows this list):

  • Logical optimization processor (Green):  Deals with pure operator-tree (no physical works).

  • Work-generation processor (Blue):  Deals with operator-tree and takes part in forming a physical work-tree.

  • Physical optimization processor (Red):  Deals with fully-formed physical work-tree.
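For illustration, here is a minimal sketch of the three processor shapes (hypothetical, simplified Java interfaces; Hive's actual processors take richer contexts such as ParseContext and PhysicalContext):

    // Hypothetical, simplified interfaces illustrating the three processor types above.
    interface OperatorTree {}   // stand-in for Hive's operator DAG
    interface WorkTree {}       // stand-in for the tree of physical works

    // Logical optimization processor (green): rewrites only the operator-tree.
    interface LogicalJoinOptimizer {
        OperatorTree transform(OperatorTree operatorTree);
    }

    // Work-generation processor (blue): sees the operator-tree while the work-tree is being formed.
    interface WorkGenJoinProcessor {
        WorkTree attach(OperatorTree operatorTree, WorkTree partialWorkTree);
    }

    // Physical optimization processor (red): rewrites the fully-formed work-tree.
    interface PhysicalJoinResolver {
        WorkTree resolve(WorkTree workTree);
    }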

...

Each processor box shows the triggering condition, either a Hive configuration property or the presence of a certain operator in the tree.  So, you can see how to trigger a particular join by following its path through the processors and making sure that all the required configurations are set and that the given operator has been created by previous processors.  There are further conditions for the transform, listed at the top of each box (e.g., table size), that are not explained by this document; they can be found in the documents for the individual joins.
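For illustration, a minimal sketch of these two kinds of triggers (hypothetical helper class; the property names are real Hive settings, but defaults and wiring are simplified):

    import java.util.Map;

    // Hypothetical helper showing the two trigger kinds from Figure 1:
    // a Hive configuration flag, and the presence of an operator created by an earlier processor.
    class JoinTriggerCheck {
        // Configuration trigger, e.g. "hive.auto.convert.join" for the auto-mapjoin path
        // or "hive.optimize.skewjoin" for the runtime skew-join path.
        static boolean flagEnabled(Map<String, String> hiveConf, String property) {
            return Boolean.parseBoolean(hiveConf.getOrDefault(property, "false"));
        }

        // Operator-presence trigger: the processor only fires if an earlier processor
        // has already placed the expected operator (e.g. a MapJoin operator) in the tree.
        static boolean treeContains(Iterable<String> operatorNames, String wantedOperator) {
            for (String name : operatorNames) {
                if (wantedOperator.equals(name)) {
                    return true;
                }
            }
            return false;
        }
    }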


 

Figure 1. Join Processors for Hive on MapReduce


The input arrow at the top, before any processor, is always the operator-plan of the common-join, shown below.  In other words, this is the original join plan if none of the optimizer processors are activated.
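A rough sketch of that common-join operator plan, for a two-table join (simplified; the actual plan contains additional operators such as filters and selects):

    MapWork (one map-side branch per joined table):
      TableScan (table A) → ... → ReduceSink
      TableScan (table B) → ... → ReduceSink
    ReduceWork:
      JoinOperator → ... → FileSink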

 

The ‘exit’ arrows of the join paths shown in Figure 1 are the various optimized join variants.  There are of course other exit paths, in which the plan remains unchanged as a common-join because it fails some processor’s prerequisite checks, but these are not shown, to keep the diagram simple.

...

  • MapJoinFactory:  Handles both mapjoin and SMB mapjoin processing: it adds localWork pointing to the small tables in the generated mapjoin or SMB mapjoin work (see the path summaries below).

  • CommonJoinResolver:  This handles auto-conversion of joins to mapjoins and takes a separate path from hint-based mapjoin conversions.  It takes a working common-join work-tree, already generated from the common-join operator-tree, and creates an alternative work-tree.  The new work-tree consists of a mapwork rooted at the big table.  Pointers to the small table(s) are retained in the new work via the LocalWork data structure.

 


  • MapJoinResolver:  Here, the two mapjoin paths (hint and non-hint mapjoins) unite again.  One last step for both is to make the result ready for physical execution on the MR cluster, described in more detail in the “Spark MapJoin” section below.  The MapJoinResolver separates the single work into two works: first, a local MapRedWork dealing with the small tables, ending with a HashTableSink that writes the hashmap files; then a MapRedWork dealing with the big table, which loads the small-table hashmaps from those files via HashTableDummyOp, as sketched below.
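A rough sketch of the resulting two-stage layout (simplified; operator chains abbreviated):

    Stage 1 – local work (small tables):
      TableScan (small table) → ... → HashTableSink    (writes a hashmap file per small table)
    Stage 2 – MapRedWork (big table):
      TableScan (big table) → MapJoin → ... → FileSink
      HashTableDummy → MapJoin                          (loads the hashmap files)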


A brief summary of all the processor-paths of possible join plans shown in Figure 1: 

  1.  Skewjoin (compile-time)

    1. SkewJoinOptimizer: From a common-join operator tree, creates two join operator-trees connected by a union operator.  These represent a join on the skewed key(s) and a join on the remaining keys.

    2. One or both of the reduce-side joins may be converted to mapjoins by CommonJoinResolver; see auto-mapjoin (#3 below) for more details.

  2. Skewjoin (runtime)

    1. SkewJoinResolver:  Creates a conditional work after the original common-join work, consisting of a list of mapjoin works that handle the skewed keys.

    2. MapJoinResolver: Final preparation for mapjoin works as described.

  3. Auto-mapjoin

    1. CommonJoinResolver:  Convert common-join operator tree to mapjoin operator-tree, with big/small table(s) identified on the Mapjoin operator, as described.

    2. MapJoinResolver: Final preparation for mapjoin works as described.

  4. Map join query with hints

    1. MapJoinProcessor:  Convert common-join operator tree to mapjoin operator-tree, with big/small table(s) identified on the Mapjoin operator, as described.

    2. MapJoinFactory: Adds localWork pointing to small tables in mapjoin work, as described.

    3. MapJoinResolver:  Final preparation for mapjoin works as described.

  5. Bucket map join query with hints.

    1. MapJoinProcessor:  Convert common-join operator tree to mapjoin operator-tree, with big/small table(s) identified on the Mapjoin operator, as described.

    2. BucketMapJoinProcessor:  Add bucketing information to MapJoin op.

    3. MapJoinFactory: Adds localWork pointing to small tables in mapjoin work, as described.

    4. MapJoinResolver:  Final preparation for mapjoin works as described.

  6. SMB join query with hints

    1. MapJoinProcessor:  Convert common-join operator tree to mapjoin operator-tree, with big/small table(s) identified on the Mapjoin operator, as described.

    2. SortedBucketMapJoinProc:  Convert mapjoin operator-tree to SMBMapJoin operator-tree.  Add DummyOp to small-tables.

    3. MapJoinFactory:  Adds localWork pointing to small tables in SMBMapjoin work, as described.

    4. May be converted back to MapJoin (see #8 for details).

  7. Auto-SMB join

    1. SortedMergeBucketMapJoinProc: Convert mapjoin operator-tree to SMBMapJoin operator-tree.  Add DummyOp to small-tables.

    2. MapJoinFactory:  Adds localWork pointing to small tables in SMBMapjoin work, as described.

    3. May be converted to MapJoin (see #8 for details).

  8.  SMB join that converts to mapjoin

    1. SMBJoin operator-tree constructed as mentioned in #6, #7 above.

    2. SortedMergeJoinResolver:  For each possible big-table candidate, creates a mapjoin work.  These have LocalWork data structures to keep track of the small tables.  It then creates a ConditionalWork with all of these mapjoin works (each with the original SMBJoin work as its backup task) and the original SMBJoin work as the last option (see the sketch after this list).

    3. MapJoinResolver: For each mapjoin work created, final preparation as described.
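The conditional-work pattern used in path #8 can be sketched as follows (hypothetical, simplified Java; Hive's actual conditional task and work classes carry much more state):

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical sketch of SortedMergeJoinResolver's output for path #8:
    // one candidate mapjoin work per possible big table, each backed up by the original
    // SMB join work, with the SMB join work itself as the last option.
    class Work {
        final String name;
        Work backup;
        Work(String name) { this.name = name; }
    }

    class ConditionalWork {
        final List<Work> alternatives = new ArrayList<>();
    }

    class SmbToMapJoinSketch {
        static ConditionalWork resolve(Work smbJoinWork, List<String> bigTableCandidates) {
            ConditionalWork conditional = new ConditionalWork();
            for (String candidate : bigTableCandidates) {
                Work mapJoinWork = new Work("mapjoin(bigTable=" + candidate + ")");
                mapJoinWork.backup = smbJoinWork;          // fall back to the SMB join on failure
                conditional.alternatives.add(mapJoinWork);
            }
            conditional.alternatives.add(smbJoinWork);     // the SMB join itself is the last option
            return conditional;
        }
    }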

Tez Comparison

...

Hive on Tez is still evolving.  It currently disables all logical-optimizer processors and uses a processor called “ConvertJoinMapJoin”, located in the work-generation phase.  It utilizes statistics annotated on the operator-tree to decide which join to use.  It will directly create plans for the following joins:
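As a rough illustration of such a stats-driven decision (hypothetical, simplified logic; ConvertJoinMapJoin itself considers additional factors such as memory limits and bucketing):

    import java.util.List;

    // Hypothetical sketch: choose a big table for a mapjoin if every other table's
    // estimated size fits under a threshold; otherwise keep the shuffle (common) join.
    class StatsBasedJoinChoice {
        // Returns the index of the big table, or -1 if no mapjoin should be attempted.
        static int chooseBigTable(List<Long> estimatedSizes, long smallTableThreshold) {
            int bigTablePos = -1;
            long largest = -1;
            for (int i = 0; i < estimatedSizes.size(); i++) {
                if (estimatedSizes.get(i) > largest) {
                    largest = estimatedSizes.get(i);
                    bigTablePos = i;
                }
            }
            // Every table except the chosen big table must fit in memory as a hash table.
            for (int i = 0; i < estimatedSizes.size(); i++) {
                if (i != bigTablePos && estimatedSizes.get(i) > smallTableThreshold) {
                    return -1;   // more than one large table: keep the common join
                }
            }
            return bigTablePos;
        }
    }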

...

For most of the joins in Hive on Spark, the overall execution will be similar to MR for the first cut.  Thus, a work-tree similar to MR’s will be generated, though encapsulated in SparkWork(s) instead of MapRedWork(s).

One difference is the implementation of mapjoin, which is worth discussing in some detail.  Recall the mapjoin work-tree in MapReduce:

  1. Run the MapredLocalWork containing the small-table(s)’ operator-tree, ending with a HashTableSink op that dumps the hashmap to a file.  This file is added to the distributed cache.

  2. Run the MapWork for the big table, which populates the small-table hashmap from the distributed-cache file using HashTableDummy’s loader.

...

Spark mapjoin has a choice: take advantage of faster Spark functionality such as broadcast variables, or use something similar to the distributed cache.  The discussion behind choosing the MR-style distributed cache is given in the “small-table broadcasting” document attached to HIVE-7613, though broadcast-variable support might be added in the future.  Here is the plan that we want:

...

  1. Run the small-table SparkWorks on the Spark cluster, which dump their hashmaps to files (this is the main difference from MR, as the small-table work is distributed).

  2. Run the SparkWork for the big table on the Spark cluster.  Mappers will look up the small-table hashmaps from the files using HashTableDummy’s loader, as sketched below.
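A rough sketch of this layout (simplified; operator chains abbreviated):

    SparkWork(s) for the small table(s), run on the Spark cluster:
      TableScan (small table) → ... → HashTableSink    (writes a hashmap file per small table)
    SparkWork for the big table, run on the Spark cluster:
      TableScan (big table) → MapJoin → ... → FileSink
      HashTableDummy → MapJoin                          (each mapper loads the hashmap files)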

...

For bucket mapjoin, each bucket of each small table goes to a separate file, and each mapper of the big table loads only the bucket file(s) of the corresponding bucket(s) from each small table.

Spark Join Design

...

Let’s redraw the processor diagram for Hive on Spark.  There are several other points to note in this section:

...

  • Hive on Spark supports automatic bucket mapjoin, which is not supported in MapReduce.  This is done with extra logic in SparkMapJoinOptimizer and SparkMapJoinResolver.

  • Hive on Spark’s SMB-to-MapJoin conversion path is simplified: it converts directly to MapJoin when eligible.


 

...

Figure 2: Join Processors for Hive on Spark



Again, we first explore some of the interesting processors:

...