...

  • Batch and streaming execution currently require separate source implementations.

  • The logic for "work discovery" (splits, partitions, etc.) and for actually "reading" the data is intermingled in the SourceFunction interface and in the DataStream API, leading to complex implementations like the Kafka and Kinesis sources.

  • Partitions/shards/splits are not explicit in the interface. This makes it hard to implement certain functionalities in a source-independent way, for example event-time alignment, per-partition watermarks, dynamic split assignment, and work stealing. For example, the Kafka and Kinesis consumers support per-partition watermarks, but as of Flink 1.8.1 only the Kinesis consumer supports event-time alignment (selectively reading from splits to make sure that we advance evenly in event time).

  • The checkpoint lock is "owned" by the source function. The implementation has to ensure that element emission and state updates happen under the lock, and there is no way for Flink to optimize how it deals with that lock.
    The lock is not a fair lock; under contention, some threads (notably the checkpoint thread) might not get the lock.
    This also stands in the way of a lock-free actor/mailbox-style threading model for operators (see the sketch after this list).

  • There are no common building blocks, meaning every source implements a complex threading model by itself. That makes implementing and testing new sources hard and raises the bar for contributing to existing sources.
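
To illustrate the checkpoint-lock contract described above, here is a rough sketch of how a legacy SourceFunction typically has to guard emission and state updates. The class and its fields are hypothetical; only the locking pattern matters.

```java
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/**
 * Simplified sketch of the legacy pattern: the source "owns" the checkpoint
 * lock and must emit records and advance its position while holding it, so
 * that checkpoints observe a consistent (position, emitted records) pair.
 */
public class SketchSource implements SourceFunction<String> {

    private volatile boolean running = true;
    private long position; // hypothetical offset into the external system

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            String record = pollNextRecord(); // blocking I/O, outside the lock

            // Emission and state update must happen atomically under the
            // checkpoint lock; Flink has no way to optimize or reorder this.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(record);
                position++;
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    private String pollNextRecord() {
        return "record-" + position; // stand-in for a real external read
    }
}
```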

...

  1. Sequential Single Split (File, database query, most bounded splits)
  2. Multi-split multiplexed (Kafka, Pulsar, Pravega, ...)
  3. Multi-split multi-threaded (Kinesis, ...)


[Figure: Sequential Single Split]

[Figure: Multi-split Multiplexed]

[Figure: Multi-split Multi-threaded]


Most readers implemented against these higher-level building blocks would only need to implement an interface similar to the one sketched below. The contract would also be that all methods except wakeup() are called by the same thread, obviating the need for any concurrency handling in the connector.
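
A minimal sketch of what such a reader interface could look like follows. The method names and generics are illustrative assumptions, not the final FLIP-27 API; the point is the single-threaded contract with wakeup() as the only exception.

```java
import java.io.IOException;

/**
 * Illustrative reader contract: every method except wakeup() is invoked by
 * the same thread, so implementations need no internal synchronization.
 */
public interface SplitReaderSketch<T> {

    /** Advances to the next record; returns false when the current split is exhausted. */
    boolean advance() throws IOException;

    /** Returns the record the reader is currently positioned on. */
    T getCurrent();

    /** Returns the event timestamp of the current record, if known. */
    long getCurrentTimestamp();

    /** Releases resources held for the current split. */
    void close() throws IOException;

    /**
     * The only method that may be called from another thread; used to
     * interrupt a blocking advance(), e.g. so a checkpoint can proceed.
     */
    void wakeup();
}
```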

...

Comparison between Options

Encapsulation of Enumerator
  • Enumerate on Task: encapsulation in a separate Task.
  • Enumerate on JobManager: additional complexity in the ExecutionGraph.
  • Enumerate on SourceCoordinator: new SourceCoordinator component.

Network Stack Changes
  • Enumerate on Task: significant changes. Some are fairly clear, like reconnecting; some seem to break abstractions, like notifying tasks of downstream failures.
  • Enumerate on JobManager: no changes necessary.
  • Enumerate on SourceCoordinator: no changes necessary.

Scheduler / Failover Region
  • Enumerate on Task: minor changes.
  • Enumerate on JobManager: no changes necessary.
  • Enumerate on SourceCoordinator: minor changes.

Checkpoint Alignment
  • Enumerate on Task: no changes necessary (splits are data messages, which naturally align with barriers).
  • Enumerate on JobManager: careful coordination between split assignment and checkpoint triggering; might be simple if both actions run in the single-threaded ExecutionGraph thread.
  • Enumerate on SourceCoordinator: no changes necessary (splits are sent through RPC and naturally align with barriers).

Watermarks
  • Enumerate on Task: no changes necessary (splits are data messages, so watermarks naturally flow).
  • Enumerate on JobManager: watermarks would go through the ExecutionGraph and RPC.
  • Enumerate on SourceCoordinator: watermarks would go through RPC.

Checkpoint State
  • Enumerate on Task: no additional mechanism (only regular task state).
  • Enumerate on JobManager: need to add support for asynchronous non-metadata state on the JobManager / ExecutionGraph.
  • Enumerate on SourceCoordinator: need to add support for asynchronous state on the SourceCoordinator.

Supporting Graceful Enumerator Recovery (avoiding full restarts)
  • Enumerate on Task: network reconnects (as above), plus write-ahead of split assignments between checkpoints.
  • Enumerate on JobManager: tracking of split assignments between checkpoints, plus write-ahead of split assignments between checkpoints.
  • Enumerate on SourceCoordinator: tracking of split assignments between checkpoints, plus write-ahead of split assignments between checkpoints.


Personal opinion from Stephan:  If we find an elegant way to abstract the network stack changes, I would lean towards running the Enumerator in a Task, not on the JobManager.
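
For context, here is a rough sketch of the kind of enumerator contract that would be placed in a Task, on the JobManager, or in a SourceCoordinator in the comparison above. The names and signatures are illustrative assumptions, not the final FLIP-27 API.

```java
import java.util.List;

/**
 * Illustrative enumerator contract: discovers splits and hands them to
 * parallel readers. Where this component runs (Task, JobManager, or a
 * dedicated SourceCoordinator) is what the comparison above evaluates.
 */
public interface SplitEnumeratorSketch<SplitT, CheckpointT> {

    /** Starts split discovery, e.g. listing files or fetching partition metadata. */
    void start();

    /** Called when a reader subtask asks for more work. */
    void handleSplitRequest(int subtaskId);

    /** Returns splits to the enumerator, e.g. after a reader failure. */
    void addSplitsBack(List<SplitT> splits, int subtaskId);

    /**
     * Snapshots enumerator state (unassigned splits and, depending on the
     * option chosen, the assignments made since the last checkpoint) so a
     * failure does not require a full restart.
     */
    CheckpointT snapshotState() throws Exception;

    /** Releases resources used for split discovery. */
    void close() throws Exception;
}
```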

...