...

  1. ResultSubpartition is kept when the consumer exits (task fails); the consumer only releases the view.
  2. ResultSubpartition is released through ResultPartitionManager on
    1. consumption finished;
    2. producer failure/cancellation;
    3. job exit.
  3. Multiple views can be created on the same subpartition over time, but at most one view exists at any given time.
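
The three requirements above can be illustrated with a minimal sketch. The classes `SketchSubpartition` and `SketchView` are hypothetical stand-ins invented for this illustration; they are not Flink's ResultSubpartition or ResultSubpartitionView, and buffers are modelled as plain strings. The sketch assumes that recreating a view first releases any leftover view, so that two views never coexist.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical stand-in for a subpartition; not Flink's actual class. */
class SketchSubpartition {
    private final Deque<String> buffers = new ArrayDeque<>();
    private SketchView currentView; // at most one live view at a time
    private boolean released;

    void add(String buffer) {
        if (released) {
            throw new IllegalStateException("subpartition already released");
        }
        buffers.add(buffer);
    }

    /** Creates a view; a leftover view from a failed consumer is released first. */
    SketchView createView() {
        if (released) {
            throw new IllegalStateException("subpartition already released");
        }
        if (currentView != null) {
            currentView.release();
        }
        currentView = new SketchView(this);
        return currentView;
    }

    /** Called by a view when it is released; the buffered data stays in place. */
    void onViewReleased(SketchView view) {
        if (currentView == view) {
            currentView = null;
        }
    }

    /** Full release: consumption finished, producer failure/cancellation, or job exit. */
    void release() {
        released = true;
        buffers.clear();
        if (currentView != null) {
            currentView.release();
        }
    }

    boolean isReleased() { return released; }
    boolean hasLiveView() { return currentView != null; }
    int bufferCount() { return buffers.size(); }
}

/** Hypothetical stand-in for a subpartition view. */
class SketchView {
    private final SketchSubpartition parent;
    private boolean released;

    SketchView(SketchSubpartition parent) { this.parent = parent; }

    void release() {
        if (!released) {
            released = true;
            parent.onViewReleased(this);
        }
    }

    boolean isReleased() { return released; }
}
```

In this sketch, releasing a view leaves the subpartition and its buffers untouched, while releasing the subpartition also releases whatever view is still attached.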

The proposed changes are either:

  1. Change PipelinedSubpartition to fulfill the requirement, or
  2. Build a separate type of ResultSubpartition


Four more points are worth highlighting here:

Point1: Orphan/Zombie Tasks

...

Upstream credits are reset to their initial state because the downstream task loses all of its input data after restarting.

Point3: Partial Records

A record can span multiple buffers. If a task reads a partial record and then fails, that partial record is lost after restarting. The remaining part, read from the upstream, cannot be deserialized successfully because the reader cannot determine where the record ends.
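
The problem can be reproduced with a small sketch. It assumes, purely for illustration, that each record is framed as a 4-byte length followed by its payload and that the byte stream is cut into fixed-size buffers; the class `PartialRecordSketch` and its methods are hypothetical and are not part of Flink.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of a record spanning several buffers. */
class PartialRecordSketch {

    /** Frames each record as [4-byte length][payload] and cuts the stream into buffers. */
    static List<byte[]> writeToBuffers(List<String> records, int bufferSize) {
        List<byte[]> payloads = new ArrayList<>();
        int total = 0;
        for (String r : records) {
            byte[] p = r.getBytes(StandardCharsets.UTF_8);
            payloads.add(p);
            total += 4 + p.length;
        }
        ByteBuffer stream = ByteBuffer.allocate(total);
        for (byte[] p : payloads) {
            stream.putInt(p.length);
            stream.put(p);
        }
        byte[] all = stream.array();
        List<byte[]> buffers = new ArrayList<>();
        for (int off = 0; off < all.length; off += bufferSize) {
            int len = Math.min(bufferSize, all.length - off);
            byte[] b = new byte[len];
            System.arraycopy(all, off, b, 0, len);
            buffers.add(b);
        }
        return buffers;
    }

    /** Decodes records starting at buffer index 'from', as a restarted reader would. */
    static List<String> readFrom(List<byte[]> buffers, int from) {
        int total = 0;
        for (int i = from; i < buffers.size(); i++) {
            total += buffers.get(i).length;
        }
        ByteBuffer stream = ByteBuffer.allocate(total);
        for (int i = from; i < buffers.size(); i++) {
            stream.put(buffers.get(i));
        }
        stream.flip();
        List<String> out = new ArrayList<>();
        while (stream.remaining() >= 4) {
            int len = stream.getInt();
            if (len < 0 || len > stream.remaining()) {
                // The bytes read as a length were really payload of a lost record.
                throw new IllegalStateException("cannot locate the end of the record");
            }
            byte[] p = new byte[len];
            stream.get(p);
            out.add(new String(p, StandardCharsets.UTF_8));
        }
        return out;
    }
}
```

Reading from the first buffer decodes every record, whereas starting at a buffer that begins in the middle of a record misreads payload bytes as a length. In this sketch the misread usually trips the bounds check; in general it could also yield garbage records without any error, which is why the partial data has to be discarded rather than parsed.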

The proposed changes are either:

...

change to eliminate partial records is to either

  1. clean up partial results when releasing ResultSubpartitionView, or
  2. clean up partial results when recreating ResultSubpartitionView after the failed sink reconnects.

Ideally, a view is released when the failed sink task releases its resources. However, if a sink task fails to notify its upstream to release the view because of a network failure, as discussed in Section2, Point2, the view is instead released just before it is recreated for the restarted sink task. In both cases, partial records are cleaned up when the view is released.

Option 1 is preferred since it prunes less data. The reason is that Option 1 cleans up partial data earlier than Option 2 in most cases. In Option 2, all buffers emitted before the downstream side reconnects are eventually discarded, because these buffers are linked to one another by partial records. We can only retain buffers that start at a clean record boundary, with no partial record in front.

On the writer’s side, a complete record is always serialized (RecordWriter#emit => SpanningRecordSerializer#serializeRecord) and then copied to the BufferBuilder (RecordWriter#copyFromSerializerToTargetChannel) until the end of the record, so no partial record arises during serialization. Partial records occur only from the reader’s point of view, so the buffers/resources beyond ResultSubpartitionView should be cleaned up.
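
One way to picture "retaining only buffers with a clean boundary" is sketched below. It is not the proposal's implementation: the class `CleanBoundarySketch`, the `TaggedBuffer` type, and the idea of tagging each buffer with its number of leading partial bytes are all assumptions made for this illustration, as is the [4-byte length][payload] framing.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: skip partial-record bytes when a view is released or recreated. */
class CleanBoundarySketch {

    /** A buffer plus the count of leading bytes that continue a record begun earlier. */
    static final class TaggedBuffer {
        final byte[] data;
        final int leadingPartialBytes;
        TaggedBuffer(byte[] data, int leadingPartialBytes) {
            this.data = data;
            this.leadingPartialBytes = leadingPartialBytes;
        }
    }

    /** Writes [4-byte length][payload] records into fixed-size buffers and tags each one. */
    static List<TaggedBuffer> write(List<String> records, int bufferSize) {
        List<TaggedBuffer> out = new ArrayList<>();
        ByteBuffer current = ByteBuffer.allocate(bufferSize);
        int leading = 0; // partial bytes at the head of 'current'
        for (String r : records) {
            byte[] payload = r.getBytes(StandardCharsets.UTF_8);
            ByteBuffer rec = ByteBuffer.allocate(4 + payload.length);
            rec.putInt(payload.length);
            rec.put(payload);
            rec.flip();
            while (rec.hasRemaining()) {
                if (!current.hasRemaining()) {
                    out.add(new TaggedBuffer(current.array(), leading));
                    current = ByteBuffer.allocate(bufferSize);
                    // A record that is already partly written spills into the new buffer.
                    leading = rec.position() > 0 ? Math.min(rec.remaining(), bufferSize) : 0;
                }
                byte[] chunk = new byte[Math.min(rec.remaining(), current.remaining())];
                rec.get(chunk);
                current.put(chunk);
            }
        }
        if (current.position() > 0) {
            out.add(new TaggedBuffer(Arrays.copyOf(current.array(), current.position()), leading));
        }
        return out;
    }

    /** Returns the data a restarted reader may consume, starting at buffer index 'from'. */
    static List<byte[]> skipToCleanBoundary(List<TaggedBuffer> queue, int from) {
        List<byte[]> cleaned = new ArrayList<>();
        boolean aligned = false;
        for (int i = from; i < queue.size(); i++) {
            TaggedBuffer b = queue.get(i);
            if (aligned) {
                cleaned.add(b.data);
            } else if (b.leadingPartialBytes < b.data.length) {
                // Drop the partial prefix; the rest starts at a record boundary.
                cleaned.add(Arrays.copyOfRange(b.data, b.leadingPartialBytes, b.data.length));
                aligned = true;
            } // else: the whole buffer continues a lost record and is discarded
        }
        return cleaned;
    }

    /** Decodes all records from already-aligned data. */
    static List<String> read(List<byte[]> buffers) {
        int total = 0;
        for (byte[] b : buffers) {
            total += b.length;
        }
        ByteBuffer s = ByteBuffer.allocate(total);
        for (byte[] b : buffers) {
            s.put(b);
        }
        s.flip();
        List<String> out = new ArrayList<>();
        while (s.remaining() >= 4) {
            int len = s.getInt();
            if (len < 0 || len > s.remaining()) {
                throw new IllegalStateException("no clean record boundary");
            }
            byte[] p = new byte[len];
            s.get(p);
            out.add(new String(p, StandardCharsets.UTF_8));
        }
        return out;
    }
}
```

In this sketch only the writer knows the record boundaries, so it is the writer that records how many leading bytes of each buffer belong to an earlier record. The reader side then needs no knowledge of the serialization format to skip to the next clean boundary.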

...

Compatibility, Deprecation, and Migration Plan

...