
Join Optimization

For a general discussion of Hive joins including syntax, examples, and restrictions, see the Joins wiki doc.

Improvements to the Hive Optimizer

Version

The join optimizations described here were added in Hive version 0.11.0. See HIVE-3784 and related JIRAs.

This document describes optimizations of Hive's query execution planning to improve the efficiency of joins and reduce the need for user hints.

Hive automatically recognizes various use cases and optimizes for them. Hive 0.11 improves the optimizer for these cases:

  • Joins where one side fits in memory. In the new optimization:
    • that side is loaded into memory as a hash table
    • only the larger table needs to be scanned
    • fact tables have a smaller footprint in memory
  • Star-schema joins
  • Hints are no longer needed for many cases.
  • Map joins are automatically picked up by the optimizer.
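The first bullet can be illustrated with a simplified model of how a planner might decide, without a user hint, which side of a two-way join to load into memory. It picks the smaller input and converts the join to a map join only if that input is under a size limit. This is a sketch, not Hive's planner code. `TableStats`, `choose_build_side`, and the 10 MB `SMALL_TABLE_LIMIT_BYTES` value are hypothetical, and the sketch assumes input sizes are known before the query runs.

```python
from dataclasses import dataclass
from typing import Optional

# Hypothetical size limit (bytes) for the side loaded into memory.
SMALL_TABLE_LIMIT_BYTES = 10_000_000


@dataclass
class TableStats:
    name: str
    size_bytes: int


def choose_build_side(left: TableStats, right: TableStats,
                      limit: int = SMALL_TABLE_LIMIT_BYTES) -> Optional[str]:
    """Return the table to load as an in-memory hash table, or None if
    neither side is small enough (keep a regular shuffle join)."""
    smaller = left if left.size_bytes <= right.size_bytes else right
    if smaller.size_bytes <= limit:
        return smaller.name
    return None
```

For example, `choose_build_side(TableStats("store_sales", 5_000_000_000), TableStats("time_dim", 5_000_000))` returns `"time_dim"`. Two large inputs return `None`.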

Star Join Optimization

A simple schema for decision support systems or data warehouses is the star schema, where events are collected in large fact tables, while smaller supporting tables (dimensions) are used to describe the data.

TPC-DS is an example of such a schema. It models a typical retail warehouse where the events are sales and typical dimensions are the date of sale, the time of sale, or the demographics of the purchasing party. Typical queries aggregate and filter fact tables by properties of the dimension tables.

Star Schema Example

Select count(*) cnt
From store_sales ss
     join household_demographics hd on (ss.ss_hdemo_sk = hd.hd_demo_sk)
     join time_dim t on (ss.ss_sold_time_sk = t.t_time_sk)
     join store s on (s.s_store_sk = ss.ss_store_sk)
Where
     t.t_hour = 8
     and t.t_minute >= 30
     and hd.hd_dep_count = 2
order by cnt;
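The following Python sketch mirrors the logic of the star query. It does not reproduce Hive's physical plan. It assumes that each dimension fits in memory after its filter is applied and that each dimension has unique surrogate keys. Each dimension becomes a hash table keyed on its own join column. The fact table is then streamed once, and each row probes a different table on a different key. The rows below are made-up sample data, and `build` and `star_join_count` are hypothetical helper names.

```python
# Tiny made-up stand-ins for the TPC-DS tables used in the query.
household_demographics = [
    {"hd_demo_sk": 1, "hd_dep_count": 2},
    {"hd_demo_sk": 2, "hd_dep_count": 0},
]
time_dim = [
    {"t_time_sk": 10, "t_hour": 8, "t_minute": 45},
    {"t_time_sk": 11, "t_hour": 8, "t_minute": 15},
    {"t_time_sk": 12, "t_hour": 9, "t_minute": 30},
]
store = [{"s_store_sk": 100}, {"s_store_sk": 101}]
store_sales = [
    {"ss_hdemo_sk": 1, "ss_sold_time_sk": 10, "ss_store_sk": 100},  # matches
    {"ss_hdemo_sk": 2, "ss_sold_time_sk": 10, "ss_store_sk": 100},  # hd_dep_count != 2
    {"ss_hdemo_sk": 1, "ss_sold_time_sk": 11, "ss_store_sk": 101},  # t_minute < 30
    {"ss_hdemo_sk": 1, "ss_sold_time_sk": 10, "ss_store_sk": 999},  # no such store
    {"ss_hdemo_sk": 1, "ss_sold_time_sk": 10, "ss_store_sk": 101},  # matches
]


def build(rows, key, predicate=lambda r: True):
    # Filter the dimension, then index it by its join key (keys assumed unique).
    return {r[key]: r for r in rows if predicate(r)}


def star_join_count(fact, dims):
    """Stream the fact table once; probe each dimension on its own key."""
    cnt = 0
    for row in fact:
        if all(row[fact_key] in table for fact_key, table in dims):
            cnt += 1
    return cnt


dims = [
    ("ss_hdemo_sk", build(household_demographics, "hd_demo_sk",
                          lambda r: r["hd_dep_count"] == 2)),
    ("ss_sold_time_sk", build(time_dim, "t_time_sk",
                              lambda r: r["t_hour"] == 8 and r["t_minute"] >= 30)),
    ("ss_store_sk", build(store, "s_store_sk")),
]
print(star_join_count(store_sales, dims))  # prints 2
```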

Prior Support for MAPJOIN

Hive supports MAPJOINs, which are well suited for this scenario, at least for dimensions small enough to fit in memory. Before release 0.11, a MAPJOIN could be invoked either through an optimizer hint:

select /*+ MAPJOIN(time_dim) */ count(*) from
store_sales join time_dim on (ss_sold_time_sk = t_time_sk);

or via auto join conversion:

set hive.auto.convert.join=true;
select count(*) from
store_sales join time_dim on (ss_sold_time_sk = t_time_sk);

MAPJOINs are processed by loading the smaller table into an in-memory hash map and matching keys with the larger table as they are streamed through. The prior implementation has this division of labor:

  • Local work:
    • read records via standard table scan (including filters and projections) from source on local machine
    • build hashtable in memory
    • write hashtable to local disk
    • upload hashtable to DFS
    • add hashtable to distributed cache
  • Map task:
    • read hashtable from local disk (distributed cache) into memory
    • match records' keys against hashtable
    • combine matches and write to output
  • No reduce task
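The following sketch models the division of labor described above in Python. A local step scans, filters, and projects the small table, builds a hash table, and writes it to a file. A map step loads that file and streams the large table against it, with no reduce phase. This is an illustration under stated assumptions, not Hive's implementation. The local file stands in for the DFS upload and the distributed cache, `pickle` stands in for Hive's own hashtable serialization, and `local_work` and `map_task` are hypothetical names.

```python
import os
import pickle
import tempfile


def local_work(small_rows, key, columns, predicate, path):
    """Client side: scan, filter, and project the small table, build a hash
    table, and write it to a file (stand-in for DFS upload / distributed cache)."""
    table = {}
    for row in small_rows:
        if predicate(row):
            # One key may map to several rows, so store a list per key.
            table.setdefault(row[key], []).append({c: row[c] for c in columns})
    with open(path, "wb") as f:
        pickle.dump(table, f)


def map_task(big_rows, key, path):
    """Map side: load the hash table into memory, stream the big table,
    and emit combined rows for matching keys. No reduce phase."""
    with open(path, "rb") as f:
        table = pickle.load(f)
    for row in big_rows:
        for match in table.get(row[key], []):
            yield {**row, **match}


time_dim = [{"t_time_sk": 10, "t_hour": 8}, {"t_time_sk": 11, "t_hour": 9}]
store_sales = [{"ss_sold_time_sk": 10}, {"ss_sold_time_sk": 11}, {"ss_sold_time_sk": 12}]

with tempfile.TemporaryDirectory() as d:
    path = os.path.join(d, "time_dim.hashtable")
    local_work(time_dim, "t_time_sk", ["t_hour"], lambda r: r["t_hour"] == 8, path)
    # Consume the generator while the temporary file still exists.
    joined = list(map_task(store_sales, "ss_sold_time_sk", path))

print(joined)  # prints [{'ss_sold_time_sk': 10, 't_hour': 8}]
```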

Limitations of Prior Implementation

The MAPJOIN implementation prior to Hive 0.11 has these limitations:

  • The mapjoin operator can only handle one key at a time; that is, it can perform a multi-table join, but only if all the tables are joined on the same key. (Typical star schema joins do not fall into this category.)
  • Hints are cumbersome for users to apply correctly and auto conversion doesn't have enough logic to consistently predict if a MAPJOIN will fit into memory or not.
  • A chain of MAPJOINs is not coalesced into a single map-only job, unless the query is written as a cascading sequence of mapjoin(table, subquery(mapjoin(table, subquery....). Auto conversion never produces a single map-only job.
  • The hashtable for the mapjoin operator has to be generated for each run of the query, which involves downloading all the data to the Hive client machine as well as uploading the generated hashtable files.
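The first limitation can be expressed as a simple check. In this hypothetical, simplified model, each join condition is written as a tuple of (streamed-table column, small table, small-table column). A single pre-0.11 mapjoin operator can handle the join only if every condition uses the same key column. The star query on this page joins `store_sales` on three different columns, so it fails the check. The names `dim_a`, `dim_b`, and `k` in the second case are placeholders.

```python
from typing import List, Tuple

# (streamed-table column, small table, small-table column)
JoinCond = Tuple[str, str, str]


def fits_single_mapjoin(conds: List[JoinCond]) -> bool:
    """True if all small tables join on one streamed-table key column."""
    return len({big_col for big_col, _, _ in conds}) <= 1


star_conds = [
    ("ss_hdemo_sk", "household_demographics", "hd_demo_sk"),
    ("ss_sold_time_sk", "time_dim", "t_time_sk"),
    ("ss_store_sk", "store", "s_store_sk"),
]
same_key_conds = [("k", "dim_a", "k"), ("k", "dim_b", "k")]

print(fits_single_mapjoin(star_conds))      # prints False
print(fits_single_mapjoin(same_key_conds))  # prints True
```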