Proposers
- TBD
Approvers
- TBD
Status
Current state:
Current State | |
---|---|
UNDER DISCUSSION | |
IN PROGRESS | |
ABANDONED | |
COMPLETED | |
INACTIVE | |
Discussion thread: here
JIRA: here
Released: <Hudi Version>
Abstract
Query engines typically scan large amounts of irrelevant data during query planning and execution. Some workarounds are available to reduce the amount of irrelevant data scanned. These include:
- Partition pruning
- File pruning (see the sketch after this list)
  - Some data file formats contain metadata, including range information for certain columns (for Parquet, this metadata is stored in the footer).
  - As part of query planning, range information is read from all data files.
  - Irrelevant data files are then pruned based on predicates and the available range information.
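To make the file pruning mechanics concrete, here is a minimal sketch of pruning on per-file column ranges; the FileRange type and pruneFiles function are hypothetical, for illustration only:

```scala
// Hypothetical per-file range metadata, e.g. as extracted from Parquet footers.
case class FileRange(filePath: String, min: Long, max: Long)

// Keep only the files whose [min, max] range can contain rows matching an
// equality predicate "column = value"; every other file is skipped entirely.
def pruneFiles(files: Seq[FileRange], value: Long): Seq[FileRange] =
  files.filter(f => f.min <= value && value <= f.max)

// Example: for ranges [20, 30] and [40, 60], the predicate city_id = 25
// prunes the second file; only the first needs to be scanned.
```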
Partition pruning typically puts the burden on users to select the partitions where the data may exist. The file pruning approach is expensive and does not scale when there are a large number of partitions and data files to scan. So we propose a new solution: store additional information as part of the Hudi metadata table to implement a data skipping index. The goals of the data skipping index are to provide:
- Global index: Users query for the information they need without having to specify partitions. The index can effectively find data files in the table.
- Improved query planning: Efficiently find the data files that contain information matching the specified query predicates.
- Support for multiple types of indexes: The initial implementation may provide a range index, but the goal is to provide a flexible framework for implementing other types of indexes (e.g., bloom filters).
Background
<Introduce any background context which is relevant or necessary to understand the feature and design choices.>
TBD
- Note: the index is only effective if data is organized (using clustering, for example). If every file contains data for commonly specified query predicates, the index may not be effective.
Implementation
At a high level, there are three components to implementing index support.
Index metadata generation
We want to support multiple types of indexes (range, bloom, etc.), so it is important to be able to generate different types of records for different columns.
```
class IndexCreator<O> { // Note that 'O' refers to the collection type. For example, for Spark, this could be JavaRDD.
  /** Generates metadata from the data files written */
  def collectMetadata(filesWritten: O<WriteStatus>): O<HoodieMetadataRecord> /* returns metadata records */
}

// range record for an int column
class IntRangeMetadataRecord extends HoodieMetadataRecord {
  columnName1, partition, fileId, commitTime, min: Int, max: Int
}

// range record for a string column
class StringRangeMetadataRecord extends HoodieMetadataRecord {
  columnName2, partition, fileId, commitTime, min: String, max: String
}

// other type of index for column3
class BloomFilterRecord extends HoodieMetadataRecord {
  columnName3, partition, fileId, commitTime, bloomFilter
}

class CombinedMetadataRecord(List<HoodieMetadataRecord> allColumnsMetadata) extends HoodieMetadataRecord
```
Example CombinedMetadataRecords generated:
partition | filePath | c1 | c1_min | c1_max | c2 | c2_min | c2_max |
---|---|---|---|---|---|---|---|
p1 | f1-c1.parquet | city_id | 20 | 30 | commit_time | “a” | “g” |
p1 | f2-c1.parquet | city_id | 25 | 100 | commit_time | “b” | “g” |
p2 | f3-c1.parquet | city_id | 40 | 60 | commit_time | “i” | “w” |
p3 | f4-c1.parquet | city_id | 300 | 400 | commit_time | “x” | “z” |
A few notes (TBD: organize this better):
- We store the full file path (not the fileId), so if we create a new data file (say f1-c10.parquet), we add a new metadata record.
- The schema can get complex if there are a lot of columns.
- The schema is going to differ across tables, based on the columns in each table.
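To sketch how collectMetadata could produce rows like those above for a range index, a hedged illustration follows; WrittenFile and RangeRecord are simplified stand-ins for WriteStatus and HoodieMetadataRecord, not actual Hudi classes:

```scala
// Simplified stand-ins for illustration only; the real WriteStatus and
// HoodieMetadataRecord classes carry more state.
case class WrittenFile(partition: String, filePath: String, commitTime: String,
                       columnRanges: Map[String, (String, String)]) // column -> (min, max)

case class RangeRecord(column: String, partition: String, filePath: String,
                       commitTime: String, min: String, max: String)

// Emit one range record per (file, column) pair; records for the same file
// would then be combined into a single CombinedMetadataRecord row.
def collectRangeMetadata(filesWritten: Seq[WrittenFile]): Seq[RangeRecord] =
  for {
    file <- filesWritten
    (column, (min, max)) <- file.columnRanges.toSeq
  } yield RangeRecord(column, file.partition, file.filePath, file.commitTime, min, max)
```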
Index storage layout
We considered HFile and Parquet for storing the metadata records generated above. See the comparison below.
HFile
HFile contains a multi-layered index. Keys are required to be inserted in increasing order. Data is stored in blocks (typically 64KB).
Layers of the index:
- Each block has its own leaf index (min/max key of the block, etc.)
- The last key of each block is put into an intermediate index
- The root index, in the trailer, points to the intermediate index
Because keys are expected to be inserted in increasing order, enough data is typically accumulated in memory; once a certain size is reached, the data is sorted and inserted into the HFile in order.
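A minimal sketch of that buffer-sort-flush pattern, where appendToHFile stands in for the actual HFile writer API (not shown here):

```scala
import scala.collection.mutable.ArrayBuffer

// Buffer records in memory; once the buffer reaches a threshold, sort by key
// and append in order, since HFile requires keys in increasing order.
// (In practice, successive batches must also be non-overlapping, or be merged
// before writing, to keep the whole file globally sorted.)
class BufferedSortedWriter(threshold: Int, appendToHFile: (String, Array[Byte]) => Unit) {
  private val buffer = ArrayBuffer.empty[(String, Array[Byte])]

  def write(key: String, value: Array[Byte]): Unit = {
    buffer += ((key, value))
    if (buffer.size >= threshold) flush()
  }

  def flush(): Unit = {
    buffer.sortBy(_._1).foreach { case (k, v) => appendToHFile(k, v) }
    buffer.clear()
  }
}
```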
HFile Advantages:
- Fits well with existing Hudi metadata table
- Fast constant time key lookup (2ms)
- Keeps data sorted by key
HFile Disadvantages:
- Doesn’t have predicate pushdown/filtering logic
- Expensive to look up a lot of keys sequentially from the same file (5000 keys * 2ms = 10 seconds)
- Range scans also seem slower (~15 seconds)
- Uses a lot of storage, because there is no columnar compression
Parquet
Parquet stores data in a columnar format.
Parquet Advantages:
- Efficient predicate pushdown. Metadata for all columns can be stored together while querying only the desired columns efficiently
- Parallel processing of splits natively supported
- Can provide UDF support if needed. This may be useful for commonly used geo queries (users store latitude/longitude in the table, but we can query data in hexagons/quad-trees efficiently using the data skipping index)
- Better storage compression
- We can try different layouts by sorting the data on different parameters (partition/fileId/column being indexed, etc.)
Parquet Disadvantages:
- Doesn’t work well with the Hudi metadata table (because the metadata table's base format is HFile)
- No fast single-key lookup, so it may not be ideal for other types of indexes, like UUID lookup
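For example, if range metadata were stored as Parquet with the CombinedMetadataRecord layout shown earlier, file pruning could be expressed as an ordinary Spark query with predicate pushdown; the metadata path below is hypothetical:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("range-index-lookup").getOrCreate()

// Find data files that may contain rows with city_id = 25: Parquet pushes the
// min/max filters down, so only the matching metadata rows are materialized.
val candidateFiles = spark.read
  .parquet("/path/to/metadata/range_index")  // hypothetical location
  .where("c1 = 'city_id' AND c1_min <= 25 AND c1_max >= 25")
  .select("filePath")
```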
HFile vs Parquet comparison for range index
Input:
One partition of a popular production table:
- Total # of files: 1288 (metadata stored only for the latest snapshot)
- File size: 150-250MB.
- Total data size: ~250GB
- # of columns: 4156
- # of columns with metadata: 2078
Results:
 | Time to scan full file | Time to query 10 rows | Time to query large range (5K rows) | Storage space |
---|---|---|---|---|
HFile | 15 seconds | 51 ms | 17 seconds | 100MB |
Parquet | 6.1 seconds | 1.9 seconds | 2.1 seconds | 43MB |
Parquet (Spark SQL) | 7 seconds | 440 ms | 1.5 seconds | 43MB |
Index integrations with query engines
Spark
Presto
Hive
Rollout/Adoption Plan
- <What impact (if any) will there be on existing users?>
- <If we are changing behavior how will we phase out the older behavior?>
- <If we need special migration tools, describe them here.>
- <When will we remove the existing behavior?>
Test Plan
<Describe in few sentences how the RFC will be tested. How will we know that the implementation works as expected? How will we know nothing broke?>