...

Given these requirements, there are two options for implementing this communication.


Option 1: Enumerator on the TaskManager

(TBD. explain more)

Advantages

  • Splits are just data messages. Checkpoints and watermarks just work as usual
  • Keeps all complexity out of the JobManager and the ExecutionGraph / Scheduler

Disadvantages

  • Need some changes to the network stack

Option 2: Enumerator on the JobManager

(TBD. explain more)

Advantages

  • No extra effort to keep the readers in separate failover regions
  • No changes to the network stack

Disadvantages

...

The SplitEnumerator runs as a task with parallelism one. Downstream of the enumerator are the SourceReader tasks, which run in parallel. Communication goes through the regular data streams.

The readers request splits by sending "backwards events", similar to the "request partition" or "superstep synchronization" events in the batch iterations. These events are not exposed in operators, but tasks have access to them.
The enumerator task reacts to these backwards events: it sends a split only upon receiving a request. That gives us lazy, pull-based assignment. Payloads on the request events (for example, for locality awareness) are possible.
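The pull-based assignment could be sketched as follows. This is a minimal illustration, not the actual Flink API: the names EnumeratorTask and onSplitRequest are hypothetical, and splits are represented as plain strings.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

/**
 * Sketch of lazy, pull-based split assignment. A split is handed out only in
 * response to a "request split" backwards event, so fast readers naturally
 * pull more splits than slow ones. All names here are illustrative.
 */
class EnumeratorTask {
    private final Queue<String> unassignedSplits = new ArrayDeque<>();

    EnumeratorTask(List<String> splits) {
        unassignedSplits.addAll(splits);
    }

    /** Called when a "request split" backwards event arrives from a reader. */
    String onSplitRequest(int readerSubtask) {
        // readerSubtask could carry a payload (e.g. host info) for
        // locality-aware assignment; ignored in this sketch.
        return unassignedSplits.poll(); // null = no splits left
    }
}
```

The enumerator never pushes splits on its own; assignment is entirely driven by reader requests.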

Checkpoints and splits are naturally aligned, because splits go through the data channels. The enumerator is effectively the only entry task from the source, and the only one that receives the "trigger checkpoint" RPC call.


The network connection between enumerator and split reader is treated by the scheduler as a boundary of a failover region.

To decouple the enumerator and reader restart, we need one of the following mechanisms:

  1. Pipelined persistent channels: The contents of a channel are persistent between checkpoints. A receiving task requests the data "after checkpoint X". The data is pruned when checkpoint X+1 completes.
    When a reader fails, the recovered reader task can reconnect to the stream after the checkpoint and will receive the previously assigned splits. Batch is a special case: if there are no checkpoints, the channel holds all data since the beginning.
    • Pro: The "pipelined persistent channel" also has applications beyond the enumerator-to-reader connection.
    • Con: Splits always go to the same reader and cannot be distributed across multiple readers upon recovery. Especially for batch programs, this may create bad stragglers during recovery.

  2. Reconnects and task notifications on failures: The enumerator task needs to remember the splits assigned to each result partition until the next checkpoint completes. It would have to be notified of the failure of a downstream task, so that it can return those splits to the set of unassigned splits. Recovered reader tasks would simply reconnect and get a new stream.
    • Pro: Re-distribution of splits across all readers upon failure/recovery (no stragglers).
    • Con: Breaks abstraction that separates task and network stack.
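The split tracking that mechanism 2 implies could look roughly like the sketch below. All names (SplitTracker, assign, onReaderFailed) are hypothetical; this is not the actual Flink implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/**
 * Sketch of mechanism 2: the enumerator remembers the splits handed to each
 * reader since the last completed checkpoint; when a reader fails, those
 * splits return to the pool and can be re-distributed to any reader.
 */
class SplitTracker {
    private final Queue<String> pool = new ArrayDeque<>();
    // Splits assigned since the last completed checkpoint, per reader.
    private final Map<Integer, List<String>> inFlight = new HashMap<>();

    SplitTracker(List<String> splits) {
        pool.addAll(splits);
    }

    String assign(int reader) {
        String split = pool.poll();
        if (split != null) {
            inFlight.computeIfAbsent(reader, r -> new ArrayList<>()).add(split);
        }
        return split;
    }

    /** Checkpoint complete: assignments are now covered by reader state. */
    void onCheckpointComplete() {
        inFlight.clear();
    }

    /** Reader failed: its un-checkpointed splits go back to the pool. */
    void onReaderFailed(int reader) {
        List<String> lost = inFlight.remove(reader);
        if (lost != null) {
            pool.addAll(lost);
        }
    }
}
```

The key point is that a failed reader's splits are not pinned to it (avoiding stragglers), which is exactly what the persistent-channel approach in mechanism 1 cannot offer.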


Option 2: Enumerator on the JobManager

(TBD. explain more)


Open Questions

In both cases, the enumerator is a point of failure that requires a restart of the entire dataflow.
To circumvent that, we probably need an additional mechanism, like a write-ahead log for split assignment.
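One possible shape of such a write-ahead log, sketched with entirely hypothetical names: the enumerator logs each assignment before handing the split out, and a restarted enumerator replays the log so that already-assigned splits are not handed out again.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

/**
 * Sketch of write-ahead logging for split assignment. The List<String> log
 * stands in for durable storage; a real implementation would persist it.
 */
class WalEnumerator {
    private final Queue<String> pending = new ArrayDeque<>();
    private final List<String> log; // durable record of assigned splits

    WalEnumerator(List<String> allSplits, List<String> log) {
        this.log = log;
        // Recovery: skip splits the log already records as assigned.
        for (String split : allSplits) {
            if (!log.contains(split)) {
                pending.add(split);
            }
        }
    }

    String assign() {
        String split = pending.poll();
        if (split != null) {
            log.add(split); // written ahead of actually sending the split
        }
        return split;
    }
}
```

With this, an enumerator failure can be recovered in isolation instead of forcing a restart of the entire dataflow.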


Comparison between Options

...

| Criterion | Enumerate on Task | Enumerate on JobManager |
| --- | --- | --- |
| Encapsulation of Enumerator | Encapsulation in separate Task | Additional complexity in ExecutionGraph |
| Network Stack Changes | Significant changes. Some are more clear, like reconnecting. Some seem to break abstractions, like notifying tasks of downstream failures. | No changes necessary |
| Scheduler / Failover Region | Minor changes | No changes necessary |
| Checkpoint alignment | No changes necessary (splits are data messages, naturally align with barriers) | Careful coordination between split assignment and checkpoint triggering. Might be simple if both actions run in the single-threaded ExecutionGraph thread. |
| Watermarks | No changes necessary (splits are data messages, watermarks naturally flow) | Watermarks would go through ExecutionGraph and RPC. |
| Checkpoint State | No additional mechanism (only regular task state) | Need to add support for non-metadata state on the JobManager / ExecutionGraph |
| Supporting graceful Enumerator recovery (avoid full restarts) | Network reconnects (like above), plus write-ahead of split assignment between checkpoints. | Tracking split assignment between checkpoints, plus write-ahead of split assignment between checkpoints. |



Core Public Interfaces


Source

...