Proposers
Approvers
- TBD (This is a WIP RFC; Approvers to be added after it is ready for review)
Status
Current state:
...
JIRA: here
Released: <Hudi Version>
Abstract
Query engines typically scan large amounts of irrelevant data during query planning and execution. Some workarounds are available to reduce the amount of irrelevant data scanned. These include:
- Partition pruning
- File pruning
- Some data file formats contain metadata, including range information for certain columns (for parquet, this metadata is stored in the footer).
- As part of query planning, all range information from data files is read.
- Irrelevant data files are then pruned based on predicates and the available range information.
Partition pruning typically puts the burden on users to select partitions where the data may exist. The file pruning approach is expensive and does not scale when there are a large number of partitions and data files to scan. So we propose a new solution: store additional information as part of the Hudi metadata table to implement a data skipping index. The goals of the data skipping index are to provide:
- Global index: Users query for the information they need without having to specify partitions. The index can efficiently locate the relevant data files in the table.
- Improved query planning: Efficiently find data files that contain data for the specified query predicates.
- Support for multiple index types: The initial implementation may provide a range index, but the goal is to provide a flexible framework for implementing other index types (e.g. bloom filters).
Background
<Introduce any background context which is relevant or necessary to understand the feature and design choices.>
...
- Note: the index is only effective if data is organized (using clustering, for example). If every file contains data for commonly specified query predicates, the index may not be effective.
Implementation
At a high level, there are three components to implementing index support: metadata generation, storage format, and query engine integration. The next three subsections discuss these in detail.
Index metadata generation
We want to support multiple types of index (range, bloom, etc.), so it is important to be able to generate different types of records for different columns (a usage sketch follows at the end of this subsection).
Code Block
class IndexCreator<O> { // Note that 'O' refers to the collection type. For example, for Spark this could be JavaRDD
  /** Generates metadata from data files written */
  def collectMetadata(O<WriteStatus> filesWritten): O<HoodieMetadataRecord> /* returns metadata records */
}

// for int column
class IntRangeMetadataRecord extends HoodieMetadataRecord {
  columnName1, partition, fileId, commitTime, min: Int, max: Int
}

// for string column
class StringRangeMetadataRecord extends HoodieMetadataRecord {
  columnName2, partition, fileId, commitTime, min: String, max: String
}

// other type of index for column3
class BloomFilterRecord extends HoodieMetadataRecord {
  columnName3, partition, fileId, commitTime, bloomFilter
}

class CombinedMetadataRecord(List<HoodieMetadataRecord> allColumnsMetadata) extends HoodieMetadataRecord
Example CombinedMetadataRecords generated:
partition | filePath | c1 | c1_min | c1_max | c2 | c2_min | c2_max |
p1 | f1-c1.parquet | city_id | 20 | 30 | commit_time | “a” | “g” |
p1 | f2-c1.parquet | city_id | 25 | 100 | commit_time | “b” | “g” |
p2 | f3-c1.parquet | city_id | 40 | 60 | commit_time | “i” | “w” |
p3 | f4-c1.parquet | city_id | 300 | 400 | commit_time | “x” | “z” |
...
- We store the full file path (not the fileId). So if we create a new data file (say f1-c2.parquet, a newer version of file group f1), then we add a new metadata record (row) in the metadata table for this file.
- The schema can get complex if there are a lot of columns.
- The schema is going to be different for different tables, based on the columns in each table.
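For illustration, here is a hedged sketch of how an engine-specific IndexCreator might be wired up on Spark. It only shows the shape of the API: IntRangeRecord is a simplified stand-in for IntRangeMetadataRecord, the class and method names are hypothetical, and reading the actual min/max values from the written file's footer is elided (the appendix shows how footers can be read).
Code Block
import java.io.Serializable;
import org.apache.hudi.client.WriteStatus;
import org.apache.spark.api.java.JavaRDD;

// Hypothetical Spark-based range index creator, following the IndexCreator shape above.
public class SparkRangeIndexCreator {

  // Simplified stand-in for IntRangeMetadataRecord from the pseudocode above.
  public static class IntRangeRecord implements Serializable {
    public String columnName;
    public String partitionPath;
    public String fileId;
    public String commitTime;
    public int min;
    public int max;
  }

  // Collects one range record per written file for a single int column.
  public JavaRDD<IntRangeRecord> collectMetadata(JavaRDD<WriteStatus> filesWritten,
                                                 String columnName, String commitTime) {
    return filesWritten.map(status -> {
      IntRangeRecord record = new IntRangeRecord();
      record.columnName = columnName;
      record.partitionPath = status.getPartitionPath();
      record.fileId = status.getFileId();
      record.commitTime = commitTime;
      // record.min / record.max would be populated from the written file's column
      // statistics (e.g. by reading the parquet footer), which is elided here.
      return record;
    });
  }
}
The resulting records would then be written to the metadata table, similar to the CombinedMetadataRecord rows in the example above.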
Index storage layout
We considered HFile and Parquet for storing the metadata records generated above. See the comparison below.
HFile
HFile contains a multi-layered index. At a high level, this looks like a persisted B+ tree. Keys are required to be inserted in increasing order. Data is stored in blocks (typically 64KB).
...
- Doesn’t have predicate pushdown/filtering logic
- Expensive to look up a lot of keys from the same file sequentially (5000 keys * 2 ms = 10 sec)
- Range scan also seems slower (~15 seconds)
- A lot of storage is used because there is no columnar compression
Parquet
Parquet stores data in columnar format
...
- Efficient predicate pushdown. Metadata for all columns can be stored together while only the desired columns are read efficiently
- Parallel processing of splits natively supported
- Can provide UDF support if needed. This may be useful for common geo queries. Example: a table has latitude/longitude columns; with UDFs we can query data by hexagon/quad-tree cells efficiently using the data skipping index, whereas plain ranges on latitude and longitude may not be as effective.
- Better storage compression
- We can try different layouts by sorting data on different parameters (partition/fileId/columnBeingIndexed etc)
...
- Doesn’t work well with the Hudi metadata table (the metadata table's base file format is HFile, and a Hudi table cannot support different file formats for different partitions)
- No fast single-key lookup. So it may not be ideal for other types of index, like UUID lookup?
HFile vs Parquet comparison for range index
Input:
One partition of a popular production table.
...
Format | Time to scan full file | Time to query 10 rows | Time to query large range (5K rows) | Storage space |
HFile | 15 seconds | 51 ms | 17 seconds | 100MB |
Parquet | 6.1 seconds | 1.9 seconds | 2.1 seconds | 43MB |
Parquet (Spark SQL) | 7 seconds | 440 ms | 1.5 seconds | 43MB |
Index integrations with query engines
How to apply query predicates in Hudi?
Query predicates are normally constructed in a tree-like structure, so this will follow the same pattern. The proposal is to create a mapping utility from “engine” query predicates to a HudiExpression. This way the filtering logic is engine-agnostic.
For AND and OR operators we can translate to a tree node with left and right expressions. An example of what the structure would look like is shown below.
Code Block
public class HudiExpressionParentNode implements HudiExpression {
  HudiExpression left;
  HudiExpression right;

  @Override
  public boolean evaluate() {
    return left.evaluate() && right.evaluate();
  }
}
For LEAF nodes we can create an expression which contains the operator and the value we are comparing against, to determine whether the file group may have data relevant to this query. The common search expressions for the leaf nodes are:
- Equal to - if value in search expression greater than or equal to lower bound and is less than or equal to upper bound in file’s column statistics then true, else false
- Less than - if value in search expression is greater than lower bound in file’s column statistics then true, else false
- Less than or equal to - if value in search expression is greater than or equal to lower bound in file’s column statistics then true, else false
- Greater than - if value in search expression is lower than upper bound in file’s column statistics then true, else false
- Greater than or equal to - if value in search expression is lower than or equal to upper bound in file’s column statistics then true, else false
True tells us that there is a possibility that the file contains data matching the search expression, so the file is included in the result set. False tells us that there is no possibility the file contains any matching data, so it is excluded from the results.
Code Block
public class HudiExpressionLeafNode<T> implements HudiExpression {
  Operator op;   // (EQ, LT, LTEQ, GT, GTEQ)
  T literal;     // (INT, DOUBLE, FLOAT value)
  String column;

  @Override
  boolean evaluate();  // compares 'literal' against the file's column statistics for 'column'
}
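For illustration, here is a minimal, self-contained sketch of the leaf evaluation rules listed above. RangeLeafExpression and its fileMin/fileMax fields are hypothetical stand-ins for HudiExpressionLeafNode wired to the file's column statistics; they are not part of the proposed API.
Code Block
// Illustrative sketch only: a leaf expression that already holds the min/max
// statistics of the referenced column for the file group being evaluated.
interface HudiExpression {
  boolean evaluate();
}

class RangeLeafExpression<T extends Comparable<T>> implements HudiExpression {
  enum Operator { EQ, LT, LTEQ, GT, GTEQ }

  private final Operator op;
  private final T literal;  // value from the query predicate
  private final T fileMin;  // lower bound from the file's column statistics
  private final T fileMax;  // upper bound from the file's column statistics

  RangeLeafExpression(Operator op, T literal, T fileMin, T fileMax) {
    this.op = op;
    this.literal = literal;
    this.fileMin = fileMin;
    this.fileMax = fileMax;
  }

  @Override
  public boolean evaluate() {
    switch (op) {
      case EQ:   return literal.compareTo(fileMin) >= 0 && literal.compareTo(fileMax) <= 0;
      case LT:   return literal.compareTo(fileMin) > 0;   // some value below 'literal' may exist
      case LTEQ: return literal.compareTo(fileMin) >= 0;
      case GT:   return literal.compareTo(fileMax) < 0;   // some value above 'literal' may exist
      case GTEQ: return literal.compareTo(fileMax) <= 0;
      default:   return true; // unknown operator: keep the file group to be safe
    }
  }
}
For example, with the range index rows above, the predicate city_id = 35 evaluates to false for f1-c1.parquet (min 20, max 30), so that file can be skipped, while f2-c1.parquet (min 25, max 100) is kept.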
This way we can call evaluate on the root HudiExpression tree and it will determine whether the entire expression is satisfied for the file group.
Hive
In order to implement predicate push-down in Hive we need to have access to the query predicate. The query predicate is not passed to the InputFormat by default. The HiveStoragePredicateHandler interface needs to be implemented in order to provide the query predicate to the InputFormat, and for this we need to create a custom HiveStorageHandler. Therefore we will be creating a new storage handler, HudiStorageHandler.
Code Block
public interface HiveStorageHandler extends Configurable {
public Class<? extends InputFormat> getInputFormatClass();
public Class<? extends OutputFormat> getOutputFormatClass();
public Class<? extends SerDe> getSerDeClass();
public HiveMetaHook getMetaHook();
public void configureTableJobProperties(
TableDesc tableDesc,
Map<String, String> jobProperties);
}
Everything will remain the same with the input format, output format, and SerDe classes being used by existing Hudi tables registered in Hive (HoodieParquetInputFormat still being used). HudiStorageHandler would implement HiveStorageHandler and HiveStoragePredicateHandler.
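For illustration, a minimal sketch of how HudiStorageHandler might implement HiveStoragePredicateHandler.decomposePredicate. Extending DefaultStorageHandler and keeping the full predicate as the residual are assumptions of this sketch (file-level range pruning can only prove absence, so Hive must still evaluate the predicate on returned rows); returning Hudi's input format and SerDe classes is elided here.
Code Block
import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.mapred.JobConf;

// Hedged sketch of the proposed HudiStorageHandler predicate hookup.
public class HudiStorageHandler extends DefaultStorageHandler
    implements HiveStoragePredicateHandler {

  @Override
  public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer,
                                                ExprNodeDesc predicate) {
    if (!(predicate instanceof ExprNodeGenericFuncDesc)) {
      return null; // nothing we can push down
    }
    DecomposedPredicate decomposed = new DecomposedPredicate();
    // Push the whole predicate so the InputFormat can use it for data skipping...
    decomposed.pushedPredicate = (ExprNodeGenericFuncDesc) predicate;
    // ...but also keep it as the residual so Hive still filters the returned rows.
    decomposed.residualPredicate = (ExprNodeGenericFuncDesc) predicate;
    return decomposed;
  }
}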
Hive adds the query predicate returned by the Storage Handler to the job configuration. This job configuration is then supplied to the Input Format. It can be fetched and deserialized using the following:
String hiveFilter = jobConf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
if (hiveFilter != null) {
  ExprNodeGenericFuncDesc exprNodeDesc = SerializationUtilities
      .deserializeObject(hiveFilter, ExprNodeGenericFuncDesc.class);
  SearchArgument sarg = ConvertAstToSearchArg.create(jobConf, exprNodeDesc);
}
The SearchArgument contains an ExpressionTree and a list of PredicateLeaf. The ExpressionTree is a tree structure used to define the query predicate. If the operator is defined as OR, AND, or NOT, this indicates there are child expressions, normally LEAFs.
public class ExpressionTree {
public enum Operator {OR, AND, NOT, LEAF, CONSTANT}
private final Operator operator;
private final List<ExpressionTree> children;
private int leaf;
If the operator in the ExpressionTree is defined as LEAF, it corresponds to a PredicateLeaf defined in the SearchArgument. The PredicateLeaf contains information about the query predicate such as the operator, column name, and literal being compared:
private final org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf.Operator operator;
private final Type type;
private String columnName;
private final Object literal;
private final List<Object> literalList;
We can use this information and the SearchArgument to generate our HudiExpression. Then in HoodieParquetInputFormat.listStatus(), after fetching files from the FileSystemView, we can apply the HudiExpression to the remaining file groups using the column metadata. A sketch of this mapping is shown below.
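For illustration, a hedged sketch of how the SearchArgument could be mapped onto the pruning decision. Rather than constructing HudiExpression nodes, this sketch walks the ExpressionTree directly against hypothetical per-column min/max ranges loaded from the index; SearchArgumentEvaluator and ColumnRange are illustrative names, and operators not listed (NOT, IN, BETWEEN, etc.) are conservatively treated as "cannot prune".
Code Block
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;

// Hypothetical sketch: decide whether a file group may contain matching data by
// walking Hive's ExpressionTree against per-column min/max stats from the index.
public class SearchArgumentEvaluator {

  // Hypothetical holder for one column's range, loaded from the metadata table.
  public static class ColumnRange {
    final Comparable min;
    final Comparable max;
    public ColumnRange(Comparable min, Comparable max) { this.min = min; this.max = max; }
  }

  public static boolean mayContainMatches(SearchArgument sarg, Map<String, ColumnRange> stats) {
    return evaluate(sarg.getExpression(), sarg.getLeaves(), stats);
  }

  @SuppressWarnings("unchecked")
  private static boolean evaluate(ExpressionTree node, List<PredicateLeaf> leaves,
                                  Map<String, ColumnRange> stats) {
    switch (node.getOperator()) {
      case AND:
        return node.getChildren().stream().allMatch(c -> evaluate(c, leaves, stats));
      case OR:
        return node.getChildren().stream().anyMatch(c -> evaluate(c, leaves, stats));
      case LEAF: {
        PredicateLeaf leaf = leaves.get(node.getLeaf());
        ColumnRange range = stats.get(leaf.getColumnName());
        if (range == null) {
          return true; // no stats for this column: cannot prune, keep the file group
        }
        Comparable literal = (Comparable) leaf.getLiteral();
        switch (leaf.getOperator()) {
          case EQUALS:
            return literal.compareTo(range.min) >= 0 && literal.compareTo(range.max) <= 0;
          case LESS_THAN:
            return literal.compareTo(range.min) > 0;
          case LESS_THAN_EQUALS:
            return literal.compareTo(range.min) >= 0;
          default:
            return true; // IN, BETWEEN, IS_NULL, ... not covered by this sketch
        }
      }
      default:
        return true; // NOT / CONSTANT not covered; be conservative and keep the file group
    }
  }
}
File groups for which mayContainMatches returns false can be dropped from the listStatus() result; everything else is returned to Hive as usual.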
Spark
Presto
Hive
Rollout/Adoption Plan
- <What impact (if any) will there be on existing users?>
- <If we are changing behavior how will we phase out the older behavior?>
- <If we need special migration tools, describe them here.>
- <When will we remove the existing behavior?>
Appendix
The code below can be run in spark-shell (or a Jupyter notebook) to quickly iterate on different formats (TODO: integrate with the Hudi metadata table writer code).
Generating Parquet metadata table
Code Block
import org.apache.hudi.common.table.HoodieTableMetaClient
import java.util.Properties

val tableName="test.metadata_test_parquet"
val props = new Properties()
props.put("hoodie.table.name", tableName)
val destPath="hdfs://tables/my_metadata_table_parquet"
HoodieTableMetaClient.initTableAndGetMetaClient(spark.sparkContext.hadoopConfiguration, destPath, props)

import org.apache.hudi.common.table.HoodieSnapshotMetadataClient
import java.util.stream.Collectors
import java.util.Objects
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData
import org.apache.parquet.column.statistics.Statistics
import scala.collection.JavaConverters._
import spark.implicits._

case class FileStats(val minVal: String, val maxVal: String)
case class ColumnFileStats(val partitionPath: String, val fileName: String, val colName: String, val minVal: String, val maxVal: String)

val basePath = "hdfs://tables/my_table1"
val metaClient = new HoodieTableMetaClient(spark.sparkContext.hadoopConfiguration, basePath)
val snapshotClient = new HoodieSnapshotMetadataClient(metaClient)
val partition="2021/02/03"
val files = snapshotClient.getLatestSnapshotFiles(partition).collect(Collectors.toList()).asScala.toSeq

val allmetadata = files.flatMap(filePath => {
  val blocks = ParquetFileReader.readFooter(spark.sparkContext.hadoopConfiguration, filePath).getBlocks().asScala
  blocks.flatMap(b => b.getColumns().asScala.
    map(col => (col.getPath().toDotString(),
      FileStats(Objects.toString(col.getStatistics().minAsString()),
        Objects.toString(col.getStatistics().maxAsString()))))).toSeq.
    groupBy(x => x._1).mapValues(v => v.map(vv => vv._2).toSeq).
    mapValues(value => FileStats(value.map(_.minVal).min, value.map(_.maxVal).max)).toSeq.
    map(x => ColumnFileStats(partition, filePath.getName(), x._1, x._2.minVal, x._2.maxVal))
})

val sqlContext = new org.apache.spark.sql.SQLContext(spark.sparkContext)
val rdd = spark.sparkContext.parallelize(allmetadata)
rdd.toDF().write.format("org.apache.hudi").
  option("hoodie.datasource.write.recordkey.field", "partitionPath,fileName,colName").
  option("hoodie.datasource.write.precombine.field", "colName").
  option("hoodie.table.name", "test.metadata_test").
  option("hoodie.datasource.write.operation", "insert").
  option("hoodie.parquet.small.file.limit", "107374182400").
  option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator").
  mode("Append").
  save(destPath)
Querying Parquet metadata table
Code Block
import System.nanoTime
// Imports assumed for this spark-shell snippet; package locations may vary by Hudi/Parquet version.
import org.apache.hadoop.fs.Path
import org.apache.avro.generic.GenericRecord
import org.apache.parquet.avro.{AvroParquetReader, AvroReadSupport}
import org.apache.parquet.filter2.compat.FilterCompat
import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.io.api.Binary
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.util.ParquetUtils

def profile[R](code: => R, t: Long = nanoTime) = (code, (((nanoTime - t)/1000)).toInt)
val parquetBasePath = new Path("hdfs://tables/my_metadata_table1/default/d95d3fbe-fe66-48fc-8bdc-7db923496916-0_0-55-98_20210327202848.parquet")
val fullSchema = ParquetUtils.readAvroSchema(spark.sparkContext.hadoopConfiguration, parquetBasePath)
val schema = HoodieAvroUtils.generateProjectionSchema(fullSchema,
java.util.Arrays.asList("partitionPath", "fileName", "colName", "minVal", "maxVal"))
val keys = new scala.collection.mutable.ListBuffer[String]()
val (count, time) = profile {
AvroReadSupport.setAvroReadSchema(spark.sparkContext.hadoopConfiguration, schema)
AvroReadSupport.setRequestedProjection(spark.sparkContext.hadoopConfiguration, schema)
val filterToUse = FilterCompat.get(
FilterApi.or(
FilterApi.or(
FilterApi.eq(FilterApi.binaryColumn("colName"), Binary.fromString("BASE.city_id")),
FilterApi.eq(FilterApi.binaryColumn("colName"), Binary.fromString("ACTIONS.actions.array.source.type"))
),
FilterApi.or(
FilterApi.eq(FilterApi.binaryColumn("colName"), Binary.fromString("ACTIONS.actions.array.contactType")),
FilterApi.eq(FilterApi.binaryColumn("colName"), Binary.fromString("ACTIONS.actions.array.destination.type"))
)
)
)
val parquetReader = AvroParquetReader.builder(parquetBasePath).
withConf(spark.sparkContext.hadoopConfiguration).
withFilter(filterToUse).
build()
var record1 : GenericRecord = parquetReader.read().asInstanceOf[GenericRecord]
var count: Long = 1
while (record1 != null) {
record1 = parquetReader.read().asInstanceOf[GenericRecord]
count+=1
}
count
}
"" + (time/1000) + " ms to READ column info from parquet " + count
Generating HFile metadata table
Code Block
import org.apache.hudi.common.table.HoodieTableMetaClient
import java.util.Properties
val tableName="test.metadata_test_hfile"
val destPath="hdfs://tables/my_metadata_table_hfile"
val props = new Properties()
props.put("hoodie.table.name", tableName)
props.put("hoodie.table.base.file.format", "HFILE")
HoodieTableMetaClient.initTableAndGetMetaClient(spark.sparkContext.hadoopConfiguration, destPath, props)
import java.text.DecimalFormat
import org.apache.hudi.common.table.HoodieSnapshotMetadataClient
import org.apache.hudi.common.table.HoodieTableMetaClient
import java.util.stream.Collectors
import java.util.Objects
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData
import org.apache.parquet.column.statistics.Statistics
import scala.collection.JavaConverters._
import spark.implicits._
val basePath = "hdfs://tables/my_table1"
case class FileStats(val minVal: String, val maxVal: String)
case class ColumnFileStats(val partitionPath: String, val fileName: String, val colName: String, val minVal: String, val maxVal: String)
val metaClient = new HoodieTableMetaClient(spark.sparkContext.hadoopConfiguration, basePath)
val snapshotClient = new HoodieSnapshotMetadataClient(metaClient)
val partition="2021/02/03"
val files = snapshotClient.getLatestSnapshotFiles(partition).collect(Collectors.toList()).asScala.toSeq
val allmetadata = files.flatMap(filePath => {
val blocks = ParquetFileReader.readFooter(spark.sparkContext.hadoopConfiguration, filePath).getBlocks().asScala
blocks.flatMap(b => b.getColumns().asScala.
map(col => (col.getPath().toDotString(),
FileStats(Objects.toString(col.getStatistics().minAsString()),
Objects.toString(col.getStatistics().maxAsString()))))).toSeq.
groupBy(x => x._1).mapValues(v => v.map(vv => vv._2).toSeq).
mapValues(value => FileStats(value.map(_.minVal).min, value.map(_.maxVal).max)).toSeq.
map(x => ColumnFileStats(partition, filePath.getName(), x._1, x._2.minVal, x._2.maxVal))
})
val sqlContext = new org.apache.spark.sql.SQLContext(spark.sparkContext)
val rdd = spark.sparkContext.parallelize(allmetadata)
rdd.toDF().write.format("org.apache.hudi").
option("hoodie.datasource.write.recordkey.field", "partitionPath,fileName,colName").
option("hoodie.datasource.write.precombine.field", "colName").
option("hoodie.table.name", tableName).
option("hoodie.datasource.write.operation", "bulk_insert").
option("hoodie.parquet.small.file.limit", "1073741824").
option("hoodie.parquet.max.file.size", "1073741824").
option("hoodie.bulkinsert.shuffle.parallelism", "1").
option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator").
mode("Append").
save(destPath)
Querying HFile Metadata
Note that the HFile key could be stored differently; this is a simple example to measure the time taken to read 'N' keys, and N could be different for different key formats. (For example, if we choose key = partitionPath and value = range_for_all_files_all_columns_in_the_partition, then we only have to read a few keys compared to key = partition+file_name, value = range_for_all_columns_in_file.)
Code Block
import System.nanoTime
def profile[R](code: => R, t: Long = nanoTime) = (code, (((nanoTime - t)/1000)).toInt)
import org.apache.hadoop.hbase.io.hfile.CacheConfig
import org.apache.hudi.io.storage.HoodieHFileReader
import org.apache.hadoop.fs.Path
import scala.collection.JavaConverters._
val cacheConfig = new CacheConfig(spark.sparkContext.hadoopConfiguration);
cacheConfig.setCacheDataInL1(false);
val hfilePath = new Path("hdfs://tables/my_metadata_table_hfile/default/53dd4e23-012c-4e3e-91fc-9d5ff6a3bf83-0_0-48-91_20210327201656.hfile")
val reader = new HoodieHFileReader(spark.sparkContext.hadoopConfiguration, hfilePath, cacheConfig);
val keys = reader.readAllRecords().asScala.map(x => x.getFirst())
val keyRange = keys.slice(95000, 100000) // pick desired number of keys
var totalTime: Long = 0
var totalRecords: Long = 0
val reader = new HoodieHFileReader(spark.sparkContext.hadoopConfiguration, hfilePath, cacheConfig)
(keyRange).map { k =>
val (record, time) = profile { reader.getRecordByKey(k) }
totalTime += time
if (record.isPresent()) { totalRecords += 1L}
}
"" + (totalTime/1000) + " ms to LOOKUP HFile, #records: " + totalRecords |
Test Plan
<Describe in few sentences how the RFC will be tested. How will we know that the implementation works as expected? How will we know nothing broke?>
...