
...


Current State

  • Under Discussion
  • In Progress
  • Abandoned
  • Completed (tick)
  • Inactive

Discussion thread: here

JIRA: here

Released: 0.8.0

Table of Contents
Current Pipeline

...

The #step3 is processed by a single task, so it is impossible to consume a big dataset by scaling horizontally.

In order to solve bottleneck 2, we propose to split the StreamWriteOperator into two operators: the BucketAssigner and the BucketWriter.


The

...

BucketAssigner

The BucketAssigner does the following for each record (see the sketch after this list):

  1. Builds a checkpoint partition write profile per record; this profile plays the core role in assigning a bucket ID (partition path + fileID).
  2. Looks up the index to see if the record is an UPDATE; if it is an UPDATE, patches it up with the existing fileID; if it is an INSERT, assigns it a new (or already assigned) fileID based on the configured bucket size.
  3. Sends out the record patched up with the fileID.
  4. The bucket assignment happens per record, and we can assign buckets in parallel only if there is no conflict where two tasks write to the same bucket (a simple hash algorithm solves the problem).
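A minimal sketch of this per-record assignment logic in plain Java; Record, Index, and SmallFileAssigner below are simplified, hypothetical stand-ins rather than the actual Hudi classes:

    // Simplified sketch: these types are illustrative stand-ins, not Hudi's classes.
    class Record {
      String key;
      String partitionPath;
      String fileId; // assigned by the BucketAssigner; bucket ID = partition path + fileId

      Record(String key, String partitionPath) {
        this.key = key;
        this.partitionPath = partitionPath;
      }
    }

    interface Index {
      /** Returns the existing fileId for the key, or null if the key is new. */
      String lookup(String key, String partitionPath);
    }

    interface SmallFileAssigner {
      /** Returns a small bucket still under the configured size, or a new fileId. */
      String assignOrCreate(String partitionPath);
    }

    class BucketAssigner {
      private final Index index;
      private final SmallFileAssigner smallFiles;

      BucketAssigner(Index index, SmallFileAssigner smallFiles) {
        this.index = index;
        this.smallFiles = smallFiles;
      }

      /** Patches the record with a bucket ID (partition path + fileId). */
      Record assign(Record record) {
        String existing = index.lookup(record.key, record.partitionPath);
        if (existing != null) {
          record.fileId = existing; // UPDATE: keep writing to the existing file group
        } else {
          // INSERT: reuse a small bucket if one is available, otherwise create a new fileId.
          record.fileId = smallFiles.assignOrCreate(record.partitionPath);
        }
        return record;
      }
    }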


The BucketAssigner output records are then shuffled by the fileIDs and sent to the BucketWriter.
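A rough sketch of how the split pipeline could be wired with Flink's DataStream API; the map/keyBy/process functions below are skeletal stand-ins for the proposed BucketAssigner and BucketWriter operators, and Rec is a local illustrative POJO, not the record type Hudi actually uses:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class PipelineSketch {

      /** Illustrative POJO carrying the assigned bucket ID. */
      public static class Rec {
        public String key;
        public String partitionPath;
        public String fileId;

        public Rec() {}

        public Rec(String key, String partitionPath) {
          this.key = key;
          this.partitionPath = partitionPath;
        }
      }

      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(new Rec("k1", "par1"), new Rec("k2", "par1"))
            // BucketAssigner: patch every record with a fileId (hard-coded here for brevity).
            .map(new MapFunction<Rec, Rec>() {
              @Override
              public Rec map(Rec r) {
                r.fileId = "file-group-0";
                return r;
              }
            })
            // Shuffle by fileId so that each bucket is only ever written by one task.
            .keyBy(new KeySelector<Rec, String>() {
              @Override
              public String getKey(Rec r) {
                return r.fileId;
              }
            })
            // BucketWriter: in the real design this buffers records and flushes on checkpoint;
            // here it just emits a marker string per record.
            .process(new KeyedProcessFunction<String, Rec, String>() {
              @Override
              public void processElement(Rec r, Context ctx, Collector<String> out) {
                out.collect(r.fileId + ": buffered " + r.key);
              }
            })
            .print();

        env.execute("bucket-assigner-writer-sketch");
      }
    }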

...

  • We start a new instant when the Coordinator starts (instead of starting one on each new checkpoint as in #step1 and #step2).
  • The BucketWriter blocks and flushes the pending buffer data when a new checkpoint starts; it has an asynchronous thread to consume the buffer (for the first version, we flush the data out directly in #processElement) and writes batches based on the buffer data size.
  • In the Coordinator, if the data within one checkpoint is written successfully (i.e., we got a checkpoint success notification), check and commit the inflight instant and start a new instant (a sketch of this lifecycle follows this list).
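A conceptual sketch of the instant lifecycle in the Coordinator; this is plain Java against a hypothetical HudiClient facade, not the actual Flink OperatorCoordinator API or Hudi's write client:

    // Conceptual sketch only: HudiClient is a hypothetical facade and the callbacks
    // are simplified; the real coordinator runs inside Flink's operator coordination.
    class InstantCoordinatorSketch {

      interface HudiClient {
        String startInstant();          // creates a new inflight instant and returns its time
        boolean commit(String instant); // commits the inflight instant, true on success
      }

      private final HudiClient client;
      private String inflightInstant;

      InstantCoordinatorSketch(HudiClient client) {
        this.client = client;
      }

      /** Called once when the Coordinator starts: open the first instant. */
      void start() {
        inflightInstant = client.startInstant();
      }

      /** Called when the writers report that all data of one checkpoint was written. */
      void onCheckpointSuccess() {
        // Check and commit the inflight instant, then immediately start a new one
        // so that the next checkpoint's data goes into a fresh instant.
        if (client.commit(inflightInstant)) {
          inflightInstant = client.startInstant();
        }
        // If no success event is received, the inflight instant simply keeps
        // accumulating data and spans the following checkpoint(s) as well.
      }
    }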


That means that if a checkpoint succeeds but we do not receive the success event, the inflight instant will span two (or more) checkpoints.

...