Current Pipeline

The current Flink writer pipeline is as follows:

It has several bottlenecks that keep it from scaling to high-throughput dataset writes:

  1. The InstantGeneratorOperator runs with parallelism 1, which limits high-throughput consumption; because all the split inputs drain to a single thread, the network IO comes under pressure too.
  2. The WriteProcessOperator handles inputs by partition. Within each partition's write process, the BUCKETs are written one by one, so the file IO cannot keep up with high-throughput inputs.
  3. Currently we buffer the data by checkpoints, which is hard to make robust for production: the checkpoint function is blocking and should not perform IO operations.
  4. The FlinkHoodieIndex is only valid within the scope of a single job; it does not work for existing bootstrap data or across different Flink jobs.

The Improvement Proposal

# STEP1: Remove the Single Parallelism Operator

To solve bottleneck 1.

First, we can avoid the singleton task operator InstantGeneratorOperator by implementing an operator coordinator for the write operator. The coordinator always starts its checkpoint first, and it starts a new commit on each new checkpoint.

Work Flow

The write function first buffers the data as a batch of HoodieRecords. It flushes (writes) the record batch when a Flink checkpoint starts. After a batch has been written successfully, the function notifies its operator coordinator, StreamWriteOperatorCoordinator, to mark a successful write.

Exactly-once Semantics

The task implements exactly-once semantics by buffering the data between checkpoints. The operator coordinator starts a new instant on the timeline when a checkpoint triggers. The coordinator's checkpoint always starts before its operator's, so when this function starts a checkpoint, a REQUESTED HoodieInstant already exists.

The function's process thread then blocks data buffering, and the checkpoint thread starts flushing the existing data buffer. Once the existing data buffer has been written successfully, the process thread unblocks and starts buffering again for the next checkpoint round.

Because any checkpoint failure triggers a rollback of the write, this gives exactly-once semantics.
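
The buffer-then-flush flow described above can be modelled without any Flink dependency. The following is a minimal sketch, not the actual Hudi implementation: the class names, the `onCheckpoint` hook and the `simulateFailure` flag are all hypothetical, and the coordinator is called directly instead of through operator events.

```java
import java.util.ArrayList;
import java.util.List;

/** Flink-free model of the STEP1 flow; every name here is hypothetical. */
final class CheckpointBufferedWriteSketch {

    /** Stands in for the operator coordinator, which owns the instant lifecycle. */
    static final class Coordinator {
        String inflightInstant;
        final List<String> committed = new ArrayList<>();
        final List<String> rolledBack = new ArrayList<>();

        /** Runs before the write function's checkpoint hook, so an instant always exists. */
        void startInstant(long checkpointId) {
            inflightInstant = "instant-" + checkpointId;
        }

        void commit() {
            committed.add(inflightInstant);
            inflightInstant = null;
        }

        void rollback() {
            rolledBack.add(inflightInstant);
            inflightInstant = null;
        }
    }

    /** Stands in for the write function, which buffers records between checkpoints. */
    static final class WriteFunction {
        final List<String> buffer = new ArrayList<>();
        final List<String> written = new ArrayList<>();

        void processElement(String record) {
            buffer.add(record);
        }

        /** Flushes the whole buffer under the instant opened by the coordinator. */
        void onCheckpoint(Coordinator coordinator, boolean simulateFailure) {
            if (simulateFailure) {
                // Any failure rolls the instant back and fails the task.
                coordinator.rollback();
                throw new IllegalStateException("flush failed; failover expected");
            }
            for (String record : buffer) {
                written.add(coordinator.inflightInstant + ":" + record);
            }
            buffer.clear();
            coordinator.commit();
        }
    }

    public static void main(String[] args) {
        Coordinator coordinator = new Coordinator();
        WriteFunction function = new WriteFunction();
        function.processElement("a");
        function.processElement("b");
        coordinator.startInstant(1L);
        function.onCheckpoint(coordinator, false);
        System.out.println(function.written + " committed=" + coordinator.committed);
    }
}
```

In this model nothing becomes visible unless the flush for the whole checkpoint succeeds, which is the property the exactly-once argument relies on.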

Fault Tolerance

The operator coordinator checks the validity of the last instant when it starts a new one. The operator rolls back the written data and throws when any error occurs, which means that any checkpoint or task failure triggers a failover. The operator coordinator retries several times when committing the write status.
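
The "retry several times" behaviour for the commit could look roughly like the sketch below. The helper name `commitWithRetry` and the attempt limit are assumptions made for illustration; the proposal does not specify how many retries are made or whether there is a back-off.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical retry helper; the attempt limit is an assumption, not a Hudi setting. */
final class CommitRetrySketch {

    /** Returns the 1-based attempt that succeeded, or throws once maxAttempts is exhausted. */
    static int commitWithRetry(BooleanSupplier commitAttempt, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (commitAttempt.getAsBoolean()) {
                return attempt;
            }
        }
        // Giving up surfaces as an exception, which would trigger the failover.
        throw new IllegalStateException("commit failed after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated commit that fails twice before succeeding.
        int attempt = commitWithRetry(() -> ++calls[0] >= 3, 5);
        System.out.println("committed on attempt " + attempt);
    }
}
```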

Note: The function task requires the input stream to be partitioned by the partition fields, so that different write tasks do not write to the same file group and conflict. The partition path is commonly a datetime field, so the sink task is very likely to have an IO bottleneck. A more flexible solution is to shuffle the data by file group ID (addressed in #step2).
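
To illustrate why a datetime partition path concentrates the load, the sketch below compares the two shuffle keys. It is a simplification under stated assumptions: `taskFor` uses a plain hash modulo, whereas Flink's real key-to-subtask assignment goes through key groups, and the sample keys are made up.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Compares shuffle keys; the hash-modulo routing is a stand-in for Flink's key groups. */
final class ShuffleKeySketch {

    /** Simplified key-to-task mapping (assumption, not Flink's actual algorithm). */
    static int taskFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    /** Counts how many distinct write tasks receive at least one record. */
    static int distinctTasks(List<String> keys, int parallelism) {
        Set<Integer> tasks = new HashSet<>();
        for (String key : keys) {
            tasks.add(taskFor(key, parallelism));
        }
        return tasks.size();
    }

    public static void main(String[] args) {
        // Eight records that all land in the same day partition.
        List<String> byPartitionPath = Collections.nCopies(8, "2021-01-01");
        // The same eight records keyed by hypothetical file group IDs.
        List<String> byFileId = Arrays.asList(
            "file-0", "file-1", "file-2", "file-3", "file-4", "file-5", "file-6", "file-7");
        System.out.println("tasks used by partition path: " + distinctTasks(byPartitionPath, 4));
        System.out.println("tasks used by file ID: " + distinctTasks(byFileId, 4));
    }
}
```

Keyed by partition path, every record of the day is routed to a single task regardless of parallelism; keyed by file ID, the same records can spread over several tasks.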

# STEP2: Make the Write Task Scalable

To solve bottleneck 2.

For each partition, the write client of WriteProcessOperator handles all the logic for indexing/bucketing/data-write:

  1. index the records as INSERT/UPDATE
  2. use the PARTITIONER for bucketing, to decide each record’s bucket (fileId)
  3. write the buckets one by one

Item 3 of this list is processed by a single thread, which cannot scale to big datasets.

To solve bottleneck 2, we propose splitting the WriteProcessOperator into two operators: the FileIdAssigner and the BucketWriter.

The FileIdAssigner

FileIdAssigner does the following for each record:

  1. There is a BucketAssigner that builds a per-checkpoint partition write profile; its core role is to assign a bucket ID (partition path + fileID) to each record.
  2. Look up the index to see whether the record is an UPDATE. If it is an UPDATE, patch the record with the existing fileID; if it is an INSERT, assign it a new (or already assigned) fileID based on the configured bucket size.
  3. Send the record, patched with its fileID, downstream.


The FileIdAssigner output records are then shuffled by fileID and sent to the BucketWriter.
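
The assignment rule can be sketched as follows. This is an illustration under assumptions, not the proposed operator: the in-memory `index` map stands in for the real index, the bucket size is counted in records rather than bytes, and the `file-N` naming is made up.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical file ID assignment: UPDATEs reuse their file, INSERTs fill buckets. */
final class FileIdAssignerSketch {
    private final Map<String, String> index = new HashMap<>();      // partition/key -> fileId
    private final Map<String, String> openFile = new HashMap<>();   // partition -> file taking inserts
    private final Map<String, Integer> fileCount = new HashMap<>(); // fileId -> assigned records
    private final int maxRecordsPerFile;                            // stand-in for the bucket size
    private int nextFileSeq = 0;

    FileIdAssignerSketch(int maxRecordsPerFile) {
        this.maxRecordsPerFile = maxRecordsPerFile;
    }

    /** Returns the fileId the record should be written to. */
    String assign(String partitionPath, String recordKey) {
        String indexKey = partitionPath + "/" + recordKey;
        String known = index.get(indexKey);
        if (known != null) {
            return known; // UPDATE: keep the record in its existing file
        }
        // INSERT: reuse the partition's open file until it reaches the bucket size.
        String fileId = openFile.get(partitionPath);
        if (fileId == null || fileCount.get(fileId) >= maxRecordsPerFile) {
            fileId = "file-" + nextFileSeq++;
            openFile.put(partitionPath, fileId);
            fileCount.put(fileId, 0);
        }
        fileCount.merge(fileId, 1, Integer::sum);
        index.put(indexKey, fileId);
        return fileId;
    }

    public static void main(String[] args) {
        FileIdAssignerSketch assigner = new FileIdAssignerSketch(2);
        System.out.println(assigner.assign("2021-01-01", "k1"));
        System.out.println(assigner.assign("2021-01-01", "k2"));
        System.out.println(assigner.assign("2021-01-01", "k3"));
        System.out.println(assigner.assign("2021-01-01", "k1"));
    }
}
```

Because the output carries the fileID, the downstream shuffle can key on it and each file group is only ever touched by one writer task.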

The BucketWriter

BucketWriter takes a batch of HoodieRecords (with file IDs) and writes the assigned buckets one by one.
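
A rough model of the per-batch write is shown here. The `Record` type and the `fileId:count` status strings are hypothetical; a real writer would open a file handle per bucket and return Hudi write statuses.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical bucket writer: groups a batch by fileId and writes bucket by bucket. */
final class BucketWriterSketch {

    static final class Record {
        final String fileId;
        final String key;

        Record(String fileId, String key) {
            this.fileId = fileId;
            this.key = key;
        }
    }

    /** Groups records by their assigned fileId, keeping first-seen bucket order. */
    static Map<String, List<Record>> groupByFileId(List<Record> batch) {
        Map<String, List<Record>> buckets = new LinkedHashMap<>();
        for (Record record : batch) {
            buckets.computeIfAbsent(record.fileId, id -> new ArrayList<>()).add(record);
        }
        return buckets;
    }

    /** Writes the buckets one by one and returns one status string per bucket. */
    static List<String> writeBatch(List<Record> batch) {
        List<String> statuses = new ArrayList<>();
        for (Map.Entry<String, List<Record>> bucket : groupByFileId(batch).entrySet()) {
            // A real writer would open the file handle for bucket.getKey() here.
            statuses.add(bucket.getKey() + ":" + bucket.getValue().size());
        }
        return statuses;
    }

    public static void main(String[] args) {
        List<Record> batch = Arrays.asList(
            new Record("f1", "a"), new Record("f2", "b"), new Record("f1", "c"));
        System.out.println(writeBatch(batch));
    }
}
```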

In general, #step2 requires refactoring the Flink client, especially the HoodieFlinkWriteClient. The current code base assumes that the HoodieFlinkWriteClient handles all the tasks within #step2, which suits Spark but not Flink. We should move the indexing/bucketing work out of the client to make it more lightweight, so that it only writes the data based on the records' bucket IDs.

# STEP3: Write as Mini-batch

To solve bottleneck 3.

  • We start a new instant when the BucketWriterCoordinator starts (instead of starting one on each new checkpoint, as in #step1 and #step2).
  • The BucketWriter blocks and flushes the pending buffered data when a new checkpoint starts. It has an asynchronous thread that consumes the buffer and writes batches based on the buffered data size (for the first version, we flush the data out directly in #processElement).
  • For the BucketWriterCoordinator, if the data within a checkpoint is written successfully (a checkpoint success notification was received), it checks and commits the inflight instant and starts a new instant.

That means that if a checkpoint succeeds but we do not receive the success event, the inflight instant will span two (or more) checkpoints.
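
The mini-batch behaviour can be modelled as in the sketch below, which follows the "first version" that flushes inside the element-processing path. All names are hypothetical, and the buffer size is measured in characters purely to keep the example small.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of STEP3: size-based flushes, commit only on checkpoint success. */
final class MiniBatchWriteSketch {

    static final class Coordinator {
        private int nextSeq = 0;
        String inflightInstant;
        final List<String> committed = new ArrayList<>();

        /** An instant is opened as soon as the coordinator starts. */
        Coordinator() {
            startNewInstant();
        }

        private void startNewInstant() {
            inflightInstant = "instant-" + nextSeq++;
        }

        /** Called only when a checkpoint success notification actually arrives. */
        void notifyCheckpointComplete() {
            committed.add(inflightInstant);
            startNewInstant();
        }
    }

    static final class BucketWriter {
        private final Coordinator coordinator;
        private final int maxBufferChars;
        private final List<String> buffer = new ArrayList<>();
        private int bufferedChars = 0;
        final List<String> written = new ArrayList<>();

        BucketWriter(Coordinator coordinator, int maxBufferChars) {
            this.coordinator = coordinator;
            this.maxBufferChars = maxBufferChars;
        }

        /** Buffers the record and flushes eagerly once the buffer is large enough. */
        void processElement(String record) {
            buffer.add(record);
            bufferedChars += record.length();
            if (bufferedChars >= maxBufferChars) {
                flushBuffer();
            }
        }

        /** A checkpoint flushes whatever is still pending. */
        void onCheckpoint() {
            flushBuffer();
        }

        private void flushBuffer() {
            for (String record : buffer) {
                written.add(coordinator.inflightInstant + ":" + record);
            }
            buffer.clear();
            bufferedChars = 0;
        }
    }

    public static void main(String[] args) {
        Coordinator coordinator = new Coordinator();
        BucketWriter writer = new BucketWriter(coordinator, 4);
        writer.processElement("aa");
        writer.processElement("bb");   // reaches the size limit and flushes
        writer.onCheckpoint();         // checkpoint 1, success notification lost
        writer.processElement("cc");
        writer.onCheckpoint();         // checkpoint 2
        coordinator.notifyCheckpointComplete();
        System.out.println(writer.written + " committed=" + coordinator.committed);
    }
}
```

In the `main` run, the first instant covers both checkpoints because the notification for checkpoint 1 never reaches the coordinator.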

The Semantics

To improve throughput, the function's process thread does not block data buffering after the checkpoint thread starts flushing the existing data buffer. So it is possible that a batch belonging to the next checkpoint is written as part of the current checkpoint. When a checkpoint failure triggers the write rollback, there may be some duplicate records (e.g. the eagerly written batch); the semantics are still correct with the UPSERT operation.

We may improve this to exactly-once when the writer supports writing records one by one instead of in batches (the writer would then switch file handles on checkpointing).
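
The reason duplicates are harmless under UPSERT can be shown with a toy table keyed by record key. This is only an illustration of the idempotence argument; the map-based table and the helper names are assumptions, not Hudi APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy comparison of replaying a batch as UPSERTs versus as plain appends. */
final class UpsertReplaySketch {

    /** Applies {key, value} pairs as UPSERTs: the last write per key wins. */
    static void upsert(Map<String, String> table, List<String[]> batch) {
        for (String[] kv : batch) {
            table.put(kv[0], kv[1]);
        }
    }

    /** Applies the same pairs as appends, which keeps every copy. */
    static void append(List<String> log, List<String[]> batch) {
        for (String[] kv : batch) {
            log.add(kv[0] + "=" + kv[1]);
        }
    }

    public static void main(String[] args) {
        List<String[]> batch = Arrays.asList(
            new String[] {"k1", "v1"}, new String[] {"k2", "v2"});

        Map<String, String> table = new LinkedHashMap<>();
        upsert(table, batch);
        upsert(table, batch); // replay after a rollback or timeout

        List<String> log = new ArrayList<>();
        append(log, batch);
        append(log, batch);

        System.out.println("upsert rows: " + table.size() + ", append rows: " + log.size());
    }
}
```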

Fault Tolerance

We do not block when the BucketWriter finishes flushing a checkpoint buffer. In the interval before the checkpoint completes, another mini-batch buffer (call it A) may be flushed out. If the checkpoint then times out, we would re-consume buffer A and flush it out again, so the data in buffer A is written twice.


The operator coordinator checks and commits the last instant, then starts a new one, when a checkpoint finishes successfully. The operator rolls back the written data and throws to trigger a failover when any error occurs. This means one Hoodie instant may span one or more checkpoints (some checkpoint notifications may be skipped). If a checkpoint times out, the next checkpoint helps to rewrite the remaining buffered data (the buffer is cleared in the last step of the #flushBuffer method).


The operator coordinator retries several times when committing the write status.

# STEP4: New Index

To solve bottleneck 4.

The new index is based on the BloomFilter index; a state is used as a cache of the underlying file-based index:

  • check the state to find whether a record is an UPDATE; if it is, do nothing
  • if the record is an INSERT, use the BloomFilter index to find the candidate files, look up these files, and put all their index entries into the state

When all the files have been checked, mark the index as being in pure state mode; from then on we only need to check the state for records that arrive later.

The new index does not need to be bootstrapped for an existing/historical dataset, and it also works across writes from different Flink jobs.
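
A minimal sketch of the state-as-cache lookup follows. Everything here is an assumption made for illustration: the `BitSet` with a single hash is a toy stand-in for a real Bloom filter, the in-memory `fileKeys` map stands in for the base files, and the `state` map stands in for Flink keyed state.

```java
import java.util.Arrays;
import java.util.BitSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical state-backed index that lazily loads entries from file-based data. */
final class StateBackedIndexSketch {
    private static final int FILTER_BITS = 64;

    private final Map<String, Set<String>> fileKeys;              // fileId -> keys stored in the file
    private final Map<String, BitSet> filters = new HashMap<>();  // fileId -> toy one-hash filter
    private final Map<String, String> state = new HashMap<>();    // recordKey -> fileId cache
    private final Set<String> loadedFiles = new HashSet<>();
    int fileReads = 0;

    StateBackedIndexSketch(Map<String, Set<String>> fileKeys) {
        this.fileKeys = fileKeys;
        for (Map.Entry<String, Set<String>> entry : fileKeys.entrySet()) {
            BitSet bits = new BitSet(FILTER_BITS);
            for (String key : entry.getValue()) {
                bits.set(bit(key));
            }
            filters.put(entry.getKey(), bits);
        }
    }

    private static int bit(String key) {
        return Math.floorMod(key.hashCode(), FILTER_BITS);
    }

    /** True once every file has been loaded, so the state alone answers lookups. */
    boolean pureStateMode() {
        return loadedFiles.size() == fileKeys.size();
    }

    /** Returns the fileId for an UPDATE, or null when the record is an INSERT. */
    String lookup(String recordKey) {
        String cached = state.get(recordKey);
        if (cached != null || pureStateMode()) {
            return cached;
        }
        for (Map.Entry<String, Set<String>> entry : fileKeys.entrySet()) {
            String fileId = entry.getKey();
            if (loadedFiles.contains(fileId) || !filters.get(fileId).get(bit(recordKey))) {
                continue; // already cached, or the filter rules this file out
            }
            fileReads++;
            for (String key : entry.getValue()) {
                state.put(key, fileId); // cache every entry of the candidate file
            }
            loadedFiles.add(fileId);
        }
        return state.get(recordKey);
    }

    public static void main(String[] args) {
        Map<String, Set<String>> files = new LinkedHashMap<>();
        files.put("f1", new HashSet<>(Arrays.asList("a", "b")));
        files.put("f2", new HashSet<>(Arrays.asList("c")));
        StateBackedIndexSketch index = new StateBackedIndexSketch(files);
        System.out.println(index.lookup("a") + " after " + index.fileReads + " file read(s)");
        System.out.println(index.lookup("c") + ", pure state mode: " + index.pureStateMode());
    }
}
```

In a real operator the keys of newly inserted records would also be added to the state after assignment; that step is omitted here.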

Implementation Plan

  1. Implement #step1 on the current code base and add a test framework to the hoodie-flink module, including UTs and ITs
  2. Refactor the HoodieFlinkWriteClient to abstract out the indexing/bucketing work; in hoodie-spark these are all sub-pipelines, but in Flink we should abstract them as an Interface/Operator
  3. Implement the code for #step2
  4. Implement the code for #step3
  5. Add a new index


Each step would have a separate JIRA issue.
