...

Index integrations with query engines

Spark

Presto

...

How to apply query predicates in Hudi?

Query predicates are normally constructed as a tree, so Hudi's representation will follow the same pattern. The proposal is to create a mapping utility that translates each engine's query predicates into a HudiExpression, so that the filtering logic is engine agnostic.

AND and OR operators translate to a tree node with left and right child expressions. An example of what the structure would look like is shown below.


Code Block
languagejava
public class HudiExpressionParentNode implements HudiExpression {
   HudiExpression left;
   HudiExpression right;

   @Override
   public boolean evaluate() {
        // AND node: both children must be satisfiable (an OR node would use || instead)
        return left.evaluate() && right.evaluate();
   }
}


For LEAF nodes we can create an expression that contains the operator and the value being compared, and use it to determine whether the file group may have data relevant to the query. The common search expressions for leaf nodes are:

  1. Equal to - true if the value in the search expression is greater than or equal to the lower bound and less than or equal to the upper bound in the file’s column statistics, else false
  2. Less than - true if the value in the search expression is greater than the lower bound in the file’s column statistics, else false
  3. Less than or equal to - true if the value in the search expression is greater than or equal to the lower bound in the file’s column statistics, else false
  4. Greater than - true if the value in the search expression is less than the upper bound in the file’s column statistics, else false
  5. Greater than or equal to - true if the value in the search expression is less than or equal to the upper bound in the file’s column statistics, else false

True means the file may contain data matching the search expression, so it is included in the result set. False means the file cannot contain any data matching the search expression, so it is excluded from the results.
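
The five rules above can be sketched as a single range check. This is only an illustration: the class name ColumnRangeCheck, the method mayContain, and the way the lower and upper bounds are passed in are assumptions made for the example, not part of the proposal.

```java
public class ColumnRangeCheck {
    /** Comparison operators from the list above. */
    enum Operator { EQ, LT, LTEQ, GT, GTEQ }

    /**
     * Returns true if a file whose column values lie in [lower, upper]
     * may contain a row satisfying "column op literal".
     * Hypothetical helper; bounds would come from the file's column statistics.
     */
    static <T extends Comparable<T>> boolean mayContain(Operator op, T literal, T lower, T upper) {
        switch (op) {
            case EQ:   return literal.compareTo(lower) >= 0 && literal.compareTo(upper) <= 0;
            case LT:   return literal.compareTo(lower) > 0;
            case LTEQ: return literal.compareTo(lower) >= 0;
            case GT:   return literal.compareTo(upper) < 0;
            case GTEQ: return literal.compareTo(upper) <= 0;
            default:   throw new IllegalArgumentException("Unsupported operator: " + op);
        }
    }

    public static void main(String[] args) {
        // Statistics for a made-up column: lower bound 10, upper bound 20.
        System.out.println(mayContain(Operator.EQ, 15, 10, 20));   // true
        System.out.println(mayContain(Operator.LT, 10, 10, 20));   // false: no value is below 10
        System.out.println(mayContain(Operator.GTEQ, 20, 10, 20)); // true: the upper bound itself matches
    }
}
```

A true result only says the file cannot be ruled out; the rows still have to be filtered when the file is read.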

Code Block
languagejava
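public class HudiExpressionLeafNode<T extends Comparable<T>> implements HudiExpression {

   Operator op;    // one of EQ, LT, LTEQ, GT, GTEQ
   T literal;      // value being compared (INT, DOUBLE, FLOAT value)
   String column;  // column whose file-level statistics are consulted

   @Override
   public boolean evaluate() {
      // Compare literal against the column's lower/upper bound using the rule for op.
      throw new UnsupportedOperationException("not yet implemented");
   }
}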


This way we can call evaluate on the root of the HudiExpression tree to determine whether the entire expression can be satisfied for the file group.
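
As a rough sketch of how root evaluation could drive pruning, the example below builds one expression tree per file group and keeps only the groups whose root evaluates to true. The names FileGroupFilterDemo, AndNode, OrNode, and prune are hypothetical, and the leaf results are hard-coded lambdas standing in for real column-statistics checks.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FileGroupFilterDemo {
    /** Same shape as the HudiExpression interface used above. */
    interface HudiExpression { boolean evaluate(); }

    static final class AndNode implements HudiExpression {
        final HudiExpression left, right;
        AndNode(HudiExpression left, HudiExpression right) { this.left = left; this.right = right; }
        @Override public boolean evaluate() { return left.evaluate() && right.evaluate(); }
    }

    static final class OrNode implements HudiExpression {
        final HudiExpression left, right;
        OrNode(HudiExpression left, HudiExpression right) { this.left = left; this.right = right; }
        @Override public boolean evaluate() { return left.evaluate() || right.evaluate(); }
    }

    /** Keeps only the file groups whose root expression evaluates to true. */
    static List<String> prune(Map<String, HudiExpression> rootByFileGroup) {
        List<String> kept = new ArrayList<>();
        for (Map.Entry<String, HudiExpression> e : rootByFileGroup.entrySet()) {
            if (e.getValue().evaluate()) {
                kept.add(e.getKey());
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Leaf results are hard-coded; real leaves would consult column statistics.
        Map<String, HudiExpression> roots = new LinkedHashMap<>();
        roots.put("fg-1", new AndNode(() -> true, new OrNode(() -> false, () -> true)));
        roots.put("fg-2", new AndNode(() -> false, new OrNode(() -> true, () -> true)));
        System.out.println(prune(roots)); // prints [fg-1]
    }
}
```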

Hive

To implement predicate push down in Hive we need access to the query predicate, which is not passed to the Hive InputFormat by default. The HiveStoragePredicateHandler interface must be implemented to provide the query predicate to the InputFormat, and this requires a custom HiveStorageHandler. We will therefore create a new storage handler, HudiStorageHandler.

Code Block
languagejava
public interface HiveStorageHandler extends Configurable {
  public Class<? extends InputFormat> getInputFormatClass();
  public Class<? extends OutputFormat> getOutputFormatClass();
  public Class<? extends SerDe> getSerDeClass();
  public HiveMetaHook getMetaHook();
  public void configureTableJobProperties(
    TableDesc tableDesc,
    Map<String, String> jobProperties);
}

The input format, output format, and serde classes stay the same as those used by existing Hudi tables registered in Hive (HoodieParquetInputFormat is still used). HudiStorageHandler would implement both HiveStorageHandler and HiveStoragePredicateHandler.


Hive adds the query predicate returned by the Storage Handler to the job configuration. This job configuration is then supplied to the Input Format. It can be fetched and deserialized using the following:

    String hiveFilter = jobConf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
    if (hiveFilter != null) {
      // Deserialize the pushed-down predicate and convert it to a SearchArgument
      ExprNodeGenericFuncDesc exprNodeDesc = SerializationUtilities
          .deserializeObject(hiveFilter, ExprNodeGenericFuncDesc.class);
      SearchArgument sarg = ConvertAstToSearchArg.create(jobConf, exprNodeDesc);
    }

The SearchArgument contains an ExpressionTree and a list of PredicateLeaf objects. The ExpressionTree is a tree structure that defines the query predicate. If the operator is OR, AND, or NOT, the node has child expressions, normally LEAFs.

    public class ExpressionTree {
      public enum Operator {OR, AND, NOT, LEAF, CONSTANT}
      private final Operator operator;
      private final List<ExpressionTree> children;
      private int leaf;
      // remaining members omitted
    }

If the operator in an ExpressionTree is LEAF, the leaf field is populated with an index into the list of PredicateLeaf objects defined in the SearchArgument. Each PredicateLeaf contains information about the query predicate, such as the operator, the column name, and the literal being compared:

    private final org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf.Operator operator;
    private final Type type;
    private String columnName;
    private final Object literal;
    private final List<Object> literalList;

Operators supported by Hive in PredicateLeaf:

    public static enum Operator {
      EQUALS,
      NULL_SAFE_EQUALS,
      LESS_THAN,
      LESS_THAN_EQUALS,
      IN,
      BETWEEN,
      IS_NULL
    }

Example query

select event_name from storage_handler_table where event_id = '1'

will produce the following leaf:

Filter passed to Input Format: leaf-0 = (EQUALS event_id 1), expr = leaf-0


We can use this information and the SearchArgument to generate our HudiExpression. Then, in HoodieParquetInputFormat.listStatus(), after fetching files from the FileSystemView, we can apply the data filter using column metadata to the remaining file groups.
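
One possible shape for that conversion is sketched below. To stay self-contained it does not use the Hive classes: Tree and Leaf are simplified stand-ins for ExpressionTree and PredicateLeaf, literals are restricted to long, and the per-file statistics are passed as a map from column name to {lower, upper}. All of these names are assumptions for illustration. Because the operator list above has no greater-than, the sketch assumes such comparisons arrive as NOT over LESS_THAN or LESS_THAN_EQUALS, and it conservatively keeps the file for any other NOT.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class SargToHudiExpression {
    // Simplified stand-ins for Hive's ExpressionTree and PredicateLeaf (not the real classes).
    enum TreeOp { OR, AND, NOT, LEAF }
    enum LeafOp { EQUALS, LESS_THAN, LESS_THAN_EQUALS }

    static final class Tree {
        final TreeOp op; final int leaf; final List<Tree> children;
        Tree(TreeOp op, int leaf, Tree... children) {
            this.op = op; this.leaf = leaf; this.children = Arrays.asList(children);
        }
    }

    static final class Leaf {
        final LeafOp op; final String column; final long literal;
        Leaf(LeafOp op, String column, long literal) {
            this.op = op; this.column = column; this.literal = literal;
        }
    }

    /** Engine-agnostic expression, as proposed earlier in this document. */
    interface HudiExpression { boolean evaluate(); }

    /** stats maps a column name to {lowerBound, upperBound} for one file. */
    static HudiExpression convert(Tree t, List<Leaf> leaves, Map<String, long[]> stats) {
        switch (t.op) {
            case AND: {
                List<HudiExpression> kids = t.children.stream()
                    .map(c -> convert(c, leaves, stats)).collect(Collectors.toList());
                return () -> kids.stream().allMatch(HudiExpression::evaluate);
            }
            case OR: {
                List<HudiExpression> kids = t.children.stream()
                    .map(c -> convert(c, leaves, stats)).collect(Collectors.toList());
                return () -> kids.stream().anyMatch(HudiExpression::evaluate);
            }
            case NOT: {
                Tree child = t.children.get(0);
                if (child.op == TreeOp.LEAF) {
                    Leaf leaf = leaves.get(child.leaf);
                    long[] range = stats.get(leaf.column);
                    // NOT(col < v) is col >= v; NOT(col <= v) is col > v.
                    if (range != null && leaf.op == LeafOp.LESS_THAN) return () -> range[1] >= leaf.literal;
                    if (range != null && leaf.op == LeafOp.LESS_THAN_EQUALS) return () -> range[1] > leaf.literal;
                }
                return () -> true; // negating "may contain" is not safe, so keep the file
            }
            case LEAF: {
                Leaf leaf = leaves.get(t.leaf);
                long[] range = stats.get(leaf.column);
                if (range == null) return () -> true; // no statistics: keep the file
                switch (leaf.op) {
                    case EQUALS:           return () -> range[0] <= leaf.literal && leaf.literal <= range[1];
                    case LESS_THAN:        return () -> leaf.literal > range[0];
                    case LESS_THAN_EQUALS: return () -> leaf.literal >= range[0];
                    default:               return () -> true;
                }
            }
            default:
                return () -> true;
        }
    }

    public static void main(String[] args) {
        // leaf-0 = (EQUALS event_id 1), expr = leaf-0
        List<Leaf> leaves = Arrays.asList(new Leaf(LeafOp.EQUALS, "event_id", 1));
        Tree expr = new Tree(TreeOp.LEAF, 0);

        Map<String, long[]> fileA = new HashMap<>();
        fileA.put("event_id", new long[] {0, 3});
        Map<String, long[]> fileB = new HashMap<>();
        fileB.put("event_id", new long[] {5, 9});

        System.out.println(convert(expr, leaves, fileA).evaluate()); // true: 1 lies in [0, 3]
        System.out.println(convert(expr, leaves, fileB).evaluate()); // false: 1 lies outside [5, 9]
    }
}
```

Binding the statistics at conversion time keeps evaluate() free of arguments, matching the interface used earlier; passing the statistics to evaluate instead would let one converted tree be reused across files.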

Spark


Presto

Rollout/Adoption Plan

  • <What impact (if any) will there be on existing users?>
  • <If we are changing behavior how will we phase out the older behavior?>
  • <If we need special migration tools, describe them here.>
  • <When will we remove the existing behavior?>

...