Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • Balaji Varadarajan (vbalaji)
  • Udit Mehrotra (umehrot)

Approvers

  • Vinoth Chandar @vinoth  : APPROVED
  • Nishith Agarwal @nagarwal : REQUESTED_INFO...

Status

Current state:"Under Discussion" 

Status
colourYellow
titleIn Progress

Discussion thread: here

JIRA: here

...

With the above setup, let us imagine that the user is using this new bootstrap mechanism to bootstrap this table to a new hudi dataset “fact_events_hudi” located at “/user/hive/warehouse/fact_events_hudi”

  1. User stops all write operations on original dataset.
  2. User initiates this new bootstrap one time either through delta-streamer or through a standalone tool. Users provide the following information as part of the bootstrap:
    1. Original (non-hudi) dataset base location
    2. Columns to be used for generating hudi keys.
    3. Parallelism for running this bootstrap
    4. New Hudi Dataset location 
  3. Hudi bootstrap scans partitions and files in the original root location “/user/hive/warehouse/fact_events”  and performs the following operations :
    1. Creates similar hudi partitions in the new dataset location. In the above example, there will be day based partitions created under “/user/hive/warehouse/fact_events_hudi”
    2. Using Spark parallelism, generates unique file ID and uses it to generate a hudi skeleton parquet file for each original parquet file. A special commit timestamp called “BOOTSTRAP_COMMIT” is used. For the remainder of this document, let us imagine BOOTSTRAP_COMMIT having the timestamp “001”000000000. For example, if an original parquet file is at the path /user/hive/warehouse/fact_events/year=2015/month=12/day=31/file1.parquet. Let the unique file id getting generated is h1, then the skeletor skeleton file corresponding to the above parquet file will be stored at path /user/hive/warehouse/fact_events_hudi/year=2015/month=12/day=31/h1_1-0-1_001000000000.parquet.
    3. Generates a special bootstrap index which maps each newly generated hudi skeleton file id to its corresponding original parquet file.
    4. Atomically commits the bootstrap operation using same hudi timeline state transitions. Standard rollback will also be supported to rollback inflight and committed bootstrap.
  4. If hive syncing is enabled, creates a brand new hudi hive table pointing to the new location - “/user/hive/warehouse/fact_events_hudi”
  5. Subsequent write operations happen on the hudi dataset.

Bootstrap Index:

The purpose of this index is to map the Hudi skeleton file with its associated external parquet-file containing original data columns. This information is part of Hudi’s file-system view and is  used when generating file slices. In this respect, there is a similarity between compaction plan and bootstrap index. But unlike compaction plan, bootstrap index could be larger in size for large historical tables. Hence, a proper storage format needs to be selected to be read and write efficient.

...

From the concepts page, A “file slice” refers to one complete snapshot of a logical hudi file. It contains one base file and one or more delta files. Conceptually, we encapsulate the bootstrap index information at file-slice (data-file) level. So a file-slice would be able to provide information about the external location where the original columns are residing.

With this modelIn Hudi, we have an implementation abstraction call file system view which maps physical files to file-slices. This abstraction will also annotate file-slices with bootstrap index entries (skeleton-file to external-file) so that higher layers can handle external files in a consistent way.

With this model, if we need to if we need to update a batch of records “1-K” in old partitions for the newly bootstrapped table “fact_events_hudi”, the following steps are performed,

  • Let us say the commit time associated with this upsert operation is “C1”. It is given that C1 is greater than the “BOOTSTRAP_COMMIT” (001000000000).
  • Assuming Bloom Index, index lookup happens directly on Hudi skeleton files.  Let’s say the hudi skeleton file with file id “h1” has all the records, 
  • In the coming description, “regular” hudi file means it is a hudi parquet file with per-record hudi metadata columns, original columns and bloom index in the single file. For Copy-On-Write table, the  writing phase identifies that the latest file-slice for the file Id “h1” is generated by bootstrap using special bootstrap commit time. It reads the original external file stored under original root location “/user/hive/warehouse/fact_events”. Hudi Merge Handle reads both this external file and the metadata-only hudi file parallelly, stitching the records together and merging them with incoming batch of records to create a “regular” hudi file with brand new version for the fileId “h1”.

Image RemovedImage Added

  • For Merge-On-Read table, ingestion would simply append to a delta log file and a subsequent compaction performs similar steps as Copy-On-Write table to generate a “regular” hudi file with brand new version for the fileId “h1”.

Hudi implements custom input formats to integrate with query engines. These existing custom input formats will recognize special bootstrap commit and performs column stitching between hudi record-level metadata fields in the skeleton hudi file and other columns present in external parquet file to provide same views as existing hudi tables. Note that only projected columns required by the query will be read from the physical parquet files. Please see below for a pictorial representation of how query engine integration is done

Requirements

  • As with any Hudi datasets, the uniqueness constraint of record keys is expected for the dataset to be bootstrapped. Hence, care must be taken to select the columns in the original dataset to guarantee uniqueness. Otherwise, proper upsert for records corresponding to duplicate keys is not guaranteed.

Data Source Support

This section proposes a design for integrating Hudi Bootstrapped table with Spark DataSource, so that Copy-on-Write tables can be read using the Hudi data source using the following ways:

Code Block
val df = spark.read.format("hudi").load("s3://<bucket>/table1/")
val df = spark.read.format("hudi").load("s3://<bucket>/table1/partition1/")


Note: We can also accept a path pattern here instead, to maintain compatibility with the current behavior but for that we will have to implement our own handling of the patterns.

