Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Approvers

  • TBD (This is a WIP RFC, Approvers to be added after it is ready to review)

...


Current State

Status
titleUnder Discussion

(tick)

Status
colourYellow
titleIn Progress


Status
colourRed
titleABANDONED


Status
colourGreen
titleCompleted

(tick)

Status
colourBlue
titleINactive


...

  • <What impact (if any) will there be on existing users?>
  • <If we are changing behavior how will we phase out the older behavior?>
  • <If we need special migration tools, describe them here.>
  • <When will we remove the existing behavior?>


Appendix

The below code can be run in spark-shell  (or jupyter notebook) to quickly iterate on different formats (TODO integrate with hudi metadata table writer code)

Generating Parquet metadata table

Code Block
titlegenerate range metadata in parquet format
import org.apache.hudi.common.table.HoodieTableMetaClient
import java.util.Properties

val tableName="test.metadata_test_parquet"
val props = new Properties()
props.put("hoodie.table.name", tableName)
val destPath="hdfs://tables/my_metadata_table_parquet"
HoodieTableMetaClient.initTableAndGetMetaClient(spark.sparkContext.hadoopConfiguration, destPath, props)

import org.apache.hudi.common.table.HoodieSnapshotMetadataClient
import org.apache.hudi.common.table.HoodieTableMetaClient
import java.util.stream.Collectors
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData
import org.apache.parquet.column.statistics.Statistics
import scala.collection.JavaConverters._
import spark.implicits._


case class FileStats(val minVal: String, val maxVal: String)

case class ColumnFileStats(val partitionPath: String, val fileName: String, val colName: String, val minVal: String, val maxVal: String)

val basePath = "hdfs://tables/my_table1"
val metaClient = new HoodieTableMetaClient(spark.sparkContext.hadoopConfiguration, basePath)
val snapshotClient = new HoodieSnapshotMetadataClient(metaClient)

val partition="2021/02/03"
val files = snapshotClient.getLatestSnapshotFiles(partition).collect(Collectors.toList()).asScala.toSeq


val allmetadata = files.flatMap(filePath => {
    val blocks = ParquetFileReader.readFooter(spark.sparkContext.hadoopConfiguration, filePath).getBlocks().asScala

    blocks.flatMap(b => b.getColumns().asScala.
               map(col => (col.getPath().toDotString(), 
                           FileStats(Objects.toString(col.getStatistics().minAsString()),
                                     Objects.toString(col.getStatistics().maxAsString()))))).toSeq.
               groupBy(x => x._1).mapValues(v => v.map(vv => vv._2).toSeq).
               mapValues(value => FileStats(value.map(_.minVal).min, value.map(_.maxVal).max)).toSeq.
               map(x => ColumnFileStats(partition, filePath.getName(), x._1, x._2.minVal, x._2.maxVal))
})

val sqlContext = new org.apache.spark.sql.SQLContext(spark.sparkContext) 
val rdd = spark.sparkContext.parallelize(allmetadata)
rdd.toDF().write.format("org.apache.hudi").
    option("hoodie.datasource.write.recordkey.field", "partitionPath,fileName,colName").
    option("hoodie.datasource.write.precombine.field", "colName").
    option("hoodie.table.name", "test.metadata_test").
    option("hoodie.datasource.write.operation", "insert").
    option("hoodie.parquet.small.file.limit", "107374182400").
    option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator").
    mode("Append").
    save(destPath)



Querying Parquet metadata table

Code Block
titleQuerying parquet metadata
import System.nanoTime
def profile[R](code: => R, t: Long = nanoTime) = (code, (((nanoTime - t)/1000)).toInt)

val parquetBasePath = new Path("hdfs://tables/my_metadata_table1/default/d95d3fbe-fe66-48fc-8bdc-7db923496916-0_0-55-98_20210327202848.parquet")
val fullSchema = ParquetUtils.readAvroSchema(spark.sparkContext.hadoopConfiguration, parquetBasePath)
val schema = HoodieAvroUtils.generateProjectionSchema(fullSchema,
    java.util.Arrays.asList("partitionPath", "fileName", "colName", "minVal", "maxVal"))

val  keys = new scala.collection.mutable.ListBuffer[String]()

val (count, time) = profile {
    
    AvroReadSupport.setAvroReadSchema(spark.sparkContext.hadoopConfiguration, schema)
    AvroReadSupport.setRequestedProjection(spark.sparkContext.hadoopConfiguration, schema)
    val filterToUse = FilterCompat.get(
        FilterApi.or(
            FilterApi.or(
                  FilterApi.eq(FilterApi.binaryColumn("colName"), Binary.fromString("BASE.city_id")),
                  FilterApi.eq(FilterApi.binaryColumn("colName"), Binary.fromString("ACTIONS.actions.array.source.type"))
            ),
            FilterApi.or(
                FilterApi.eq(FilterApi.binaryColumn("colName"), Binary.fromString("ACTIONS.actions.array.contactType")),
                FilterApi.eq(FilterApi.binaryColumn("colName"), Binary.fromString("ACTIONS.actions.array.destination.type"))
            )
                
        )
    )
    val parquetReader = AvroParquetReader.builder(parquetBasePath).
       withConf(spark.sparkContext.hadoopConfiguration).
       withFilter(filterToUse).
       build()

    var record1 : GenericRecord = parquetReader.read().asInstanceOf[GenericRecord]  
    var count: Long = 1
    while (record1 != null) {
        record1 = parquetReader.read().asInstanceOf[GenericRecord] 
        count+=1
    }
    count
}
"" + (time/1000) + " ms to READ column info from parquet " + count


