Proposers

Approvers

Status

Current state: Completed

Discussion thread: here

JIRA: here

Released: 0.8.0

Current Pipeline

...

In #step3, the records are processed by a single task, so it is impossible to scale horizontally when consuming a big dataset.

In order to solve bottleneck 2, we propose to split the StreamWriteOperator into 2 operators: the BucketAssigner and the BucketWriter.


The

...

BucketAssigner

The BucketAssigner does the following for each record:

  1. It builds a checkpoint partition write profile per record; its core role is to assign a bucket ID (partition path + fileID).
  2. It looks up the index to see whether the record is an UPDATE. If the record is an UPDATE, it is patched up with the existing fileID; if the record is an INSERT, it is assigned a new (or already assigned) fileID based on the configured bucket size.
  3. It sends the record, patched up with its fileID, downstream.
  4. The bucket assigning happens per record, and we can assign the buckets in parallel only if there is no conflict where two tasks write to one bucket (a simple hash algorithm solves the problem). A sketch of this assignment logic follows the list.
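A minimal sketch of the per-record assignment described above, assuming a hypothetical in-memory index map and a simple size-based rollover; the real implementation would keep the index in Flink state:

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hedged sketch of per-record bucket assignment; the in-memory index map and
// size accounting are illustrative stand-ins, not the actual implementation.
public class BucketAssignerSketch {
  private final Map<String, String> index = new HashMap<>(); // recordKey -> fileID
  private final long bucketSizeThreshold;                    // configured bucket size
  private String currentFileId;
  private long currentBucketSize;

  public BucketAssignerSketch(long bucketSizeThreshold) {
    this.bucketSizeThreshold = bucketSizeThreshold;
  }

  // Returns the fileID for a record key: the existing one for an UPDATE,
  // a new (or currently open) one for an INSERT.
  public String assignFileId(String recordKey, long recordSize) {
    String existing = index.get(recordKey);
    if (existing != null) {
      return existing; // UPDATE: patch up with the existing fileID
    }
    // INSERT: roll over to a new fileID once the configured bucket size is exceeded
    if (currentFileId == null || currentBucketSize + recordSize > bucketSizeThreshold) {
      currentFileId = UUID.randomUUID().toString();
      currentBucketSize = 0;
    }
    currentBucketSize += recordSize;
    index.put(recordKey, currentFileId);
    return currentFileId;
  }
}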


The BucketAssigner output records are then shuffled by their fileIDs and sent to the BucketWriter.
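As a rough illustration of the wiring (the function, operator, and accessor names here are assumptions, not the actual API):

// Hedged wiring sketch: shuffle by fileID so each bucket is written by exactly one task.
DataStream<HoodieRecord> assigned = input.map(new BucketAssignFunction()); // hypothetical function
assigned
    .keyBy(record -> record.getFileId())          // hypothetical accessor for the assigned fileID
    .transform("bucket_writer",
        TypeInformation.of(Object.class),         // placeholder output type
        new BucketWriterOperator());              // hypothetical operator name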

...

  • We start a new instant when the coordinator starts (instead of starting on a new checkpoint as in #step1 and #step2).
  • The BucketWriter blocks and flushes the pending buffer data when a new checkpoint starts; it has an asynchronous thread to consume the buffer (for the first version, we flush the data out directly in #processElement) and writes batches based on the buffer data size.
  • In the coordinator, if the data within one checkpoint is written successfully (a checkpoint success notification is received), check and commit the inflight instant and start a new instant. A sketch of this lifecycle follows the list.
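A hedged sketch of the coordinator-side instant lifecycle just described; startInstant, commitInstant, and the success-event check are hypothetical helpers, not the actual Hudi API:

// Hedged sketch of the coordinator instant lifecycle; the helpers are hypothetical.
public class CoordinatorSketch {
  public void start() {
    startInstant(); // a new instant starts when the coordinator starts
  }

  // Invoked by the Flink runtime when a checkpoint completes.
  public void notifyCheckpointComplete(long checkpointId) {
    if (writeSuccessReceived(checkpointId)) { // hypothetical: all write success events arrived
      commitInstant(); // commit the inflight instant ...
      startInstant();  // ... and immediately start a new one
    }
    // Otherwise the inflight instant spans into the next checkpoint (see below).
  }

  private void startInstant() { /* create a REQUESTED instant on the timeline */ }
  private void commitInstant() { /* commit the inflight instant */ }
  private boolean writeSuccessReceived(long checkpointId) { return true; /* placeholder */ }
}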


That means, if a checkpoint succeeds but we do not receive the success event, the inflight instant will span two (or more) checkpoints.

Work Flow

The function first buffers the data as a batch of {@link HoodieRecord}s. It flushes (writes) a record bucket when the bucket size exceeds the configured threshold {@link FlinkOptions#WRITE_BUCKET_SIZE},
when the whole data buffer size exceeds the configured threshold {@link FlinkOptions#WRITE_BUFFER_SIZE},
or when a Flink checkpoint starts. After a batch has been written successfully,
the function notifies its operator coordinator {@link StreamWriteOperatorCoordinator} to mark a successful write.
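The three flush triggers can be sketched as follows; the bucket layout and helper methods are assumptions, and the thresholds stand in for the two FlinkOptions above:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hedged sketch of the three flush triggers; buffer layout and helpers are hypothetical.
public class WriteFunctionSketch {
  private final Map<String, List<Object>> buckets = new HashMap<>(); // fileID -> buffered records
  private long totalBufferSize;
  private final long bucketSizeThreshold; // stands in for FlinkOptions#WRITE_BUCKET_SIZE
  private final long bufferSizeThreshold; // stands in for FlinkOptions#WRITE_BUFFER_SIZE

  public WriteFunctionSketch(long bucketSizeThreshold, long bufferSizeThreshold) {
    this.bucketSizeThreshold = bucketSizeThreshold;
    this.bufferSizeThreshold = bufferSizeThreshold;
  }

  public void processElement(String fileId, Object record, long recordSize) {
    buckets.computeIfAbsent(fileId, k -> new ArrayList<>()).add(record);
    totalBufferSize += recordSize;
    if (bucketSize(fileId) > bucketSizeThreshold) {
      flushBucket(fileId); // trigger 1: a single bucket exceeds its threshold
    } else if (totalBufferSize > bufferSizeThreshold) {
      flushBuffer();       // trigger 2: the whole data buffer exceeds its threshold
    }
  }

  public void snapshotState() {
    flushBuffer();         // trigger 3: a Flink checkpoint starts
  }

  private long bucketSize(String fileId) { return buckets.get(fileId).size(); /* simplified */ }
  private void flushBucket(String fileId) { /* write the bucket as one batch, then clear it */ }
  private void flushBuffer() { /* write all buckets, notify the coordinator, clear the buffer */ }
}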

The Semantics

The task implements exactly-once semantics by buffering the data between checkpoints. The operator coordinator
starts a new instant on the timeline when a checkpoint triggers. The coordinator's checkpoint always
starts before its operator's, so when this function starts a checkpoint, a REQUESTED instant already exists.

In order to improve throughput, the function's processing thread does not block data buffering
after the checkpoint thread starts flushing the existing data buffer. So it is possible that the next checkpoint's
batch is written within the current checkpoint. When a checkpoint failure triggers a write rollback, there may be some duplicate records
(e.g. the eagerly written batch), but the semantics are still correct under the UPSERT operation.

We may improve this to be exactly-once once the writer supports writing records one by one instead of in batches (the writer would shift the file handles on checkpointing).

Fault Tolerance

...

The operator coordinator checks and commits the last instant, then starts a new one, when a checkpoint finishes successfully.
The operator rolls back the written data and throws an exception to trigger a failover when any error occurs.
This means one Hoodie instant may span one or more checkpoints (some checkpoint notifications may be skipped).
If a checkpoint times out, the next checkpoint helps to rewrite the leftover buffer data (the buffer is cleaned
in the last step of the #flushBuffer method).

The operator coordinator retries the commit of the write status several times before giving up.
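For illustration, a simple retry loop of the kind described; the retry count and the commit helper are assumptions, not values from this document:

// Hedged sketch of the commit retry; maxRetries and commitWriteStatus() are assumptions.
int maxRetries = 3;
for (int attempt = 1; attempt <= maxRetries; attempt++) {
  try {
    commitWriteStatus(instant, writeStatuses); // hypothetical commit helper
    break;
  } catch (Exception e) {
    if (attempt == maxRetries) {
      throw e; // retries exhausted: let the failure trigger a rollback/failover
    }
  }
}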

Note: the function task requires the input stream to be shuffled by file IDs.

...

when all the files are checked, the index is marked as being in pure state mode; after that, we only need to check the state for records that come in later.
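A hedged sketch of this lookup using Flink keyed state; the bootstrap helper, the assignment helper, and the allFilesChecked flag are illustrative assumptions:

import org.apache.flink.api.common.state.ValueState;

// Hedged sketch of the state-backed index; the helpers below are hypothetical.
private ValueState<String> fileIdState; // keyed by record key: recordKey -> fileID
private boolean allFilesChecked;        // set once the existing files are fully checked

public String lookupOrAssign(String recordKey) throws Exception {
  String fileId = fileIdState.value();
  if (fileId == null && !allFilesChecked) {
    fileId = checkExistingFiles(recordKey); // fall back to the files until fully bootstrapped
  }
  if (fileId == null) {
    fileId = assignNewFileId(recordKey);    // INSERT: assign a new bucket
  }
  fileIdState.update(fileId);               // later records hit only the state
  return fileId;
}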


The new index does not need a bootstrap for an existing/history dataset, and it is also useful for writes from different Flink jobs.

...

That is, we replace the coordinator with an instant generator and a commit sink, just like the original pipeline.


Note that this pipeline cannot work in mini-batch mode because there is no component to coordinate the mini-batches, and we cannot control the sequence of the checkpoint success notifications of the operators. So it is impossible to write eagerly and start an instant immediately when a checkpoint finishes.


Implementation Plan

...