...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This FLIP aims to solve several problems/shortcomings in the current streaming source interface (SourceFunction) and simultaneously to unify the source interfaces between the batch and streaming APIs. The shortcomings or points that we want to address are:

  • One currently implements different sources for batch and streaming execution.

  • The logic for "work discovery" (splits, partitions, etc.) and for actually "reading" the data is intermingled in the DataStream API, leading to complex implementations like the Kafka and Kinesis sources.

  • Partitions/shards/splits are not explicit in the interface. This makes it hard to implement certain functionality in a source-independent way, such as event-time alignment, per-partition watermarks, dynamic split assignment, and work stealing. For example, the Kafka source supports per-partition watermarks, but the Kinesis source doesn't. Neither source supports event-time alignment (selectively reading from splits to make sure that we advance evenly in event time).

  • The checkpoint lock is "owned" by the source function. The implementation has to ensure that element emission and state updates happen under the lock, leaving Flink no way to optimize how it deals with that lock (see the sketch after this list).
    This also stands in the way of a lock-free actor/mailbox-style threading model for operators.

  • There are no common building blocks, meaning every source implements a complex threading model by itself. That makes implementing and testing new sources hard and raises the bar for contributing to existing sources.
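
To make the lock problem concrete, here is a minimal sketch of a SourceFunction following this pattern. The class name and the record-fetching logic are made up for illustration; getCheckpointLock() and collect() are the actual SourceContext methods, and a real source would additionally snapshot the position via checkpointed state.

Code Block: SourceFunction checkpoint-lock pattern (Java)
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class LockHoldingSource implements SourceFunction<Long> {

    private volatile boolean running = true;
    private long position; // restored on recovery, snapshotted on checkpoint

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (running) {
            long next = position + 1; // stand-in for fetching the next record
            // Emission and the state update must happen atomically under the
            // checkpoint lock, or a checkpoint could capture an inconsistent
            // position.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(next);
                position = next;
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

Because every source acquires and releases this lock inside its own loop, the runtime cannot coordinate emission, checkpointing, and other operator actions from a single mailbox thread.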

Overall Design

There are several key aspects to the design, which are discussed in the sections below.

Separating Work Discovery from Reading

...

Code Block: DataStream API (Java)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

FileSource<MyType> theSource = new ParquetFileSource("fs:///path/to/dir", AvroParquet.forSpecific(MyType.class));

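// consume the source as an unbounded, continuous stream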
DataStream<MyType> stream = env.continuousSource(theSource);

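// consume the same source as a bounded stream (still typed DataStream)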
DataStream<MyType> boundedStream = env.boundedSource(theSource);

// this would be an option once we add bounded streams to the DataStream API
BoundedDataStream<MyType> batch = env.boundedSource(theSource);
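
Note that in this sketch the same theSource instance serves both the continuous and the bounded case; whether it is read as bounded is decided at the point where the source is wired into the environment, not in the source implementation itself.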

...