Proposal for COW snapshot queries

The idea here is to implement a new Spark Relation and Spark RDD that will be used for scanning and reading bootstrapped tables. The custom relation will implement PruneFilteredScan to allow for supporting filters pushdown and column pruning. For the RDD, each partition will be data file + optional skeleton file combination which will be sent to one task to perform the merge and return the results.

Following code skeleton is to provide a high-level outline of what we want to achieve here. API signatures may change as we set out to implement.

Code Block
1.  package org.apache.hudi.skeleton  
2.	  
3.	import org.apache.spark.rdd.RDD  
4.	import org.apache.spark.sql.{Row, SQLContext}  
5.	import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}  
6.	import org.apache.spark.sql.types.StructType  
7.	  
8.	case class HudiBootstrapTableState(files: List[HudiBootstrapSplit])  
9.	  
10.	case class HudiBootstrapSplit(dataFile: String,  
11.	                              skeletonFile: String)  
12.	  
13.	class HudiBootstrapRelation(val sqlContext: SQLContext,  
14.	                            val basePath: String,  
15.	                            val optParams: Map[String, String],  
16.	                            val userSchema: StructType)  
17.	                            extends BaseRelation with PrunedFilteredScan {  
18.	  
19.	  override def schema: StructType = ???  
20.	  
21.	  override def buildScan(requiredColumns: Array[String],  
22.	                         filters: Array[Filter]): RDD[Row] = {  
23.	    // Perform the following steps here:  
24.	    // 1. Perform file system listing to form HudiBootstrapTableState which would  
25.	    //    maintain a mapping of Hudi skeleton files to External data files  
26.	    //  
27.	    // 2. Form the HudiBootstrapRDD and return it  
28.	  
29.	    val tableState = HudiBootstrapTableState(List())  
30.	    new HudiBootstrapRDD(tableState, sqlContext.sparkSession).map(_.asInstanceOf[Row])  
31.	  }  
32.	}


1.	package org.apache.hudi.skeleton  
2.	  
3.	import org.apache.spark.{Partition, TaskContext}  
4.	import org.apache.spark.rdd.RDD  
5.	import org.apache.spark.sql.SparkSession  
6.	import org.apache.spark.sql.catalyst.InternalRow  
7.	  
8.	class HudiBootstrapRDD(table: HudiBootstrapTableState,                         
9.	                       spark: SparkSession)   
10.	                       extends RDD[InternalRow](spark.sparkContext, Nil) {  
11.	  
12.	  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {  
13.	    // This is the code that gets executed at each spark task. We will perform  
14.	    // the following tasks here:  
15.	    // - From the HudiBootstrapPartition, obtain the data and skeleton file paths  
16.	    // - If the skeleton file exists (bootstrapped partition), perform the merge  
17.	    //   and return a merged iterator  
18.	    // - If the skeleton file does not exist (non-bootstrapped partition), read  
19.	    //   only the data file and return an iterator  
20.	    // - For reading parquet files, build reader using ParquetFileFormat which  
21.	    //   returns an Iterator[InternalRow].
22.	    // - Merge the readers for skeleton and data files and return a single
23.	    //   Iterator[InternalRow]
24.	    // - Investigate and implement passing of filters and required schema down  
25.	    //   for pruning and filtering optimizations that ParquetFileFormat provides.  
26.	  }  
27.	  
28.	  override protected def getPartitions: Array[Partition] = {  
29.	    // Form the partitions i.e. HudiBootstrapPartition from HudiBootstrapTableState.  
30.	    // Each spark task would handle one partition. Here we can do one of the  
31.	    // following mappings:  
32.	    // - Map one HudiBootstrapSplit to one partition, so that each task would  
33.	    //   perform merging of just one split i.e. data file and skeleton  
34.	    // - Map multiple HudiBootstrapSplit to one partition, so that each task  
35.	    //   would perform merging of multiple splits i.e. multiple data/skeleton files  
36.	      
37.	    table.files.zipWithIndex.map(file =>   
38.	      HudiBootstrapPartition(file._1, file._2)).toArray  
39.	  }  
40.	}  
41.	  
42.	case class HudiBootstrapPartition(split: HudiBootstrapSplit,  
43.	                                  index: Int) extends Partition 


Advantages:

  • We do not have to make any changes in Spark code base.
  • Provides a way to control file listing logic to list down the skeleton files, and then map them to the corresponding external data file.
  • Provides control over what goes into each partition and computation logic for each partition, which is what we want to achieve here.
  • Same design can be further applied to Merge-on-Read tables.

Disadvantages:

  • File splitting would not be supported, which may have impact on the read performance. Each task would handle merging of one skeleton + data file only. But at the same time, we do not currently have a way to handle splits of skeleton + data parquet files, to be able to split them at exactly the same row offsets and then merge them later. Even with the InputFormat column stitching logic we would have to disable splitting of files, and each split will be mapped to one file. Thus, in that sense we would following a similar approach.

Proposal for COW incremental queries

For Incremental queries we would have to employ similar logic to re-design the IncrementalRelation currently implemented in Hudi code. We can probably use the same RDD implementation there, which we implement for the snapshot query case.

Rollout/Adoption Plan

  • This will be rolled out as an experimental feature in 0.5.1 
  • Hudi Writers and Readers do not need special configuration to identify tables using this new bootstrap mechanism. The presence of special bootstrap commit and bootstrap index will automatically trigger correct handling of these tables.

...