...
Approvers
- TBD (This is a WIP RFC, Approvers to be added after it is ready to review)
...
Current State | |||||||||
---|---|---|---|---|---|---|---|---|---|
| |||||||||
| |||||||||
| |||||||||
| |||||||||
|
...
Query predicates are normally constructed in a tree like structure so this will follow same pattern. The proposal is create a mapping utility from “Engine” query predicates to a HudiExpression. This way filtering logic is engine agnostic
For AND and OR operators we can translate to a tree node with left and right expressions. An example is shown below of what the structure would look
...
This way we can call evaluate on the root HudiExpression tree and it will determine whether the entire expression is satisfied for the file group.
...
In order for us to implement predicate push down in Hive we need to have access to the query predicate. Query predicate is not passed to Hive InputFormat by default. HiveStoragePredicateHandler interface needs to be implemented in order to provide query predicate to InputFormat and for this we need to create a custom HiveStorageHandler. Therefore we will be creating new storage handler HudiStorageHandler.
...
If operator in ExpressionTree is defined as LEAF it indicates the leaf
field will be populated with an index value. This index corresponds to the index in a list of PredicateLeafs it corresponds to a PredicateLeaf defined in the Search Argument. PredicateLeaf will contain information about the query predicate such as operator, column name, and literal which is being compared
private final org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf.Operator operator;
private final Type type;
private String columnName;
private final Object literal;
private final List<Object> literalList;
Hive supported operators in PredicateLeaf
public static enum Operator {
EQUALS,
NULL_SAFE_EQUALS,
LESS_THAN,
LESS_THAN_EQUALS,
IN,
BETWEEN,
IS_NULL
}
Example query
select event_name from storage_handler_table where event_id = '1'
will produce following leaf:
Filter passed to Input Format: leaf-0 = (EQUALS event_id 1), expr = leaf-0
We can use this information and the SearchArgument to generate our HudiExpression. Then in HoodieParquetInputFormat.listStatus() after fetching files from FileSystemView we can apply data filter using column metadata for the remaining file groups.
Spark
Presto
Rollout/Adoption Plan
...
We can use this information and the SearchArgument to generate our HudiExpression. Then in HoodieParquetInputFormat.listStatus() after fetching files from FileSystemView for the remaining file groups we can apply HudieExpression using column metadata.
Spark
Presto
Rollout/Adoption Plan
- <What impact (if any) will there be on existing users?>
- <If we are changing behavior how will we phase out the older behavior?>
- <If we need special migration tools, describe them here.>
- <When will we remove the existing behavior?>
Appendix
The below code can be run in spark-shell (or jupyter notebook) to quickly iterate on different formats (TODO integrate with hudi metadata table writer code)
Generating Parquet metadata table
Code Block | ||
---|---|---|
| ||
import org.apache.hudi.common.table.HoodieTableMetaClient
import java.util.Properties
val tableName="test.metadata_test_parquet"
val props = new Properties()
props.put("hoodie.table.name", tableName)
val destPath="hdfs://tables/my_metadata_table_parquet"
HoodieTableMetaClient.initTableAndGetMetaClient(spark.sparkContext.hadoopConfiguration, destPath, props)
import org.apache.hudi.common.table.HoodieSnapshotMetadataClient
import org.apache.hudi.common.table.HoodieTableMetaClient
import java.util.stream.Collectors
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData
import org.apache.parquet.column.statistics.Statistics
import scala.collection.JavaConverters._
import spark.implicits._
case class FileStats(val minVal: String, val maxVal: String)
case class ColumnFileStats(val partitionPath: String, val fileName: String, val colName: String, val minVal: String, val maxVal: String)
val basePath = "hdfs://tables/my_table1"
val metaClient = new HoodieTableMetaClient(spark.sparkContext.hadoopConfiguration, basePath)
val snapshotClient = new HoodieSnapshotMetadataClient(metaClient)
val partition="2021/02/03"
val files = snapshotClient.getLatestSnapshotFiles(partition).collect(Collectors.toList()).asScala.toSeq
val allmetadata = files.flatMap(filePath => {
val blocks = ParquetFileReader.readFooter(spark.sparkContext.hadoopConfiguration, filePath).getBlocks().asScala
blocks.flatMap(b => b.getColumns().asScala.
map(col => (col.getPath().toDotString(),
FileStats(Objects.toString(col.getStatistics().minAsString()),
Objects.toString(col.getStatistics().maxAsString()))))).toSeq.
groupBy(x => x._1).mapValues(v => v.map(vv => vv._2).toSeq).
mapValues(value => FileStats(value.map(_.minVal).min, value.map(_.maxVal).max)).toSeq.
map(x => ColumnFileStats(partition, filePath.getName(), x._1, x._2.minVal, x._2.maxVal))
})
val sqlContext = new org.apache.spark.sql.SQLContext(spark.sparkContext)
val rdd = spark.sparkContext.parallelize(allmetadata)
rdd.toDF().write.format("org.apache.hudi").
option("hoodie.datasource.write.recordkey.field", "partitionPath,fileName,colName").
option("hoodie.datasource.write.precombine.field", "colName").
option("hoodie.table.name", "test.metadata_test").
option("hoodie.datasource.write.operation", "insert").
option("hoodie.parquet.small.file.limit", "107374182400").
option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator").
mode("Append").
save(destPath) |
Querying Parquet metadata table
Code Block | ||
---|---|---|
| ||
import System.nanoTime
def profile[R](code: => R, t: Long = nanoTime) = (code, (((nanoTime - t)/1000)).toInt)
val parquetBasePath = new Path("hdfs://tables/my_metadata_table1/default/d95d3fbe-fe66-48fc-8bdc-7db923496916-0_0-55-98_20210327202848.parquet")
val fullSchema = ParquetUtils.readAvroSchema(spark.sparkContext.hadoopConfiguration, parquetBasePath)
val schema = HoodieAvroUtils.generateProjectionSchema(fullSchema,
java.util.Arrays.asList("partitionPath", "fileName", "colName", "minVal", "maxVal"))
val keys = new scala.collection.mutable.ListBuffer[String]()
val (count, time) = profile {
AvroReadSupport.setAvroReadSchema(spark.sparkContext.hadoopConfiguration, schema)
AvroReadSupport.setRequestedProjection(spark.sparkContext.hadoopConfiguration, schema)
val filterToUse = FilterCompat.get(
FilterApi.or(
FilterApi.or(
FilterApi.eq(FilterApi.binaryColumn("colName"), Binary.fromString("BASE.city_id")),
FilterApi.eq(FilterApi.binaryColumn("colName"), Binary.fromString("ACTIONS.actions.array.source.type"))
),
FilterApi.or(
FilterApi.eq(FilterApi.binaryColumn("colName"), Binary.fromString("ACTIONS.actions.array.contactType")),
FilterApi.eq(FilterApi.binaryColumn("colName"), Binary.fromString("ACTIONS.actions.array.destination.type"))
)
)
)
val parquetReader = AvroParquetReader.builder(parquetBasePath).
withConf(spark.sparkContext.hadoopConfiguration).
withFilter(filterToUse).
build()
var record1 : GenericRecord = parquetReader.read().asInstanceOf[GenericRecord]
var count: Long = 1
while (record1 != null) {
record1 = parquetReader.read().asInstanceOf[GenericRecord]
count+=1
}
count
}
"" + (time/1000) + " ms to READ column info from parquet " + count
|
Generating HFile metadata table
Code Block | ||
---|---|---|
| ||
import org.apache.hudi.common.table.HoodieTableMetaClient
import java.util.Properties
val tableName=""test.metadata_test_hfile"
val destPath="hdfs://tables/my_metadata_table_hfile"
val props = new Properties()
props.put("hoodie.table.name", tableName)
props.put("hoodie.table.base.file.format", "HFILE")
HoodieTableMetaClient.initTableAndGetMetaClient(spark.sparkContext.hadoopConfiguration, destPath, props)
import java.text.DecimalFormat
import org.apache.hudi.common.table.HoodieSnapshotMetadataClient
import org.apache.hudi.common.table.HoodieTableMetaClient
import java.util.stream.Collectors
import java.util.Objects
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData
import org.apache.parquet.column.statistics.Statistics
import scala.collection.JavaConverters._
import spark.implicits._
val basePath = "hdfs://tables/my_table1"
case class FileStats(val minVal: String, val maxVal: String)
case class ColumnFileStats(val partitionPath: String, val fileName: String, val colName: String, val minVal: String, val maxVal: String)
val metaClient = new HoodieTableMetaClient(spark.sparkContext.hadoopConfiguration, basePath)
val snapshotClient = new HoodieSnapshotMetadataClient(metaClient)
val partition="2021/02/03"
val files = snapshotClient.getLatestSnapshotFiles(partition).collect(Collectors.toList()).asScala.toSeq
val allmetadata = files.flatMap(filePath => {
val blocks = ParquetFileReader.readFooter(spark.sparkContext.hadoopConfiguration, filePath).getBlocks().asScala
blocks.flatMap(b => b.getColumns().asScala.
map(col => (col.getPath().toDotString(),
FileStats(Objects.toString(col.getStatistics().minAsString()),
Objects.toString(col.getStatistics().maxAsString()))))).toSeq.
groupBy(x => x._1).mapValues(v => v.map(vv => vv._2).toSeq).
mapValues(value => FileStats(value.map(_.minVal).min, value.map(_.maxVal).max)).toSeq.
map(x => ColumnFileStats(partition, filePath.getName(), x._1, x._2.minVal, x._2.maxVal))
})
val sqlContext = new org.apache.spark.sql.SQLContext(spark.sparkContext)
val rdd = spark.sparkContext.parallelize(allmetadata)
rdd.toDF().write.format("org.apache.hudi").
option("hoodie.datasource.write.recordkey.field", "partitionPath,fileName,colName").
option("hoodie.datasource.write.precombine.field", "colName").
option("hoodie.table.name", tableName).
option("hoodie.datasource.write.operation", "bulk_insert").
option("hoodie.parquet.small.file.limit", "1073741824").
option("hoodie.parquet.max.file.size", "1073741824").
option("hoodie.bulkinsert.shuffle.parallelism", "1").
option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator").
mode("Append").
save(destPath)
}) |
Querying HFile Metadata
Note that HFile format key could be stored differently. This is a simple example to measure time taken to read 'N' keys. N could be different for different key formats. (For example, if we choose key = partitionPath and value = range_for_all_files_all_columns_in_the_partition, then we only have to read few keys compared to key = partition+file_name, value = range_for_all_columns_in_file)
Code Block | ||
---|---|---|
| ||
import System.nanoTime
def profile[R](code: => R, t: Long = nanoTime) = (code, (((nanoTime - t)/1000)).toInt)
import org.apache.hadoop.hbase.io.hfile.CacheConfig
import org.apache.hudi.io.storage.HoodieHFileReader
import org.apache.hadoop.fs.Path
import scala.collection.JavaConverters._
val cacheConfig = new CacheConfig(spark.sparkContext.hadoopConfiguration);
cacheConfig.setCacheDataInL1(false);
val hfilePath = new Path("hdfs://tables/my_metadata_table_hfile/default/53dd4e23-012c-4e3e-91fc-9d5ff6a3bf83-0_0-48-91_20210327201656.hfile")
val reader = new HoodieHFileReader(spark.sparkContext.hadoopConfiguration, hfilePath, cacheConfig);
val keys = reader.readAllRecords().asScala.map(x => x.getFirst())
val keyRange = keys.slice(95000, 100000) // pick desired number of keys
var totalTime: Long = 0
var totalRecords: Long = 0
val reader = new HoodieHFileReader(spark.sparkContext.hadoopConfiguration, hfilePath, cacheConfig)
(keyRange).map { k =>
val (record, time) = profile { reader.getRecordByKey(k) }
totalTime += time
if (record.isPresent()) { totalRecords += 1L}
}
"" + (totalTime/1000) + " ms to LOOKUP HFile, #records: " + totalRecords |
...
Test Plan
<Describe in few sentences how the RFC will be tested. How will we know that the implementation works as expected? How will we know nothing broke?>
...