...

  • Balaji Varadarajan (vbalaji)
  • Udit Mehrotra (umehrot)

Approvers

  • Vinoth Chandar @vinoth  : APPROVED
  • Nishith Agarwal @nagarwal : REQUESTED_INFO...

Status

Current state

In Progress

...

  1. User stops all write operations on original dataset.
  2. User initiates this new bootstrap one time either through delta-streamer or through a standalone tool. Users provide the following information as part of the bootstrap:
    1. Original (non-hudi) dataset base location
    2. Columns to be used for generating hudi keys.
    3. Parallelism for running this bootstrap
    4. New Hudi Dataset location 
  3. Hudi bootstrap scans partitions and files in the original root location “/user/hive/warehouse/fact_events” and performs the following operations:
    1. Creates similar hudi partitions in the new dataset location. In the above example, there will be day based partitions created under “/user/hive/warehouse/fact_events_hudi”
    2. Using Spark parallelism, generates a unique file ID and uses it to generate a hudi skeleton parquet file for each original parquet file. A special commit timestamp called “BOOTSTRAP_COMMIT” is used. For the remainder of this document, let us imagine BOOTSTRAP_COMMIT having the timestamp “000000000”. For example, suppose an original parquet file is at the path /user/hive/warehouse/fact_events/year=2015/month=12/day=31/file1.parquet and the generated unique file id is h1. The skeleton file corresponding to this parquet file will then be stored at the path /user/hive/warehouse/fact_events_hudi/year=2015/month=12/day=31/h1_1-0-1_000000000.parquet.
    3. Generates a special bootstrap index which maps each newly generated hudi skeleton file id to its corresponding original parquet file.
    4. Atomically commits the bootstrap operation using the same hudi timeline state transitions. Standard rollback will also be supported to roll back inflight and committed bootstraps.
  4. If hive syncing is enabled, creates a brand new hudi hive table pointing to the new location - “/user/hive/warehouse/fact_events_hudi”
  5. Subsequent write operations happen on the hudi dataset.
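
The path mapping in step 3 can be sketched as a small function. This is a minimal illustration, not Hudi's implementation: the object name SkeletonPaths, the function skeletonPath and the default write token "1-0-1" are hypothetical, and BOOTSTRAP_COMMIT is assumed to be “000000000”.

```scala
// Hypothetical helper for illustration; not part of Hudi's API.
object SkeletonPaths {
  // Bootstrap commit timestamp assumed for this sketch.
  val BootstrapCommit = "000000000"

  /** Maps an original parquet file to its skeleton file path under the Hudi root. */
  def skeletonPath(originalRoot: String,
                   hudiRoot: String,
                   originalFile: String,
                   fileId: String,
                   writeToken: String = "1-0-1"): String = {
    val prefix = originalRoot.stripSuffix("/") + "/"
    require(originalFile.startsWith(prefix), s"$originalFile is not under $originalRoot")
    val relative = originalFile.stripPrefix(prefix)
    // Keep the partition directories (with trailing slash) and drop the original file name.
    val partitionDir = relative.substring(0, relative.lastIndexOf('/') + 1)
    val targetRoot = hudiRoot.stripSuffix("/")
    s"$targetRoot/$partitionDir${fileId}_${writeToken}_$BootstrapCommit.parquet"
  }
}
```

Called with the roots from the example above, the original file year=2015/month=12/day=31/file1.parquet and file id h1, this returns /user/hive/warehouse/fact_events_hudi/year=2015/month=12/day=31/h1_1-0-1_000000000.parquet.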

...

  • Let us say the commit time associated with this upsert operation is “C1”. It is given that C1 is greater than the “BOOTSTRAP_COMMIT” (000000000).
  • Assuming Bloom Index, index lookup happens directly on Hudi skeleton files. Let’s say the hudi skeleton file with file id “h1” has all the records.
  • In the following description, a “regular” hudi file is a hudi parquet file that holds the per-record hudi metadata columns, the original columns and the bloom index in a single file. For a Copy-On-Write table, the writing phase identifies that the latest file-slice for the file id “h1” was generated by bootstrap, using the special bootstrap commit time. It reads the original external file stored under the original root location “/user/hive/warehouse/fact_events”. The Hudi Merge Handle reads this external file and the metadata-only hudi file in parallel, stitches the records together and merges them with the incoming batch of records to create a “regular” hudi file with a brand new version for the file id “h1”.

  • For Merge-On-Read table, ingestion would simply append to a delta log file and a subsequent compaction performs similar steps as Copy-On-Write table to generate a “regular” hudi file with brand new version for the fileId “h1”.
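
The stitch-and-merge step described above can be sketched in plain Scala. This is a simplified illustration under stated assumptions, not the Hudi Merge Handle: all type and function names are hypothetical, the skeleton file and the external file are assumed to hold their rows in the same order, and new keys are simply appended to the same file (real file sizing and insert routing are ignored).

```scala
// Hypothetical types for illustration; these are not Hudi classes.
final case class SkeletonRow(recordKey: String, commitTime: String)
final case class StitchedRecord(recordKey: String, commitTime: String, columns: Map[String, Any])

object BootstrapMergeSketch {
  /** Pairs the i-th skeleton row with the i-th external row (assumes identical row order). */
  def stitch(skeleton: Iterator[SkeletonRow],
             external: Iterator[Map[String, Any]]): Iterator[StitchedRecord] =
    skeleton.zip(external).map { case (meta, columns) =>
      StitchedRecord(meta.recordKey, meta.commitTime, columns)
    }

  /** Applies an incoming batch, keyed by record key, at the given commit time. */
  def merge(existing: Iterator[StitchedRecord],
            incoming: Map[String, Map[String, Any]],
            commitTime: String): Vector[StitchedRecord] = {
    // Existing records are rewritten with new values when the batch contains their key.
    val rewritten = existing.map { record =>
      incoming.get(record.recordKey) match {
        case Some(columns) => StitchedRecord(record.recordKey, commitTime, columns)
        case None          => record
      }
    }.toVector
    val existingKeys = rewritten.map(_.recordKey).toSet
    // Keys not seen in the existing file are treated as inserts, sorted for a stable order.
    val inserts = incoming.toVector
      .collect { case (key, columns) if !existingKeys.contains(key) =>
        StitchedRecord(key, commitTime, columns)
      }
      .sortBy(_.recordKey)
    rewritten ++ inserts
  }
}
```

Note that zip stops at the shorter iterator, so a real implementation would need to verify that both files contain the same number of rows.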

...

  • As with any Hudi datasets, the uniqueness constraint of record keys is expected for the dataset to be bootstrapped. Hence, care must be taken to select the columns in the original dataset to guarantee uniqueness. Otherwise, proper upsert for records corresponding to duplicate keys is not guaranteed.
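
One way to check candidate key columns before bootstrapping is to look for composite keys that occur more than once. The sketch below works on in-memory rows for illustration only; the object and function names are hypothetical, and on a real dataset the same grouping would be run with Spark.

```scala
// Hypothetical helper for illustration; not part of Hudi's API.
object KeyUniquenessSketch {
  /** Returns every composite key that occurs more than once, with its occurrence count. */
  def duplicateKeys(rows: Seq[Map[String, Any]],
                    keyColumns: Seq[String]): Map[Seq[Any], Int] =
    rows
      // row(column) throws NoSuchElementException if a key column is missing from a row.
      .groupBy(row => keyColumns.map(column => row(column)))
      .collect { case (key, group) if group.size > 1 => key -> group.size }
}
```

An empty result means the chosen columns are unique over the sampled rows; a non-empty result lists the keys for which upserts would not be well defined.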

Data Source Support

This section proposes a design for integrating Hudi bootstrapped tables with the Spark DataSource API, so that Copy-on-Write tables can be read through the Hudi data source in either of the following ways:

Code Block
// Read the whole table from its base path
val tableDf = spark.read.format("hudi").load("s3://<bucket>/table1/")
// Read a single partition
val partitionDf = spark.read.format("hudi").load("s3://<bucket>/table1/partition1/")


Note: We could also accept a path pattern here to maintain compatibility with the current behavior, but we would then have to implement our own handling of the patterns.

Proposal for COW snapshot queries

The idea here is to implement a new Spark Relation and Spark RDD that will be used for scanning and reading bootstrapped tables. The custom relation will implement PrunedFilteredScan to support filter pushdown and column pruning. For the RDD, each partition will be a data file + optional skeleton file combination, which will be sent to one task to perform the merge and return the results.

The following code skeleton provides a high-level outline of what we want to achieve here. API signatures may change as we set out to implement.

Code Block
package org.apache.hudi.skeleton

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.StructType

// All splits (data file + skeleton file pairs) that make up the table
case class HudiBootstrapTableState(files: List[HudiBootstrapSplit])

case class HudiBootstrapSplit(dataFile: String,
                              skeletonFile: String)

class HudiBootstrapRelation(val sqlContext: SQLContext,
                            val basePath: String,
                            val optParams: Map[String, String],
                            val userSchema: StructType)
                            extends BaseRelation with PrunedFilteredScan {

  // The RDD already yields InternalRow, so Spark must not convert the rows again
  override def needConversion: Boolean = false

  override def schema: StructType = ???

  override def buildScan(requiredColumns: Array[String],
                         filters: Array[Filter]): RDD[Row] = {
    // Perform the following steps here:
    // 1. Perform file system listing to form HudiBootstrapTableState which would
    //    maintain a mapping of Hudi skeleton files to External data files
    //
    // 2. Form the HudiBootstrapRDD and return it

    val tableState = HudiBootstrapTableState(List())
    // Cast the RDD as a whole; casting each InternalRow to Row would fail at runtime
    new HudiBootstrapRDD(tableState, sqlContext.sparkSession).asInstanceOf[RDD[Row]]
  }
}


package org.apache.hudi.skeleton

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow

class HudiBootstrapRDD(table: HudiBootstrapTableState,
                       spark: SparkSession)
                       extends RDD[InternalRow](spark.sparkContext, Nil) {

  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
    // This is the code that gets executed at each spark task. We will perform
    // the following tasks here:
    // - From the HudiBootstrapPartition, obtain the data and skeleton file paths
    // - If the skeleton file exists (bootstrapped partition), perform the merge
    //   and return a merged iterator
    // - If the skeleton file does not exist (non-bootstrapped partition), read
    //   only the data file and return an iterator
    // - For reading parquet files, build reader using ParquetFileFormat which
    //   returns an Iterator[InternalRow].
    // - Merge the readers for skeleton and data files and return a single
    //   Iterator[InternalRow]
    // - Investigate and implement passing of filters and required schema down
    //   for pruning and filtering optimizations that ParquetFileFormat provides.
    ??? // placeholder until the merge logic above is implemented
  }

  override protected def getPartitions: Array[Partition] = {
    // Form the partitions i.e. HudiBootstrapPartition from HudiBootstrapTableState.
    // Each spark task would handle one partition. Here we can do one of the
    // following mappings:
    // - Map one HudiBootstrapSplit to one partition, so that each task would
    //   perform merging of just one split i.e. data file and skeleton
    // - Map multiple HudiBootstrapSplit to one partition, so that each task
    //   would perform merging of multiple splits i.e. multiple data/skeleton files

    table.files.zipWithIndex.map { case (split, index) =>
      HudiBootstrapPartition(split, index)
    }.toArray[Partition]
  }
}

case class HudiBootstrapPartition(split: HudiBootstrapSplit,
                                  index: Int) extends Partition
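
The second mapping option mentioned in getPartitions, several splits per partition, could look roughly as follows. This sketch deliberately avoids Spark dependencies: SplitSketch and PartitionSketch are hypothetical stand-ins for HudiBootstrapSplit and HudiBootstrapPartition, and splitsPerPartition is an assumed tuning parameter that the proposal does not define.

```scala
// Hypothetical stand-ins for HudiBootstrapSplit and HudiBootstrapPartition.
final case class SplitSketch(dataFile: String, skeletonFile: Option[String])
final case class PartitionSketch(index: Int, splits: Seq[SplitSketch])

object PartitionPlanner {
  /** Groups consecutive splits so that each task merges up to `splitsPerPartition` file pairs. */
  def plan(splits: Seq[SplitSketch], splitsPerPartition: Int): Array[PartitionSketch] = {
    require(splitsPerPartition > 0, "splitsPerPartition must be positive")
    splits
      .grouped(splitsPerPartition)
      .zipWithIndex
      .map { case (group, index) => PartitionSketch(index, group) }
      .toArray
  }
}
```

With splitsPerPartition set to 1 this reduces to the one-split-per-partition mapping used in the skeleton above; larger values trade task parallelism for fewer, longer-running tasks.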


Advantages:

  • We do not have to make any changes in Spark code base.
  • Provides a way to control file listing logic to list down the skeleton files, and then map them to the corresponding external data file.
  • Provides control over what goes into each partition and computation logic for each partition, which is what we want to achieve here.
  • Same design can be further applied to Merge-on-Read tables.

Disadvantages:

  • File splitting would not be supported, which may impact read performance. Each task would handle merging of one skeleton + data file pair only. At the same time, we do not currently have a way to split skeleton and data parquet files at exactly the same row offsets so that they can be merged later. Even with the InputFormat column stitching logic we would have to disable splitting of files, and each split would be mapped to one file. In that sense, we would be following a similar approach.

Proposal for COW incremental queries

For incremental queries, we would have to employ similar logic to redesign the IncrementalRelation currently implemented in Hudi. We can probably reuse the RDD implementation that we build for the snapshot query case.
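
As a rough illustration of what the incremental case adds on top of the snapshot read, the merged records can be filtered by their commit time. The sketch below is an assumption-laden simplification: the names are hypothetical, commit times are compared as strings (which is only meaningful when they sort lexicographically in commit order), and the window is taken as exclusive of the begin time and inclusive of the end time.

```scala
// Hypothetical record type for illustration; not a Hudi class.
final case class CommittedRecord(recordKey: String, commitTime: String)

object IncrementalFilterSketch {
  /** Keeps only the records committed in the window (beginTime, endTime]. */
  def incremental(records: Iterator[CommittedRecord],
                  beginTime: String,
                  endTime: String): Iterator[CommittedRecord] =
    records.filter(r => r.commitTime > beginTime && r.commitTime <= endTime)
}
```

Under these assumptions, using the bootstrap commit time as beginTime excludes records that still carry the bootstrap commit and returns only records written by later commits.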

Rollout/Adoption Plan

  • This will be rolled out as an experimental feature in 0.5.1 
  • Hudi Writers and Readers do not need special configuration to identify tables using this new bootstrap mechanism. The presence of special bootstrap commit and bootstrap index will automatically trigger correct handling of these tables.
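
To make the automatic detection concrete, the sketch below recognizes a bootstrap skeleton file from the commit time embedded in its file name, following the h1_1-0-1_000000000.parquet naming used in this document. It is only an illustration: the object and function names are hypothetical, BOOTSTRAP_COMMIT is assumed to be “000000000”, file ids are assumed to contain no underscores, and the bootstrap index lookup that the real mechanism also relies on is left out.

```scala
// Hypothetical helper for illustration; not part of Hudi's API.
object BootstrapDetectionSketch {
  // Bootstrap commit timestamp assumed for this sketch.
  val BootstrapCommit = "000000000"

  final case class BaseFileName(fileId: String, writeToken: String, commitTime: String)

  /** Parses names of the form <fileId>_<writeToken>_<commitTime>.parquet. */
  def parse(fileName: String): Option[BaseFileName] =
    fileName.stripSuffix(".parquet").split("_") match {
      case Array(fileId, writeToken, commitTime) =>
        Some(BaseFileName(fileId, writeToken, commitTime))
      case _ => None
    }

  /** True when the file name carries the special bootstrap commit time. */
  def isBootstrapSkeleton(fileName: String): Boolean =
    parse(fileName).exists(_.commitTime == BootstrapCommit)
}
```

A file written by a later commit, or a file whose name does not follow this pattern, is not treated as a bootstrap skeleton.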

...