...

  • Balaji Varadarajan (vbalaji)
  • Udit Mehrotra (umehrot)

Approvers

  • Vinoth Chandar @vinoth  : APPROVED
  • Nishith Agarwal @nagarwal : REQUESTED_INFO...

Status

Current state: In Progress

...

  • As with any Hudi dataset, record keys in the dataset to be bootstrapped are expected to be unique. Hence, care must be taken to select key columns from the original dataset that guarantee uniqueness. Otherwise, correct upserts for records with duplicate keys are not guaranteed.

Data Source Support

This section proposes a design for integrating Hudi bootstrapped tables with the Spark DataSource, so that Copy-on-Write tables can be read through the Hudi data source in either of the following ways:

Code Block
val df = spark.read.format("hudi").load("s3://<bucket>/table1/")
val df = spark.read.format("hudi").load("s3://<bucket>/table1/partition1/")


Note: We could also accept a path pattern here, to maintain compatibility with the current behavior, but we would then have to implement our own handling of the patterns.

Proposal for COW snapshot queries

The idea here is to implement a new Spark Relation and Spark RDD that will be used for scanning and reading bootstrapped tables. The custom relation will implement PrunedFilteredScan to support filter pushdown and column pruning. For the RDD, each partition will be a data file + skeleton file combination, which will be sent to one task that performs the merge and returns the results.

The following code skeleton provides a high-level outline of what we want to achieve here. API signatures may change during implementation.

Code Block
package org.apache.hudi.skeleton

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.StructType

// Mapping of Hudi skeleton files to external data files for the whole table.
case class HudiBootstrapTableState(files: List[HudiBootstrapSplit])

// One external data file paired with its Hudi skeleton file.
case class HudiBootstrapSplit(dataFile: String,
                              skeletonFile: String)

class HudiBootstrapRelation(val sqlContext: SQLContext,
                            val basePath: String,
                            val optParams: Map[String, String],
                            val userSchema: StructType)
                            extends BaseRelation with PrunedFilteredScan {

  override def schema: StructType = ???

  // HudiBootstrapRDD already produces InternalRow, so Spark must not
  // convert the rows again.
  override def needConversion: Boolean = false

  override def buildScan(requiredColumns: Array[String],
                         filters: Array[Filter]): RDD[Row] = {
    // Perform the following steps here:
    // 1. Perform file system listing to form HudiBootstrapTableState which would
    //    maintain a mapping of Hudi skeleton files to External data files
    //
    // 2. Form the HudiBootstrapRDD and return it

    val tableState = HudiBootstrapTableState(List())
    // Casting each InternalRow to Row would fail at runtime. With
    // needConversion = false, the RDD itself is cast instead.
    new HudiBootstrapRDD(tableState, sqlContext.sparkSession).asInstanceOf[RDD[Row]]
  }
}


package org.apache.hudi.skeleton

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow

class HudiBootstrapRDD(table: HudiBootstrapTableState,
                       spark: SparkSession)
                       extends RDD[InternalRow](spark.sparkContext, Nil) {

  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
    // This is the code that gets executed at each spark task. We will perform
    // the following tasks here:
    // - From the HudiBootstrapPartition, obtain the data and skeleton file paths
    // - If the skeleton file exists (bootstrapped partition), perform the merge
    //   and return a merged iterator
    // - If the skeleton file does not exist (non-bootstrapped partition), read
    //   only the data file and return an iterator
    // - For reading parquet files, build reader using ParquetFileFormat which
    //   returns an Iterator[InternalRow].
    // - Merge the readers for skeleton and data files and return a single
    //   Iterator[InternalRow]
    // - Investigate and implement passing of filters and required schema down
    //   for pruning and filtering optimizations that ParquetFileFormat provides.
    ??? // placeholder so that the skeleton compiles
  }

  override protected def getPartitions: Array[Partition] = {
    // Form the partitions i.e. HudiBootstrapPartition from HudiBootstrapTableState.
    // Each spark task would handle one partition. Here we can do one of the
    // following mappings:
    // - Map one HudiBootstrapSplit to one partition, so that each task would
    //   perform merging of just one split i.e. data file and skeleton
    // - Map multiple HudiBootstrapSplit to one partition, so that each task
    //   would perform merging of multiple splits i.e. multiple data/skeleton files

    // Shown here: one split per partition.
    table.files.zipWithIndex.map(file =>
      HudiBootstrapPartition(file._1, file._2)).toArray
  }
}

case class HudiBootstrapPartition(split: HudiBootstrapSplit,
                                  index: Int) extends Partition
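
The merge step outlined in the comments of compute can be illustrated without any Spark dependency. The following is only a minimal sketch under stated assumptions: SkeletonRow, DataRow, MergedRow and SkeletonMerge are hypothetical names that do not exist in Hudi, the skeleton row is assumed to carry just a commit time and a record key, and the two files are assumed to hold the same number of rows in the same order. A real implementation would operate on Iterator[InternalRow] and join the two rows' columns.

```scala
// Hypothetical stand-ins for the rows returned by the two Parquet readers.
final case class SkeletonRow(commitTime: String, recordKey: String)
final case class DataRow(values: Vector[Any])
final case class MergedRow(commitTime: String, recordKey: String, values: Vector[Any])

object SkeletonMerge {
  // Stitches the two iterators together positionally, one row at a time,
  // without materialising either file in memory.
  def merge(skeleton: Iterator[SkeletonRow],
            data: Iterator[DataRow]): Iterator[MergedRow] =
    new Iterator[MergedRow] {
      override def hasNext: Boolean = {
        val s = skeleton.hasNext
        val d = data.hasNext
        // Assumption: both files contain exactly the same number of rows.
        require(s == d, "skeleton and data files must contain the same number of rows")
        s
      }

      override def next(): MergedRow = {
        val s = skeleton.next()
        val d = data.next()
        MergedRow(s.commitTime, s.recordKey, d.values)
      }
    }
}
```

Because the merge is purely positional, it only works if neither file is split independently of the other, which is the constraint discussed under Disadvantages.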


Advantages:

  • We do not have to make any changes to the Spark code base.
  • Provides a way to control the file listing logic, so that we can list the skeleton files and then map each of them to the corresponding external data file.
  • Provides control over what goes into each partition and over the computation logic for each partition, which is what we want to achieve here.
  • The same design can be further applied to Merge-on-Read tables.
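
The control over partition contents can be made concrete with a small sketch of the second mapping option mentioned in getPartitions, where several splits are assigned to one task. This is an illustration only: Split, PackedPartition, SplitPacking and the splitsPerPartition parameter are hypothetical and not part of the proposal, and a real partition class would also extend Spark's Partition trait.

```scala
// Hypothetical, Spark-free stand-ins for HudiBootstrapSplit and a partition
// that holds several splits.
final case class Split(dataFile: String, skeletonFile: String)
final case class PackedPartition(index: Int, splits: List[Split])

object SplitPacking {
  // Packs consecutive splits into partitions of at most splitsPerPartition
  // entries. Partition indices are assigned in order, starting at 0.
  def pack(splits: List[Split], splitsPerPartition: Int): Array[PackedPartition] = {
    require(splitsPerPartition > 0, "splitsPerPartition must be positive")
    splits
      .grouped(splitsPerPartition)
      .zipWithIndex
      .map { case (group, index) => PackedPartition(index, group) }
      .toArray
  }
}
```

With splitsPerPartition set to 1 this reduces to the one-split-per-partition mapping used in the skeleton; larger values trade parallelism for fewer, longer-running tasks.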

Disadvantages:

  • File splitting would not be supported, which may hurt read performance: each task would merge exactly one skeleton + data file pair. However, we currently have no way to split the skeleton and data Parquet files at exactly the same row offsets so that the pieces can be merged later. Even with the InputFormat column stitching logic we would have to disable splitting of files, so that each split maps to one file. In that sense, we would be following a similar approach.

Proposal for COW incremental queries

For incremental queries, we would have to apply similar logic and re-design the IncrementalRelation currently implemented in the Hudi code. We can probably reuse the RDD implemented for the snapshot query case.

Rollout/Adoption Plan

  • This will be rolled out as an experimental feature in 0.5.1 
  • Hudi writers and readers do not need special configuration to identify tables that use this new bootstrap mechanism. The presence of a special bootstrap commit and a bootstrap index will automatically trigger the correct handling of these tables.

...