...

The current Flink writer pipeline is as follows:

[Image: the current Flink writer pipeline]

It has some bottlenecks when adapting to high-throughput dataset writing:

...

First, we can avoid the singleton-task InstantGeneratorOperator by implementing an operator coordinator for the write operator. The coordinator is always checkpointed before the write tasks, so it can start a new commit on each new checkpoint.

[Image: the pipeline with the write operator coordinator]
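
As a rough illustration, here is a minimal coordinator sketch, assuming the OperatorCoordinator interface roughly as it appears in Flink 1.12 (exact method signatures vary between Flink versions). The class name and the timeline helpers startNewInstant/commitInstant are hypothetical stand-ins for calls into the Hudi write client:

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

import javax.annotation.Nullable;

import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;

// Sketch: the coordinator owns the instant lifecycle. Flink checkpoints an
// operator coordinator before the operator's tasks, so starting/committing
// instants here removes the need for a parallelism-1 instant generator.
public class WriteCoordinatorSketch implements OperatorCoordinator {

  private String inflightInstant; // the instant currently open for writes

  @Override
  public void start() throws Exception {
    inflightInstant = startNewInstant(); // open the first instant on job start
  }

  @Override
  public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) {
    // Runs before the write tasks snapshot their state: the inflight instant
    // is part of the coordinator checkpoint, i.e. the commit "starts first".
    result.complete(inflightInstant.getBytes(StandardCharsets.UTF_8));
  }

  @Override
  public void notifyCheckpointComplete(long checkpointId) {
    // The buffered data of this instant is now durable: commit it and
    // open the next instant for the following checkpoint interval.
    commitInstant(inflightInstant);
    inflightInstant = startNewInstant();
  }

  @Override
  public void handleEventFromOperator(int subtask, OperatorEvent event) {
    // Write tasks would report their WriteStatus batches here (elided).
  }

  @Override
  public void resetToCheckpoint(long checkpointId, @Nullable byte[] state) {
    inflightInstant = state == null ? null : new String(state, StandardCharsets.UTF_8);
  }

  @Override public void subtaskFailed(int subtask, @Nullable Throwable reason) {}
  @Override public void subtaskReset(int subtask, long checkpointId) {}
  @Override public void close() {}

  // Hypothetical timeline helpers, standing in for Hudi write-client calls.
  private String startNewInstant() { return String.valueOf(System.currentTimeMillis()); }
  private void commitInstant(String instant) { /* write commit metadata */ }
}
```

The key property this relies on is the checkpoint ordering: because the coordinator snapshot happens before the task snapshots, every task observes a consistent inflight instant for a given checkpoint interval.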

Work Flow

The write function first buffers the data as a batch of HoodieRecords,

...
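
The buffering step can be sketched independently of Flink. A minimal sketch, with String standing in for HoodieRecord and an illustrative batch-size threshold (not a Hudi default):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: records are buffered into an in-memory batch and flushed as a
// single write once the batch is full.
public class BufferingWriteSketch {

  private static final int BATCH_SIZE = 1024;

  private final List<String> buffer = new ArrayList<>();

  public void bufferRecord(String record) {
    buffer.add(record);
    if (buffer.size() >= BATCH_SIZE) {
      flush(); // size-triggered flush; checkpoints also force a flush (see below)
    }
  }

  private void flush() {
    // One batched write per flush instead of one write per record.
    System.out.println("writing a batch of " + buffer.size() + " records");
    buffer.clear();
  }
}
```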

To solve bottleneck 2, we propose to split the WriteProcessOperator into two operators: the FileIdAssigner + the BucketWriter.

[Image: the FileIdAssigner + BucketWriter pipeline]
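
A hedged sketch of how the split might be wired with the DataStream API; SplitPipelineSketch, TaggedRecord, and lookupOrCreateFileId are hypothetical names, and the actual assignment logic (index lookup, small-bucket handling) is elided:

```java
import org.apache.flink.streaming.api.datastream.DataStream;

// Sketch: the FileIdAssigner tags each record with its target file group,
// and keying by that fileId routes all records of one file group to the
// same BucketWriter subtask, so writing scales with operator parallelism.
public final class SplitPipelineSketch {

  // Hypothetical record type: the payload plus the file group it targets.
  public static class TaggedRecord {
    public String fileId;
    public String payload;

    public TaggedRecord() {}

    public TaggedRecord(String fileId, String payload) {
      this.fileId = fileId;
      this.payload = payload;
    }
  }

  public static DataStream<String> wire(DataStream<String> input) {
    return input
        .map(r -> new TaggedRecord(lookupOrCreateFileId(r), r)) // FileIdAssigner
        .keyBy(t -> t.fileId)                                   // shuffle by file group
        .map(t -> "written: " + t.fileId);                      // BucketWriter stand-in
  }

  private static String lookupOrCreateFileId(String record) {
    // Placeholder for the index lookup / small-bucket assignment logic.
    return "fileId-" + Math.floorMod(record.hashCode(), 4);
  }
}
```

The keyBy between the two operators is the design point: neither operator needs singleton parallelism, yet each file group is still written by exactly one task.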

The FileIdAssigner

...

  • We start a new instant when the BucketWriterCoordinator starts (instead of starting one on each new checkpoint, as in #step1 and #step2).
  • The BucketWriter blocks and flushes the pending buffered data when a new checkpoint starts; it has an asynchronous thread that consumes the buffer (for the first version, we simply flush the data in #processElement) and writes batches based on the buffered data size (see the sketch after this workflow).
  • In the BucketWriterCoordinator, if the data within a checkpoint was written successfully (i.e., we got a checkpoint success notification), check and commit the inflight instant and start a new one.

[Image: the checkpoint / instant commit workflow]

That means that if a checkpoint succeeds but we do not receive the success event, the inflight instant will span two (or more) checkpoints.
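
A minimal sketch of the BucketWriter side of this workflow, using Flink's CheckpointedFunction to get the blocking flush on checkpoint; the class name, threshold, and flush body are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Sketch: the BucketWriter buffers records and is forced to flush when a
// checkpoint starts, so a successful checkpoint implies all buffered data
// of the inflight instant has been written out.
public class BucketWriterSketch extends ProcessFunction<String, String>
    implements CheckpointedFunction {

  private transient List<String> buffer;

  @Override
  public void initializeState(FunctionInitializationContext context) {
    buffer = new ArrayList<>();
  }

  @Override
  public void processElement(String record, Context ctx, Collector<String> out) {
    buffer.add(record);
    // First version: flush directly in processElement once the buffer is
    // large enough; later, an async thread can consume the buffer instead.
    if (buffer.size() >= 1024) {
      flush();
    }
  }

  @Override
  public void snapshotState(FunctionSnapshotContext context) {
    // Blocking flush on checkpoint: nothing of the inflight instant stays
    // buffered once the checkpoint barrier passes this task.
    flush();
  }

  private void flush() {
    // Write the batch for this file group and report WriteStatus events
    // toward the coordinator (elided in this sketch).
    buffer.clear();
  }
}
```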

...

when all the files have been checked, we mark the index as being in pure state mode; we can then check only the state for records that come in later.

[Image: the state-based index workflow]

The new index does not need a bootstrap step for existing/history datasets, and it is also usable when different Flink jobs write to the same table.
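
A minimal sketch of such a state-backed index, assuming a KeyedProcessFunction keyed by record key; the fallback lookup into existing files is elided and the names are hypothetical:

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Sketch: the record-key -> fileId mapping lives in Flink keyed state.
// Existing files are consulted lazily on a state miss until the whole
// table has been checked; after that ("pure state mode") lookups hit
// only the state.
public class StateIndexSketch extends KeyedProcessFunction<String, String, String> {

  private transient ValueState<String> fileIdState;

  @Override
  public void open(Configuration parameters) {
    fileIdState = getRuntimeContext().getState(
        new ValueStateDescriptor<>("record-key-to-file-id", String.class));
  }

  @Override
  public void processElement(String record, Context ctx, Collector<String> out)
      throws Exception {
    String fileId = fileIdState.value();
    if (fileId == null) {
      // Miss: fall back to scanning existing files for the key, or assign
      // a new file group for a true insert (both elided here). In pure
      // state mode this branch only assigns new file groups.
      fileId = assignOrLookupFromFiles(ctx.getCurrentKey());
      fileIdState.update(fileId);
    }
    out.collect(fileId + "|" + record);
  }

  private String assignOrLookupFromFiles(String recordKey) {
    return "fileId-for-" + recordKey; // placeholder
  }
}
```

Because the mapping is ordinary keyed state, it is checkpointed and rescaled by Flink itself, which is what removes the separate bootstrap step.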

...