...

Filter Pushdown

...

Introduction

This document explains how we are planning to add support in Hive's
optimizer for pushing filters down into physical access methods. This
is an important optimization for minimizing the amount of data scanned
and processed by an access method (e.g. for an indexed key lookup), as
well as reducing the amount of data passed into Hive for further query
evaluation.

Use Cases

Below are the main use cases we are targeting.

Components Involved

...

To achieve the loosest possible coupling, we are going to use a string
as the primary representation for the filter. In particular, the string
will be in the form produced when Hive unparses an ExprNodeDesc, e.g.

Code Block

((key >= 100) and (key < 200))

In general, this comes out as valid SQL, although it may not always match
the original SQL exactly, e.g.

Code Block

cast(x as int)

becomes

Code Block

UDFToInteger(x)

Column names in this string are unqualified references to the columns
of the table over which the filter operates, as they are known in the
Hive metastore. These column names may differ from those known to the
underlying storage; for example, the HBase storage handler maps Hive
column names to HBase column names (qualified by column family).
Mapping from Hive column names to the storage-level names is the
responsibility of the code interpreting the filter string.
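
As a purely hypothetical illustration (the helper map and the concrete
name pairs below are invented for this example, not Hive API), such an
interpreter might resolve Hive names to storage names before building
its scan:

Code Block

// Hypothetical illustration: resolving unqualified Hive column names
// to storage-level names (HBase "family:qualifier" style) before
// constructing the storage-level scan.  The mapping shown is invented.
Map<String, String> hiveToStorage = new HashMap<String, String>();
hiveToStorage.put("key", ":key");      // Hive "key" -> HBase row key
hiveToStorage.put("value", "cf:val");  // Hive "value" -> family cf, qualifier val

// A pushed filter such as (key >= 100) refers to the Hive name "key",
// so the interpreter looks up the storage-level name before using it.
String storageName = hiveToStorage.get("key");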

...

As mentioned above, we want to avoid duplication in the code which
interprets the filter string (e.g. parsing). As a first cut, we will
provide access to the ExprNodeDesc tree by passing it along in
serialized form as an optional companion to the filter string. In
follow-ups, we will provide parsing utilities for the string form.

We will also provide an IndexPredicateAnalyzer class capable of
detecting simple sargable subexpressions in an ExprNodeDesc tree. In
follow-ups, we will add support for discriminating and combining more
complex indexable subexpressions.

Code Block

public class IndexPredicateAnalyzer
{
  public IndexPredicateAnalyzer();

  /**
   * Registers a comparison operator as one which can be satisfied
   * by an index search.  Unless this is called, analyzePredicate
   * will never find any indexable conditions.
   *
   * @param udfName name of comparison operator as returned
   * by either {@link GenericUDFBridge#getUdfName} (for simple UDFs)
   * or udf.getClass().getName() (for generic UDFs).
   */
  public void addComparisonOp(String udfName);

  /**
   * Clears the set of column names allowed in comparisons.  (Initially,
   * all column names are allowed.)
   */
  public void clearAllowedColumnNames();

  /**
   * Adds a column name to the set of column names allowed.
   *
   * @param columnName name of column to be allowed
   */
  public void allowColumnName(String columnName);

  /**
   * Analyzes a predicate.
   *
   * @param predicate predicate to be analyzed
   *
   * @param searchConditions receives conditions produced by analysis
   *
   * @return residual predicate which could not be translated to
   * searchConditions
   */
  public ExprNodeDesc analyzePredicate(
    ExprNodeDesc predicate,
    final List<IndexSearchCondition> searchConditions);

  /**
   * Translates search conditions back to ExprNodeDesc form (as
   * a left-deep conjunction).
   *
   * @param searchConditions (typically produced by analyzePredicate)
   *
   * @return ExprNodeDesc form of search conditions
   */
  public ExprNodeDesc translateSearchConditions(
    List<IndexSearchCondition> searchConditions);
}

public class IndexSearchCondition
{
  /**
   * Constructs a search condition, which takes the form
   * <pre>column-ref comparison-op constant-value</pre>.
   *
   * @param columnDesc column being compared
   *
   * @param comparisonOp comparison operator, e.g. "="
   * (taken from GenericUDFBridge.getUdfName())
   *
   * @param constantDesc constant value to search for
   *
   * @param comparisonExpr the original comparison expression
   */
  public IndexSearchCondition(
    ExprNodeColumnDesc columnDesc,
    String comparisonOp,
    ExprNodeConstantDesc constantDesc,
    ExprNodeDesc comparisonExpr);
}
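
For illustration, here is a hedged sketch of how a storage handler
might drive this analyzer; the UDF class name and the column name "key"
are assumptions chosen for the example, not part of the proposal:

Code Block

// Sketch only: configure the analyzer so that only equality
// comparisons on the "key" column are considered indexable.
IndexPredicateAnalyzer analyzer = new IndexPredicateAnalyzer();
analyzer.addComparisonOp(
  "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual");
analyzer.clearAllowedColumnNames();
analyzer.allowColumnName("key");

List<IndexSearchCondition> searchConditions =
  new ArrayList<IndexSearchCondition>();

// residual receives whatever could not be translated into search
// conditions; Hive must still evaluate it after the scan.
ExprNodeDesc residual =
  analyzer.analyzePredicate(predicate, searchConditions);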


...

The approach for passing the filter down to the input format will
follow a pattern similar to what is already in place for pushing
column projections down.

  • org.apache.hadoop.hive.serde2.ColumnProjectionUtils encapsulates the pushdown communication
  • classes such as HiveInputFormat call ColumnProjectionUtils to set the projection pushdown property (READ_COLUMN_IDS_CONF_STR) on a jobConf before instantiating a RecordReader
  • the factory method for the RecordReader calls ColumnProjectionUtils to access this property
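
By analogy, a filter pushdown property could be set and read the same
way. The sketch below is illustrative only; the class and property name
are assumptions standing in for whatever ends up playing the role that
ColumnProjectionUtils and READ_COLUMN_IDS_CONF_STR play for column
projections:

Code Block

import org.apache.hadoop.mapred.JobConf;

// Illustrative sketch only; names are assumptions, not settled API.
public class FilterPushdownUtils {
  // Assumed property carrying the unparsed filter string.
  public static final String FILTER_TEXT_CONF_STR = "hive.io.filter.text";

  // Called by HiveInputFormat before instantiating a RecordReader,
  // mirroring how the projection pushdown property is set.
  public static void setFilterText(JobConf jobConf, String filterText) {
    jobConf.set(FILTER_TEXT_CONF_STR, filterText);
  }

  // Called from the RecordReader factory method; returns null when no
  // filter was pushed down, in which case the reader scans everything.
  public static String getFilterText(JobConf jobConf) {
    return jobConf.get(FILTER_TEXT_CONF_STR);
  }
}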

...

So, where will HiveInputFormat get the filter expression to be
passed down? Again, we can start with the pattern for column projections:

...

For filter pushdown, the equivalent is TableScanPPD in
org.apache.hadoop.hive.ql.ppd.OpProcFactory. Currently, it calls
createFilter, which collapses the pushable expressions into a single
expression called condn and then attaches it to a new FilterOperator.
We can call condn.getExprString() and store the result on
TableScanOperator.
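
For example (a hypothetical sketch; the setter on TableScanOperator
does not exist yet and its name is assumed here):

Code Block

// Hypothetical addition to TableScanPPD: once createFilter has
// collapsed the pushable predicates into the single expression condn,
// also record its unparsed string form on the table scan operator.
String filterText = condn.getExprString();
tableScanOp.setFilterExprString(filterText);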

...

Consider a filter like

Code Block

x > 3 AND upper(y) = 'XYZ'

...

In order to support this interaction, we will introduce a new (optional) interface to be implemented by storage handlers:

Code Block

public interface HiveStoragePredicateHandler {

  /**
   * Splits the given predicate into a portion which the storage
   * handler can evaluate natively and a residual which Hive must
   * still evaluate after the scan.
   */
  public DecomposedPredicate decomposePredicate(
    JobConf jobConf,
    Deserializer deserializer,
    ExprNodeDesc predicate);

  public static class DecomposedPredicate {
    /** Portion of the predicate to be evaluated by the storage handler. */
    public ExprNodeDesc pushedPredicate;

    /** Portion of the predicate to be post-evaluated by Hive. */
    public ExprNodeDesc residualPredicate;
  }
}
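
To make the contract concrete, here is a hedged sketch of one way a
storage handler might implement decomposePredicate in terms of
IndexPredicateAnalyzer; the helper newAnalyzer (which would register
the storage handler's indexable operators and columns) is hypothetical.
For the earlier example, x > 3 would become the pushedPredicate and
upper(y) = 'XYZ' the residualPredicate, assuming only comparisons on x
are indexable:

Code Block

// Illustrative sketch only: one plausible implementation of
// decomposePredicate using IndexPredicateAnalyzer.  newAnalyzer is a
// hypothetical helper which registers the indexable comparison
// operators and column names for this particular storage.
public DecomposedPredicate decomposePredicate(
  JobConf jobConf,
  Deserializer deserializer,
  ExprNodeDesc predicate)
{
  IndexPredicateAnalyzer analyzer = newAnalyzer(jobConf, deserializer);
  List<IndexSearchCondition> searchConditions =
    new ArrayList<IndexSearchCondition>();

  // Whatever cannot be translated comes back as the residual; e.g. for
  // x > 3 AND upper(y) = 'XYZ', upper(y) = 'XYZ' would remain here.
  ExprNodeDesc residual =
    analyzer.analyzePredicate(predicate, searchConditions);

  if (searchConditions.isEmpty()) {
    return null;  // nothing could be pushed down
  }

  DecomposedPredicate decomposed = new DecomposedPredicate();
  decomposed.pushedPredicate =
    analyzer.translateSearchConditions(searchConditions);
  decomposed.residualPredicate = residual;
  return decomposed;
}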

...