Comment: Introduce SourceCoordinator to enumerate splits

...

The SplitEnumerator runs as a task with parallelism one. Downstream of the enumerator are the SourceReader tasks, which run in parallel. Communication goes through the regular data streams.

...

The critical parts here are the added complexity on the master (ExecutionGraph) and the checkpoints. Aligning checkpoints properly with RPC messages is possible when going through the now single-threaded ExecutionGraph dispatcher thread, but supporting asynchronous checkpoint writing requires more complexity.


Option 3: Introduce an independent component named SourceCoordinator; the Enumerator runs on the SourceCoordinator

The SourceCoordinator is an independent component and not a part of the ExecutionGraph. It could run on the JobMaster or as an independent process; the design imposes no restriction. Communication with the SourceCoordinator (Enumerator) goes through RPC. Split assignment through RPC supports pull-based assignment: a SourceReader needs to register with the SourceCoordinator (whose address is in the TaskDeploymentDescriptor or is updated by the JobMaster through RPC) and then sends split requests with payload information.
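The register-then-request handshake described above can be sketched as follows. This is a minimal illustration, not the proposed interface: the class `PullBasedEnumerator`, its method names, and the use of plain strings for splits and payloads are all assumptions made for this example, and a direct method call stands in for the RPC.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;

/** Hypothetical stand-in for the RPC-facing side of an Enumerator on the SourceCoordinator. */
class PullBasedEnumerator {
    private final Queue<String> unassignedSplits;
    private final Set<Integer> registeredReaders = new HashSet<>();

    PullBasedEnumerator(List<String> splits) {
        this.unassignedSplits = new ArrayDeque<>(splits);
    }

    /** A SourceReader must register before it may request splits. */
    void registerReader(int readerId) {
        registeredReaders.add(readerId);
    }

    /** Handles one split request; an empty result means no split is available right now. */
    Optional<String> requestSplit(int readerId, String payload) {
        if (!registeredReaders.contains(readerId)) {
            throw new IllegalStateException("Reader " + readerId + " is not registered");
        }
        // The payload (for example a locality hint) is ignored in this sketch.
        return Optional.ofNullable(unassignedSplits.poll());
    }
}
```

A reader would call `registerReader` once and then call `requestSplit` whenever it is ready for more work, so the Enumerator never pushes splits to a reader that has not asked for them.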

Each job has at most one SourceCoordinator, which is started by the JobMaster. A job may have several Enumerators, since it may have several different sources; all Enumerators run on this SourceCoordinator.

Split assignment needs to satisfy the semantics of the checkpointing mode. Each Enumerator has its own state (the split assignment), which is part of the global checkpoint. When a new checkpoint is triggered, the CheckpointCoordinator sends barriers to the SourceCoordinator first. The SourceCoordinator snapshots the state of all Enumerators and then sends barriers to the SourceReaders through RPC. Splits and barriers sent through RPC are delivered in FIFO order, so Flink can align split assignment with checkpoints naturally.
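The FIFO argument can be made concrete with a small model. The sketch below assumes a single SourceReader and uses an in-memory queue in place of the RPC channel; `FifoAlignmentSketch` and all of its members are hypothetical names introduced only for illustration. It shows that the splits a reader has received when it sees barrier N are exactly the splits in the Enumerator snapshot for checkpoint N.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Hypothetical model of one coordinator-to-reader channel with FIFO delivery. */
class FifoAlignmentSketch {
    /** A channel message: either a split assignment or a checkpoint barrier. */
    static final class Message {
        final String split;       // null for barriers
        final long checkpointId;  // -1 for split assignments

        private Message(String split, long checkpointId) {
            this.split = split;
            this.checkpointId = checkpointId;
        }

        boolean isBarrier() {
            return split == null;
        }
    }

    private final Queue<Message> channel = new ArrayDeque<>();
    private final List<String> assignedSplits = new ArrayList<>();
    private final Map<Long, List<String>> enumeratorSnapshots = new HashMap<>();
    private final List<String> receivedByReader = new ArrayList<>();

    /** Coordinator side: record the assignment, then send it on the channel. */
    void assignSplit(String split) {
        assignedSplits.add(split);
        channel.add(new Message(split, -1L));
    }

    /** Coordinator side: snapshot the Enumerator state first, then send the barrier. */
    void triggerCheckpoint(long checkpointId) {
        enumeratorSnapshots.put(checkpointId, new ArrayList<>(assignedSplits));
        channel.add(new Message(null, checkpointId));
    }

    /** Reader side: consume messages in order until the given barrier (or the channel is empty). */
    List<String> readUntilBarrier(long checkpointId) {
        Message message;
        while ((message = channel.poll()) != null) {
            if (message.isBarrier()) {
                if (message.checkpointId == checkpointId) {
                    break;
                }
            } else {
                receivedByReader.add(message.split);
            }
        }
        return new ArrayList<>(receivedByReader);
    }

    List<String> snapshotOf(long checkpointId) {
        return enumeratorSnapshots.get(checkpointId);
    }
}
```

Because the snapshot is taken before the barrier is enqueued and the channel preserves order, a split assigned after the snapshot can never overtake the barrier.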

If the user specifies RestartAllStrategy as the failover strategy, Flink restarts all tasks and the SourceCoordinator when a task fails. All tasks and Enumerators are restarted and restored from the last successful checkpoint.

If the user specifies RestartPipelinedRegionStrategy as the failover strategy, things are a little more complicated. There is no failover region problem in this model, since there is no execution edge between Enumerator and SourceReader (the SourceCoordinator is not a part of the ExecutionGraph). The two failure cases need to be explained separately.

  • When a SourceReader task fails, the JobMaster does not restart the SourceCoordinator or the Enumerators on it. The JobMaster cancels the other tasks in the same failover region as the failed task, as usual. The JobMaster then notifies the Enumerator of the failure or cancellation of the SourceReader tasks (there might be multiple SourceReader tasks in the same failover region) and of the checkpoint version that will be restored. This notification happens before the new tasks are started. Once the Enumerator is aware of the task failures, it restores the state related to the failed tasks from that checkpoint version, which means the SourceCoordinator needs to support partial restoring. The Enumerator also keeps an in-memory two-level map from SourceReader to checkpoint version to split assignment. This map helps to find the splits that should be reassigned or added back to the Enumerator. There could be different strategies for handling these failed splits; in some event-time based jobs, reassigning failed splits to other tasks may break the watermark semantics. After restoring the split assignment state, reconstructing the in-memory map, and handling the failed splits, the Enumerator returns an acknowledgement to the JobMaster, which then restarts the tasks of the failed region. As a possible optimization, the Enumerator could return the acknowledgement immediately without waiting for the restore, so that scheduling the tasks of the failed region and restoring the Enumerator proceed at the same time. Importantly, while the Enumerator is restoring, the other running SourceReaders should continue to work normally, including pulling their next splits.

  • When an Enumerator or the SourceCoordinator fails and a write-ahead log is available (mentioned below), the JobMaster restarts the Enumerator or SourceCoordinator but does not restart the SourceReader tasks. After the restart, the Enumerator restores its state, replays the write-ahead log, and then starts working. In the meantime, the SourceReaders wait to reconnect; no more splits are assigned until they have reregistered successfully. The reregistration is necessary: after replaying the write-ahead log, the Enumerator and the SourceReaders must be aligned, because the Enumerator cannot be sure whether the last split assignments to each SourceReader succeeded. The reconnection information is updated by the JobMaster if needed (when the process has crashed). If no write-ahead log is available, the failover falls back to a global failover: all tasks and Enumerators are restarted and restored from the last successful checkpoint.

The CheckpointCoordinator should notify the Enumerator when a checkpoint has completed, so that the Enumerator can prune the in-memory map and the write-ahead log.
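The two-level map and its pruning can be sketched as below. This is one possible reading of the description, under stated assumptions: `SplitAssignmentTracker` and its methods are hypothetical, splits are plain strings, and each assignment is filed under the id of the first checkpoint whose snapshot will include it. Under that convention, a reader restored from checkpoint V has lost exactly the assignments filed under ids greater than V, and a completed checkpoint V makes all entries up to V prunable.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory tracker: readerId -> checkpoint id -> splits assigned. */
class SplitAssignmentTracker {
    private final Map<Integer, TreeMap<Long, List<String>>> assignments = new HashMap<>();
    // Id of the next checkpoint to be triggered; new assignments are filed under it.
    private long nextCheckpointId = 1L;

    void recordAssignment(int readerId, String split) {
        assignments
                .computeIfAbsent(readerId, r -> new TreeMap<>())
                .computeIfAbsent(nextCheckpointId, c -> new ArrayList<>())
                .add(split);
    }

    void onCheckpointTriggered(long checkpointId) {
        nextCheckpointId = checkpointId + 1;
    }

    /** Entries covered by a completed checkpoint are no longer needed in memory. */
    void onCheckpointCompleted(long checkpointId) {
        for (TreeMap<Long, List<String>> perReader : assignments.values()) {
            perReader.headMap(checkpointId, true).clear();
        }
    }

    /**
     * Returns the splits assigned to the failed reader after the restored checkpoint.
     * The caller decides whether to reassign them or add them back to the Enumerator.
     */
    List<String> onReaderFailed(int readerId, long restoredCheckpointId) {
        List<String> splitsToHandle = new ArrayList<>();
        TreeMap<Long, List<String>> perReader = assignments.get(readerId);
        if (perReader != null) {
            Map<Long, List<String>> uncovered = perReader.tailMap(restoredCheckpointId, false);
            uncovered.values().forEach(splitsToHandle::addAll);
            uncovered.clear(); // the view is backed by the map, so this removes the entries
        }
        return splitsToHandle;
    }
}
```

The tracker deliberately leaves the handling strategy to the caller, since reassigning failed splits to other readers may not be acceptable for event-time based jobs.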


Open Questions

In both cases, the enumerator is a point of failure that requires a restart of the entire dataflow.
To circumvent that, we probably need an additional mechanism, like a write-ahead log for split assignment.
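As a rough illustration of what such a write-ahead log could look like, the sketch below appends every assignment to a log, truncates the log once a checkpoint covering a prefix of it has completed, and rebuilds the assignment state after a failure by replaying the remaining entries onto the checkpointed state. Everything here is an assumption made for the example: `SplitAssignmentLog`, its offsets, and the in-memory list that stands in for durable storage are not part of the proposal.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical write-ahead log of split assignments made between checkpoints. */
class SplitAssignmentLog {
    static final class Entry {
        final int readerId;
        final String split;

        Entry(int readerId, String split) {
            this.readerId = readerId;
            this.split = split;
        }
    }

    // A real log would be durable; an in-memory list stands in for it here.
    private final List<Entry> entries = new ArrayList<>();
    private long baseOffset = 0L; // log offset of entries.get(0)

    /** Must be called before the assignment is sent to the reader. */
    void append(int readerId, String split) {
        entries.add(new Entry(readerId, split));
    }

    /** Offset to remember when a checkpoint is triggered. */
    long endOffset() {
        return baseOffset + entries.size();
    }

    /** Drops entries below the offset remembered for a checkpoint that has completed. */
    void truncateBefore(long offset) {
        int count = (int) Math.max(0L, Math.min(offset - baseOffset, entries.size()));
        entries.subList(0, count).clear();
        baseOffset += count;
    }

    /** Rebuilds readerId -> splits by replaying the log onto the checkpointed state. */
    Map<Integer, List<String>> replayOnto(Map<Integer, List<String>> checkpointed) {
        Map<Integer, List<String>> state = new HashMap<>();
        checkpointed.forEach((reader, splits) -> state.put(reader, new ArrayList<>(splits)));
        for (Entry entry : entries) {
            state.computeIfAbsent(entry.readerId, r -> new ArrayList<>()).add(entry.split);
        }
        return state;
    }
}
```

Replay only recovers what the Enumerator intended to assign; whether each reader actually received its last assignment still has to be settled when the readers reregister.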

...

Comparison between Options

| Criterion | Enumerate on Task | Enumerate on JobManager | Enumerate on SourceCoordinator |
|---|---|---|---|
| Encapsulation of Enumerator | Encapsulation in separate Task | Additional complexity in ExecutionGraph | New component SourceCoordinator |
| Network Stack Changes | Significant changes. Some are more clear, like reconnecting. Some seem to break abstractions, like notifying tasks of downstream failures. | No changes necessary | No changes necessary |
| Scheduler / Failover Region | Minor changes | No changes necessary | Minor changes |
| Checkpoint alignment | No changes necessary (splits are data messages, naturally align with barriers) | Careful coordination between split assignment and checkpoint triggering. Might be simple if both actions are run in the single-threaded ExecutionGraph thread. | No changes necessary (splits are through RPC, naturally align with barriers) |
| Watermarks | No changes necessary (splits are data messages, watermarks naturally flow) | Watermarks would go through ExecutionGraph and RPC. | Watermarks would go through RPC |
| Checkpoint State | No additional mechanism (only regular task state) | Need to add support for asynchronous non-metadata state on the JobManager / ExecutionGraph | Need to add support for asynchronous state on the SourceCoordinator |
| Supporting graceful Enumerator recovery (avoid full restarts) | Network reconnects (like above), plus write-ahead of split assignment between checkpoints. | Tracking split assignment between checkpoints, plus write-ahead of split assignment between checkpoints. | Tracking split assignment between checkpoints, plus write-ahead of split assignment between checkpoints. |


Personal opinion from Stephan: If we find an elegant way to abstract the network stack changes, I would lean towards running the Enumerator in a Task, not on the JobManager.

...