...
Approvers
- TBD (this is a WIP RFC; approvers will be added once it is ready for review)
...
| Current State | |
|---|---|
| In Progress | ✓ |
| Completed | ✓ |
...
- <What impact (if any) will there be on existing users?>
- <If we are changing behavior how will we phase out the older behavior?>
- <If we need special migration tools, describe them here.>
- <When will we remove the existing behavior?>
Appendix
The code below can be run in spark-shell (or a Jupyter notebook) to quickly iterate on different formats (TODO: integrate with the Hudi metadata table writer code).
Generating Parquet metadata table
```scala
// generate range metadata in parquet format
import org.apache.hudi.common.table.HoodieTableMetaClient
import java.util.Properties
val tableName="test.metadata_test_parquet"
val props = new Properties()
props.put("hoodie.table.name", tableName)
val destPath="hdfs://tables/my_metadata_table_parquet"
HoodieTableMetaClient.initTableAndGetMetaClient(spark.sparkContext.hadoopConfiguration, destPath, props)
import org.apache.hudi.common.table.HoodieSnapshotMetadataClient
import org.apache.hudi.common.table.HoodieTableMetaClient
import java.util.stream.Collectors
import java.util.Objects
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData
import org.apache.parquet.column.statistics.Statistics
import scala.collection.JavaConverters._
import spark.implicits._
case class FileStats(val minVal: String, val maxVal: String)
case class ColumnFileStats(val partitionPath: String, val fileName: String, val colName: String, val minVal: String, val maxVal: String)
val basePath = "hdfs://tables/my_table1"
val metaClient = new HoodieTableMetaClient(spark.sparkContext.hadoopConfiguration, basePath)
val snapshotClient = new HoodieSnapshotMetadataClient(metaClient)
val partition="2021/02/03"
val files = snapshotClient.getLatestSnapshotFiles(partition).collect(Collectors.toList()).asScala.toSeq
val allmetadata = files.flatMap(filePath => {
val blocks = ParquetFileReader.readFooter(spark.sparkContext.hadoopConfiguration, filePath).getBlocks().asScala
blocks.flatMap(b => b.getColumns().asScala.
map(col => (col.getPath().toDotString(),
FileStats(Objects.toString(col.getStatistics().minAsString()),
Objects.toString(col.getStatistics().maxAsString()))))).toSeq.
groupBy(x => x._1).mapValues(v => v.map(vv => vv._2).toSeq).
mapValues(value => FileStats(value.map(_.minVal).min, value.map(_.maxVal).max)).toSeq.
map(x => ColumnFileStats(partition, filePath.getName(), x._1, x._2.minVal, x._2.maxVal))
})
val sqlContext = new org.apache.spark.sql.SQLContext(spark.sparkContext)
val rdd = spark.sparkContext.parallelize(allmetadata)
rdd.toDF().write.format("org.apache.hudi").
option("hoodie.datasource.write.recordkey.field", "partitionPath,fileName,colName").
option("hoodie.datasource.write.precombine.field", "colName").
option("hoodie.table.name", "test.metadata_test").
option("hoodie.datasource.write.operation", "insert").
option("hoodie.parquet.small.file.limit", "107374182400").
option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator").
mode("Append").
save(destPath)
```
Querying Parquet metadata table
```scala
// Querying parquet metadata
import System.nanoTime
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.Path
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.util.ParquetUtils
import org.apache.parquet.avro.{AvroParquetReader, AvroReadSupport}
import org.apache.parquet.filter2.compat.FilterCompat
import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.io.api.Binary

// returns (result, elapsed time in microseconds)
def profile[R](code: => R, t: Long = nanoTime) = (code, ((nanoTime - t)/1000).toInt)
val parquetBasePath = new Path("hdfs://tables/my_metadata_table1/default/d95d3fbe-fe66-48fc-8bdc-7db923496916-0_0-55-98_20210327202848.parquet")
val fullSchema = ParquetUtils.readAvroSchema(spark.sparkContext.hadoopConfiguration, parquetBasePath)
val schema = HoodieAvroUtils.generateProjectionSchema(fullSchema,
java.util.Arrays.asList("partitionPath", "fileName", "colName", "minVal", "maxVal"))
val keys = new scala.collection.mutable.ListBuffer[String]()
val (count, time) = profile {
AvroReadSupport.setAvroReadSchema(spark.sparkContext.hadoopConfiguration, schema)
AvroReadSupport.setRequestedProjection(spark.sparkContext.hadoopConfiguration, schema)
val filterToUse = FilterCompat.get(
FilterApi.or(
FilterApi.or(
FilterApi.eq(FilterApi.binaryColumn("colName"), Binary.fromString("BASE.city_id")),
FilterApi.eq(FilterApi.binaryColumn("colName"), Binary.fromString("ACTIONS.actions.array.source.type"))
),
FilterApi.or(
FilterApi.eq(FilterApi.binaryColumn("colName"), Binary.fromString("ACTIONS.actions.array.contactType")),
FilterApi.eq(FilterApi.binaryColumn("colName"), Binary.fromString("ACTIONS.actions.array.destination.type"))
)
)
)
val parquetReader = AvroParquetReader.builder(parquetBasePath).
withConf(spark.sparkContext.hadoopConfiguration).
withFilter(filterToUse).
build()
var record1: GenericRecord = parquetReader.read().asInstanceOf[GenericRecord]
// start at 0 so the count is not off by one once read() returns null
var count: Long = 0
while (record1 != null) {
  count += 1
  record1 = parquetReader.read().asInstanceOf[GenericRecord]
}
count
}
"" + (time/1000) + " ms to READ column info from parquet " + count
```
Generating HFile metadata table
```scala
// generate HFile metadata format
import org.apache.hudi.common.table.HoodieTableMetaClient
import java.util.Properties
val tableName=""test.metadata_test_hfile"
val destPath="hdfs://tables/my_metadata_table_hfile"
val props = new Properties()
props.put("hoodie.table.name", tableName)
props.put("hoodie.table.base.file.format", "HFILE")
HoodieTableMetaClient.initTableAndGetMetaClient(spark.sparkContext.hadoopConfiguration, destPath, props)
import java.text.DecimalFormat
import org.apache.hudi.common.table.HoodieSnapshotMetadataClient
import org.apache.hudi.common.table.HoodieTableMetaClient
import java.util.stream.Collectors
import java.util.Objects
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData
import org.apache.parquet.column.statistics.Statistics
import scala.collection.JavaConverters._
import spark.implicits._
val basePath = "hdfs://tables/my_table1"
case class FileStats(val minVal: String, val maxVal: String)
case class ColumnFileStats(val partitionPath: String, val fileName: String, val colName: String, val minVal: String, val maxVal: String)
val metaClient = new HoodieTableMetaClient(spark.sparkContext.hadoopConfiguration, basePath)
val snapshotClient = new HoodieSnapshotMetadataClient(metaClient)
val partition="2021/02/03"
val files = snapshotClient.getLatestSnapshotFiles(partition).collect(Collectors.toList()).asScala.toSeq
val allmetadata = files.flatMap(filePath => {
val blocks = ParquetFileReader.readFooter(spark.sparkContext.hadoopConfiguration, filePath).getBlocks().asScala
blocks.flatMap(b => b.getColumns().asScala.
map(col => (col.getPath().toDotString(),
FileStats(Objects.toString(col.getStatistics().minAsString()),
Objects.toString(col.getStatistics().maxAsString()))))).toSeq.
groupBy(x => x._1).mapValues(v => v.map(vv => vv._2).toSeq).
mapValues(value => FileStats(value.map(_.minVal).min, value.map(_.maxVal).max)).toSeq.
map(x => ColumnFileStats(partition, filePath.getName(), x._1, x._2.minVal, x._2.maxVal))
})
val sqlContext = new org.apache.spark.sql.SQLContext(spark.sparkContext)
val rdd = spark.sparkContext.parallelize(allmetadata)
rdd.toDF().write.format("org.apache.hudi").
option("hoodie.datasource.write.recordkey.field", "partitionPath,fileName,colName").
option("hoodie.datasource.write.precombine.field", "colName").
option("hoodie.table.name", tableName).
option("hoodie.datasource.write.operation", "bulk_insert").
option("hoodie.parquet.small.file.limit", "1073741824").
option("hoodie.parquet.max.file.size", "1073741824").
option("hoodie.bulkinsert.shuffle.parallelism", "1").
option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator").
mode("Append").
save(destPath)
```
Querying HFile Metadata
Note that the HFile key could be laid out in different ways. This is a simple example measuring the time taken to read N keys; N will differ across key layouts. (For example, if we choose key = partitionPath and value = range_for_all_files_all_columns_in_the_partition, then we only have to read a few keys, compared to key = partition + file_name, value = range_for_all_columns_in_file.)
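To make the trade-off concrete, here is a minimal sketch of the two key layouts discussed above. The case classes, key strings, and file name used here are hypothetical illustrations only, not the encoding proposed by this RFC:

```scala
// Hypothetical illustration of two possible HFile key layouts (not the RFC's final encoding).

// Option A: key = partitionPath, value = ranges for every (file, column) in the partition.
// A query on one partition reads a single key, but the value is large and must be rewritten
// whenever any file in the partition changes.
case class PartitionRanges(ranges: Map[(String, String), (String, String)]) // (fileName, colName) -> (min, max)
val keyA = "2021/02/03"

// Option B: key = partitionPath + fileName, value = ranges for all columns of that file.
// Values stay small and per-file updates are cheap, but a query touching many files
// must look up many keys.
case class FileRanges(ranges: Map[String, (String, String)])                // colName -> (min, max)
val keyB = "2021/02/03" + "," + "d95d3fbe-fe66-48fc-8bdc-7db923496916-0.parquet"
```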
```scala
// querying hfile range metadata
import System.nanoTime
// returns (result, elapsed time in microseconds)
def profile[R](code: => R, t: Long = nanoTime) = (code, ((nanoTime - t)/1000).toInt)
import org.apache.hadoop.hbase.io.hfile.CacheConfig
import org.apache.hudi.io.storage.HoodieHFileReader
import org.apache.hadoop.fs.Path
import scala.collection.JavaConverters._
val cacheConfig = new CacheConfig(spark.sparkContext.hadoopConfiguration);
cacheConfig.setCacheDataInL1(false);
val hfilePath = new Path("hdfs://tables/my_metadata_table_hfile/default/53dd4e23-012c-4e3e-91fc-9d5ff6a3bf83-0_0-48-91_20210327201656.hfile")
val reader = new HoodieHFileReader(spark.sparkContext.hadoopConfiguration, hfilePath, cacheConfig);
val keys = reader.readAllRecords().asScala.map(x => x.getFirst())
val keyRange = keys.slice(95000, 100000) // pick desired number of keys
var totalTime: Long = 0
var totalRecords: Long = 0
// use a separate reader instance for the point lookups below
val lookupReader = new HoodieHFileReader(spark.sparkContext.hadoopConfiguration, hfilePath, cacheConfig)
(keyRange).map { k =>
val (record, time) = profile { lookupReader.getRecordByKey(k) }
totalTime += time
if (record.isPresent()) { totalRecords += 1L}
}
"" + (totalTime/1000) + " ms to LOOKUP HFile, #records: " + totalRecords |
Test Plan
<Describe in few sentences how the RFC will be tested. How will we know that the implementation works as expected? How will we know nothing broke?>
...