Proposers

Approvers

  • @<approver1 JIRA username> : [APPROVED/REQUESTED_INFO/REJECTED]
  • @<approver2 JIRA username> : [APPROVED/REQUESTED_INFO/REJECTED]
  • ...

Status

Current State

  • UNDER DISCUSSION (tick)
  • IN PROGRESS
  • ABANDONED
  • COMPLETED
  • INACTIVE

Discussion thread: NA

JIRA: <link to JIRA issue>

Released: <Hudi Version>


Abstract


Currently, the Flink writer pipeline has a bucket assigner stage that indexes records in Flink state and a writer stage that actually flushes the data to disk (see RFC-24).
For a very large dataflow, the checkpoint can consume a significant amount of time and may even time out when a slow HDFS DataNode is hit. We therefore need a pipeline that writes data in a truly streaming fashion.

Background


We will reuse the hash mechanism of RFC-29 so that Flink and Spark use the same hashing.
This feature is designed for the Flink Merge On Read table type and upsert mode only.

Implementation

Bootstrap
Each partition has the same number of buckets, so partition+bucketID is a unique key across all the file groups of a table. During bootstrap, we scan the existing filesystem to build a Map(partition+bucketID, FileID) mapping, and new data is appended to the existing file groups.
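The sketch below shows how such a mapping could be rebuilt from a scan result. FileGroupInfo and the class name are illustrative stand-ins rather than actual Hudi classes; the real implementation would use Hudi's file-system view to list the existing file groups.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// A minimal sketch of the bootstrap step, assuming a hypothetical
// FileGroupInfo(partition, bucketId, fileId) produced by the filesystem scan.
public class BucketIndexBootstrapSketch {

  /** Builds the (partition + bucketID) -> fileID mapping from the existing file groups. */
  public static Map<String, String> buildBucketToFileIdMapping(List<FileGroupInfo> fileGroups) {
    Map<String, String> bucketToFileId = new HashMap<>();
    for (FileGroupInfo fg : fileGroups) {
      // The key is partition path plus bucket id, which is unique across the table
      // because every partition has the same number of buckets.
      bucketToFileId.put(fg.partition + "_" + fg.bucketId, fg.fileId);
    }
    return bucketToFileId;
  }

  /** Hypothetical holder for the metadata recovered from the filesystem scan. */
  public static class FileGroupInfo {
    final String partition;
    final int bucketId;
    final String fileId;

    FileGroupInfo(String partition, int bucketId, String fileId) {
      this.partition = partition;
      this.bucketId = bucketId;
      this.fileId = fileId;
    }
  }
}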

Generate the bucketID with the pre-defined hash function and shuffle it by bucketID
To make sure all the data of one bucket lands on the same parallel writer, we shuffle the records by bucketID, so that each file group is touched by only one writer.
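For illustration only, a bucket id could be derived from the record key as below; the real implementation must use the hash function shared with Spark per RFC-29 rather than String.hashCode(). In the Flink job the stream would then be keyed by this bucket id (for example via keyBy) so that all records of one bucket go to the same writer subtask.

// A minimal sketch of bucket id computation; the real implementation must use
// the hash function shared with Spark (RFC-29), not String.hashCode().
public final class BucketIdSketch {

  /** Maps a record key to a bucket id in [0, numBuckets). */
  public static int bucketId(String recordKey, int numBuckets) {
    // floorMod keeps the result non-negative even for negative hash codes.
    return Math.floorMod(recordKey.hashCode(), numBuckets);
  }

  private BucketIdSketch() {
  }
}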

Assign FileID based on bucketID on StreamWriteFunction
Because we already have the Map(partition+bucketID, FileID) mapping, we know the fileID for every record. If the bucket has no file group yet, we create a new one. tagLocation() can be done here.
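A minimal sketch of this assignment, assuming the bucketToFileId map built at bootstrap; the class and method names are illustrative, not Hudi APIs.

import java.util.Map;
import java.util.UUID;

// A minimal sketch of file id assignment based on the bootstrap mapping.
public class FileIdAssignerSketch {

  private final Map<String, String> bucketToFileId;

  public FileIdAssignerSketch(Map<String, String> bucketToFileId) {
    this.bucketToFileId = bucketToFileId;
  }

  /** Returns the existing file id for the bucket, creating a new file group id if absent. */
  public String assignFileId(String partition, int bucketId) {
    // A new mapping here corresponds to creating a new file group; an existing
    // mapping lets us tag the record's location directly (tagLocation()).
    return bucketToFileId.computeIfAbsent(partition + "_" + bucketId,
        key -> UUID.randomUUID().toString());
  }
}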

Create HoodieAppendHandle for each FileID
For each checkpoint, we create a new set of HoodieAppendHandles to actually write the records: one HoodieAppendHandle per fileID. The fileID-to-handle mapping is stored in the HoodieFlinkClient. A combined sketch of the handle map and record buffering follows the next step.

Buffering record in HoodieAppendHandle
In the Flink StreamWriteFunction.processElement(T value) method, we buffer each HoodieRecord via HoodieAppendHandle.write(hoodieRecord).
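The following self-contained sketch illustrates the previous two steps together: one buffering handle per fileID, created lazily for the current checkpoint, with processElement() buffering incoming records. AppendHandleSketch only stands in for HoodieAppendHandle; none of the names below are actual Hudi or Flink APIs.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// A minimal sketch of per-fileID handles plus record buffering for one checkpoint.
public class StreamWriteSketch<R> {

  /** fileID -> handle for the in-flight checkpoint; recreated after each checkpoint. */
  private final Map<String, AppendHandleSketch<R>> handles = new HashMap<>();

  /** Buffers one record into the handle that owns its file group. */
  public void processElement(String fileId, R record) {
    handles.computeIfAbsent(fileId, AppendHandleSketch::new).write(record);
  }

  public Map<String, AppendHandleSketch<R>> currentHandles() {
    return handles;
  }

  /** Stand-in for HoodieAppendHandle: buffers records for a single file group. */
  public static class AppendHandleSketch<R> {
    private final String fileId;
    private final List<R> buffer = new ArrayList<>();

    public AppendHandleSketch(String fileId) {
      this.fileId = fileId;
    }

    public void write(R record) {
      buffer.add(record);
    }
  }
}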

Flush to disk with the existing memory monitoring
HoodieAppendHandle already has a memory-monitoring mechanism that flushes buffered data to disk, so we do not need to worry about OOM.

Close the handle to get the commit status when the checkpoint is triggered
Once a checkpoint is triggered, we call HoodieAppendHandle.close() on every open handle to flush the remaining data and collect all the WriteStatuses.
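A minimal sketch of this checkpoint hook, assuming a fileID-to-handle map like the one in the previous sketch; in the real operator this logic would run inside Flink's snapshotState() and return Hudi WriteStatus objects rather than strings.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// A minimal sketch of the checkpoint hook: close every open handle, collect the
// resulting statuses, and start a fresh handle set for the next checkpoint.
public class CheckpointFlushSketch {

  /** Stand-in for HoodieAppendHandle's close(): flushes buffered data and reports a status. */
  public interface ClosableHandle {
    String close();
  }

  /** Closes every open handle and collects one write status per file group. */
  public static List<String> flushOnCheckpoint(Map<String, ClosableHandle> handles) {
    List<String> writeStatuses = new ArrayList<>();
    for (ClosableHandle handle : handles.values()) {
      writeStatuses.add(handle.close());
    }
    handles.clear(); // the next checkpoint gets a brand-new set of handles
    return writeStatuses;
  }
}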

Performance Evaluation


Compared with the state-based index and upsert(List<HoodieRecord>), the checkpoint duration dropped from several minutes to less than one minute for a dataflow of more than 2 million QPS.

Rollout/Adoption Plan


There is no impact on the existing pipelines because this is a new indexing mechanism.

Test Plan


  • Unit tests
  • Integration tests
  • User participation is welcome.