Generating HFile metadata table


Code Block
titlegenerate HFile metadata format
import org.apache.hudi.common.table.HoodieTableMetaClient
import java.util.Properties

val tableName=""test.metadata_test_hfile"
val destPath="hdfs://tables/my_metadata_table_hfile"
val props = new Properties()
props.put("hoodie.table.name", tableName)
props.put("hoodie.table.base.file.format", "HFILE")
HoodieTableMetaClient.initTableAndGetMetaClient(spark.sparkContext.hadoopConfiguration, destPath, props)

import java.text.DecimalFormat
import org.apache.hudi.common.table.HoodieSnapshotMetadataClient
import org.apache.hudi.common.table.HoodieTableMetaClient
import java.util.stream.Collectors
import java.util.Objects
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData
import org.apache.parquet.column.statistics.Statistics
import scala.collection.JavaConverters._
import spark.implicits._


val basePath = "hdfs://tables/my_table1"

case class FileStats(val minVal: String, val maxVal: String)

case class ColumnFileStats(val partitionPath: String, val fileName: String, val colName: String, val minVal: String, val maxVal: String)
    
val metaClient = new HoodieTableMetaClient(spark.sparkContext.hadoopConfiguration, basePath)
val snapshotClient = new HoodieSnapshotMetadataClient(metaClient)

val partition="2021/02/03"
val files = snapshotClient.getLatestSnapshotFiles(partition).collect(Collectors.toList()).asScala.toSeq

val allmetadata = files.flatMap(filePath => {
    val blocks = ParquetFileReader.readFooter(spark.sparkContext.hadoopConfiguration, filePath).getBlocks().asScala

    blocks.flatMap(b => b.getColumns().asScala.
               map(col => (col.getPath().toDotString(), 
                           FileStats(Objects.toString(col.getStatistics().minAsString()),
                                     Objects.toString(col.getStatistics().maxAsString()))))).toSeq.
               groupBy(x => x._1).mapValues(v => v.map(vv => vv._2).toSeq).
               mapValues(value => FileStats(value.map(_.minVal).min, value.map(_.maxVal).max)).toSeq.
               map(x => ColumnFileStats(partition, filePath.getName(), x._1, x._2.minVal, x._2.maxVal))
})

val sqlContext = new org.apache.spark.sql.SQLContext(spark.sparkContext) 
val rdd = spark.sparkContext.parallelize(allmetadata)
rdd.toDF().write.format("org.apache.hudi").
    option("hoodie.datasource.write.recordkey.field", "partitionPath,fileName,colName").
    option("hoodie.datasource.write.precombine.field", "colName").
    option("hoodie.table.name", tableName).
    option("hoodie.datasource.write.operation", "bulk_insert").
    option("hoodie.parquet.small.file.limit", "1073741824").
    option("hoodie.parquet.max.file.size", "1073741824").
    option("hoodie.bulkinsert.shuffle.parallelism", "1").
    option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator").
    mode("Append").
    save(destPath)
})


Querying HFile Metadata

Note that HFile format key could be stored differently. This is a simple example to measure time taken to read 'N' keys. N could be different for different key formats. (For example, if we choose key = partitionPath and value = range_for_all_files_all_columns_in_the_partition, then we only have to read few keys compared to key = partition+file_name, value = range_for_all_columns_in_file) 

Code Block
titlequerying hfile range metadata
import System.nanoTime
def profile[R](code: => R, t: Long = nanoTime) = (code, (((nanoTime - t)/1000)).toInt)

import org.apache.hadoop.hbase.io.hfile.CacheConfig
import org.apache.hudi.io.storage.HoodieHFileReader
import org.apache.hadoop.fs.Path
import scala.collection.JavaConverters._


val cacheConfig = new CacheConfig(spark.sparkContext.hadoopConfiguration);
cacheConfig.setCacheDataInL1(false);

val hfilePath = new Path("hdfs://tables/my_metadata_table_hfile/default/53dd4e23-012c-4e3e-91fc-9d5ff6a3bf83-0_0-48-91_20210327201656.hfile")
val reader = new HoodieHFileReader(spark.sparkContext.hadoopConfiguration, hfilePath, cacheConfig);

val keys = reader.readAllRecords().asScala.map(x => x.getFirst())
val keyRange = keys.slice(95000, 100000) // pick desired number of keys

var totalTime: Long = 0
var totalRecords: Long = 0
val reader = new HoodieHFileReader(spark.sparkContext.hadoopConfiguration, hfilePath, cacheConfig)
(keyRange).map { k =>
    val (record, time) = profile { reader.getRecordByKey(k) }
    totalTime += time
    if (record.isPresent()) { totalRecords += 1L}
}
"" + (totalTime/1000) + " ms to LOOKUP HFile, #records: " + totalRecords


Test Plan

<Describe in few sentences how the RFC will be tested. How will we know that the implementation works as expected? How will we know nothing broke?>

...