...

From the writer’s side, the writer always serializes a complete record (RecordWriter#emit => SpanningRecordSerializer#serializeRecord) and copies it to the BufferBuilder (RecordWriter#copyFromSerializerToTargetChannel) until the end of the record, so there is no partial record during serialization. Partial records occur only from the reader’s point of view, so buffers and resources beyond ResultSubpartitionView should be cleaned up.
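A minimal sketch of this writer/reader asymmetry, assuming a simple 4-byte length-prefixed format. The class `SpanningRecordsSketch` and its methods are hypothetical and much simpler than Flink's serializers. The writer serializes the whole record before copying it into buffers, so a record may span several buffers but is never emitted half-serialized. The reader, in contrast, can hold the first part of a record whose remainder never arrives, and those bytes must be discarded on recovery.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not Flink's SpanningRecordSerializer or deserializer.
class SpanningRecordsSketch {

    // Writer side: serialize the complete record (length prefix + payload)
    // before any byte is copied into a buffer.
    static byte[] serializeRecord(String record) {
        byte[] payload = record.getBytes(StandardCharsets.UTF_8);
        ByteBuffer out = ByteBuffer.allocate(4 + payload.length);
        out.putInt(payload.length).put(payload);
        return out.array();
    }

    // Copy the serialized bytes into fixed-size buffers until the end of the record;
    // a record longer than one buffer spans several buffers.
    static List<byte[]> copyToBuffers(byte[] serialized, int bufferSize) {
        List<byte[]> buffers = new ArrayList<>();
        for (int off = 0; off < serialized.length; off += bufferSize) {
            int len = Math.min(bufferSize, serialized.length - off);
            byte[] buf = new byte[len];
            System.arraycopy(serialized, off, buf, 0, len);
            buffers.add(buf);
        }
        return buffers;
    }

    // Reader side: accumulates bytes until a full record is available.
    static class Deserializer {
        private final ByteArrayOutputStream pending = new ByteArrayOutputStream();
        private final List<String> records = new ArrayList<>();

        void onBuffer(byte[] buffer) {
            pending.write(buffer, 0, buffer.length);
            byte[] bytes = pending.toByteArray();
            int pos = 0;
            while (bytes.length - pos >= 4) {
                int len = ByteBuffer.wrap(bytes, pos, 4).getInt();
                if (bytes.length - pos - 4 < len) {
                    break; // the rest of this record is in a later buffer
                }
                records.add(new String(bytes, pos + 4, len, StandardCharsets.UTF_8));
                pos += 4 + len;
            }
            // Keep only the bytes of the (possibly) partial trailing record.
            pending.reset();
            pending.write(bytes, pos, bytes.length - pos);
        }

        boolean hasPartialRecord() {
            return pending.size() > 0;
        }

        // On recovery, leftover bytes belong to a record whose remainder will never arrive.
        void discardPartialRecord() {
            pending.reset();
        }

        List<String> records() {
            return records;
        }
    }
}
```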

4. Missing Events

The main issue is missing checkpoint barriers: after recovery, a checkpoint barrier may be lost. The checkpoint in the failed task (the sink) then waits for the lost barrier indefinitely and never completes. Upstream tasks of the sink are not affected by missing events, since the events go missing only at the failed task (note that the upstream subpartition has to reset isBlockedByCheckpoint if the failed task was blocked by checkpoint alignment before the failure).
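A minimal sketch of why a lost barrier stalls the sink, assuming a simplified aligned-checkpoint model. `BarrierAlignmentSketch` and its methods are hypothetical, not Flink's barrier handler. The sink completes alignment only after it has seen the barrier for a checkpoint on every input channel. If the barrier on one channel was sent before the failure and is not replayed to the restarted sink, alignment for that checkpoint never completes, and the channels that already delivered their barrier stay blocked.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified barrier aligner for illustration only;
// it is not Flink's CheckpointBarrierHandler.
class BarrierAlignmentSketch {
    private final int numChannels;
    private long currentCheckpointId = -1;
    private final Set<Integer> channelsWithBarrier = new HashSet<>();

    BarrierAlignmentSketch(int numChannels) {
        this.numChannels = numChannels;
    }

    // Records a barrier from one input channel; returns true once the barrier
    // for the current checkpoint has been seen on every channel.
    boolean onBarrier(int channel, long checkpointId) {
        if (checkpointId < currentCheckpointId) {
            return false; // barrier of an older checkpoint, ignored
        }
        if (checkpointId > currentCheckpointId) {
            currentCheckpointId = checkpointId; // start aligning the newer checkpoint
            channelsWithBarrier.clear();
        }
        channelsWithBarrier.add(channel);
        return isAligned();
    }

    boolean isAligned() {
        return currentCheckpointId >= 0 && channelsWithBarrier.size() == numChannels;
    }

    // During alignment, a channel that already delivered its barrier stays blocked.
    boolean isBlocked(int channel) {
        return channelsWithBarrier.contains(channel) && !isAligned();
    }
}
```

In this model, if the barrier of checkpoint 5 on channel 0 is lost, a call such as `onBarrier(1, 5)` leaves channel 1 blocked indefinitely; only a barrier of a newer checkpoint replaces the stuck alignment.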

Tasks downstream of the failed task may miss barriers as well; we postpone that discussion until later.

The proposed solution is either to

  1. attach the max checkpoint ID (fetched from CheckpointIDCounter in the JM) to the task deployment descriptor, so that the sink can drop any barrier whose ID is not bigger than that ID. A newly triggered checkpoint (max checkpoint ID + 1) is still valid; or
  2. wait until a new checkpoint is triggered after the restart, and then drop the old, not-yet-aligned barrier.
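Option 1 reduces to a simple comparison on the sink side. The sketch below assumes that `maxCheckpointId` is the highest checkpoint ID triggered before the restart and that it reaches the restarted sink through its deployment descriptor. The class `StaleBarrierFilter` and its methods are hypothetical names for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of option 1; class and method names are illustrative only.
// maxCheckpointId is assumed to be read from CheckpointIDCounter in the JM and
// shipped to the restarted sink in its task deployment descriptor.
class StaleBarrierFilter {
    private final long maxCheckpointId;

    StaleBarrierFilter(long maxCheckpointId) {
        this.maxCheckpointId = maxCheckpointId;
    }

    // Barriers with ID <= maxCheckpointId belong to checkpoints triggered before
    // the restart and are dropped; maxCheckpointId + 1 and later are processed.
    boolean shouldProcess(long barrierCheckpointId) {
        return barrierCheckpointId > maxCheckpointId;
    }

    // Returns the barrier IDs that the sink would still act on, in arrival order.
    List<Long> filter(List<Long> barrierIds) {
        List<Long> kept = new ArrayList<>();
        for (long id : barrierIds) {
            if (shouldProcess(id)) {
                kept.add(id);
            }
        }
        return kept;
    }
}
```

Because the check needs only the ID that is already known in the JM at deployment time, the sink does not have to wait for any further checkpoint before discarding a stale barrier.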


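Option 1 is preferable because, in most cases, the maximum number of concurrent pending checkpoints is limited. With option 2, a new checkpoint cannot be triggered while the stuck checkpoint still occupies a pending slot, so the system can be blocked until the pending checkpoints are aborted.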
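To illustrate why option 2 can stall, here is a hypothetical model of a coordinator that caps the number of pending checkpoints. The class `PendingCheckpointLimitSketch` and the cap of one used in the usage note are assumptions for illustration, not Flink's CheckpointCoordinator. While the checkpoint stuck on the lost barrier is pending, every new trigger is rejected, so the sink cannot use a newer barrier to drop the stale one until the pending checkpoint is aborted.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of a coordinator that caps concurrent pending checkpoints;
// it is not Flink's CheckpointCoordinator.
class PendingCheckpointLimitSketch {
    private final int maxConcurrentCheckpoints;
    private final Deque<Long> pending = new ArrayDeque<>();
    private long nextCheckpointId = 1;

    PendingCheckpointLimitSketch(int maxConcurrentCheckpoints) {
        this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
    }

    // Triggers a new checkpoint unless the cap on pending checkpoints is reached.
    boolean tryTrigger() {
        if (pending.size() >= maxConcurrentCheckpoints) {
            return false;
        }
        pending.addLast(nextCheckpointId++);
        return true;
    }

    // Aborts the oldest pending checkpoint, e.g. when it times out.
    void abortOldest() {
        pending.pollFirst();
    }

    int pendingCount() {
        return pending.size();
    }
}
```

With a cap of one, the first `tryTrigger()` succeeds (the checkpoint that later gets stuck), and every subsequent call returns `false` until `abortOldest()` frees the slot.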

Compatibility, Deprecation, and Migration Plan

...