
Status

Current state: Under Discussion

Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)

JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This FLIP aims at resolving several problems/shortcomings in the current streaming source interface (SourceFunction) with a possible end-goal of unifying the source interfaces between the batch and streaming APIs. The shortcomings or points that we want to address are:

  • Partitions/shards/splits are not explicit in the interface. This makes it hard to implement certain functionality in a source-independent way, for example event-time alignment, per-partition watermarks, dynamic split assignment, and work stealing.
  • The current interface is push-based, meaning the source is responsible for reading data and pushing it downstream. This prevents Flink from selectively reading from certain partitions/shards/splits, which ties in closely with the previous point.
  • The checkpoint lock is "owned" by the source function. The implementation must ensure that element emission and state updates happen under the lock. Flink has no way to optimize how it deals with that lock.

Currently, the Kafka source supports per-partition watermarks, but the Kinesis source does not. Neither source supports event-time alignment, that is, selectively reading from splits so that all of them advance evenly in event time. A new interface in which partitions/shards/splits are explicit would allow implementing these features once, generically, for all sources.
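To make event-time alignment concrete, here is a minimal, self-contained sketch of a pull-based loop that always reads from the split that is furthest behind in event time. It is not part of the proposal: `TimestampedSplit`, `watermark()`, and `readAligned` are hypothetical names, and the watermark is assumed to be simply the timestamp of the next unread record.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AlignedReadSketch {

    /** Hypothetical stand-in for a per-split reader over pre-sorted timestamps. */
    static final class TimestampedSplit {
        final String name;
        final long[] timestamps;
        int next = 0;

        TimestampedSplit(String name, long... timestamps) {
            this.name = name;
            this.timestamps = timestamps;
        }

        boolean isDone() {
            return next >= timestamps.length;
        }

        /** Assumption: the watermark is the timestamp of the next unread record. */
        long watermark() {
            return isDone() ? Long.MAX_VALUE : timestamps[next];
        }

        /** Reads one record and returns a label of the form name@timestamp. */
        String read() {
            return name + "@" + timestamps[next++];
        }
    }

    /** Repeatedly pulls one record from the unfinished split with the lowest watermark. */
    static List<String> readAligned(List<TimestampedSplit> splits) {
        List<String> out = new ArrayList<>();
        while (true) {
            TimestampedSplit laggard = null;
            for (TimestampedSplit s : splits) {
                if (!s.isDone() && (laggard == null || s.watermark() < laggard.watermark())) {
                    laggard = s;
                }
            }
            if (laggard == null) {
                return out; // every split is exhausted
            }
            out.add(laggard.read());
        }
    }

    public static void main(String[] args) {
        List<String> order = readAligned(Arrays.asList(
                new TimestampedSplit("a", 1, 5, 9),
                new TimestampedSplit("b", 2, 3, 10)));
        System.out.println(order); // prints [a@1, b@2, b@3, a@5, a@9, b@10]
    }
}
```

A push-based source cannot be driven this way, because the source itself decides when each partition emits. With explicit splits and pull-based reads, the selection logic lives outside any particular connector.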

Public Interfaces

We propose a new Source interface along with two companion interfaces SplitEnumerator and SplitReader:

Source
import java.io.Serializable;
import org.apache.flink.api.common.typeutils.TypeSerializer;

// Factory for split enumerators and split readers, plus the serializers they need.
public interface Source<T, SplitT, EnumeratorCheckpointT> extends Serializable {
   TypeSerializer<SplitT> getSplitSerializer();

   TypeSerializer<T> getElementSerializer();

   TypeSerializer<EnumeratorCheckpointT> getEnumeratorCheckpointSerializer();

   EnumeratorCheckpointT createInitialEnumeratorCheckpoint();

   SplitEnumerator<SplitT, EnumeratorCheckpointT> createSplitEnumerator(EnumeratorCheckpointT checkpoint);

   SplitReader<T, SplitT> createSplitReader(SplitT split);
}
SplitEnumerator
// Detects new partitions/shards/splits.
public interface SplitEnumerator<SplitT, CheckpointT> {
   Iterable<SplitT> discoverNewSplits();

   CheckpointT checkpoint();
}
SplitReader
import java.io.IOException;
import java.util.NoSuchElementException;

// Reads the records of a single split.
public interface SplitReader<T, SplitT> {

   boolean start() throws IOException;

   boolean advance() throws IOException;

   T getCurrent() throws NoSuchElementException;

   long getCurrentTimestamp() throws NoSuchElementException;

   long getWatermark();

   SplitT checkpoint();

   boolean isDone() throws IOException;

   void close() throws IOException;
}

The Source interface itself is really only a factory for creating split enumerators and split readers. A split enumerator is responsible for detecting new partitions/shards/splits, while a split reader is responsible for reading from one split. This separation of concerns allows running the enumeration in a parallelism-one operation or outside the execution graph. It also gives Flink more freedom to decide how the processing of splits is scheduled.
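The division of labor between enumerator and reader can be sketched with a small in-memory example. This is an illustration under assumptions, not the proposed implementation: the interfaces below are trimmed-down copies (no serializers, timestamps, or watermarks), `ListSplit`, `ListEnumerator`, `ListReader`, and `readAll` are hypothetical, and `start()`/`advance()` are assumed to return true when a current record is available. The reader's `checkpoint()` is assumed to return a split that resumes after the current record.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;

public class SplitReadSketch {

    // Trimmed-down copies of the proposed interfaces.
    interface SplitEnumerator<SplitT, CheckpointT> {
        Iterable<SplitT> discoverNewSplits();
        CheckpointT checkpoint();
    }

    interface SplitReader<T, SplitT> {
        boolean start();
        boolean advance();
        T getCurrent() throws NoSuchElementException;
        SplitT checkpoint();
        void close();
    }

    /** Hypothetical split: a list of records plus the offset of the next record to read. */
    static final class ListSplit {
        final List<String> records;
        final int offset;
        ListSplit(List<String> records, int offset) {
            this.records = records;
            this.offset = offset;
        }
    }

    /** Hands out every split once; its checkpoint is the number of splits handed out. */
    static final class ListEnumerator implements SplitEnumerator<ListSplit, Integer> {
        private final List<ListSplit> all;
        private int handedOut;
        ListEnumerator(List<ListSplit> all, int handedOut) {
            this.all = all;
            this.handedOut = handedOut;
        }
        public Iterable<ListSplit> discoverNewSplits() {
            List<ListSplit> fresh = new ArrayList<>(all.subList(handedOut, all.size()));
            handedOut = all.size();
            return fresh;
        }
        public Integer checkpoint() { return handedOut; }
    }

    /** Reads one ListSplit record by record. */
    static final class ListReader implements SplitReader<String, ListSplit> {
        private final ListSplit split;
        private int position;       // index of the current record
        private boolean hasCurrent;
        ListReader(ListSplit split) {
            this.split = split;
            this.position = split.offset - 1;
        }
        public boolean start() { return advance(); }
        public boolean advance() {
            hasCurrent = position + 1 < split.records.size();
            if (hasCurrent) {
                position++;
            }
            return hasCurrent;
        }
        public String getCurrent() {
            if (!hasCurrent) {
                throw new NoSuchElementException();
            }
            return split.records.get(position);
        }
        public ListSplit checkpoint() { return new ListSplit(split.records, position + 1); }
        public void close() { }
    }

    /** Driver loop: the caller, not the source, decides when each record is pulled. */
    static List<String> readAll(SplitEnumerator<ListSplit, ?> enumerator) {
        List<String> out = new ArrayList<>();
        for (ListSplit split : enumerator.discoverNewSplits()) {
            SplitReader<String, ListSplit> reader = new ListReader(split);
            try {
                for (boolean more = reader.start(); more; more = reader.advance()) {
                    out.add(reader.getCurrent());
                }
            } finally {
                reader.close();
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<ListSplit> splits = Arrays.asList(
                new ListSplit(Arrays.asList("a", "b"), 0),
                new ListSplit(Arrays.asList("c"), 0));
        System.out.println(readAll(new ListEnumerator(splits, 0))); // prints [a, b, c]
    }
}
```

Because a reader's checkpoint is itself a split, restoring amounts to creating a new reader from the checkpointed split; no separate state type is needed on the reader side.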

A naive prototype that implements this in user space on top of the existing Flink operations is available here: https://github.com/aljoscha/flink/commits/refactor-source-interface. It also comes with a complete Kafka source implementation that already supports checkpointing.

Proposed Changes

Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.

Compatibility, Deprecation, and Migration Plan

  • The new interface and new source implementations will be provided alongside the existing sources, so existing programs will not break.
  • We can consider allowing existing jobs/savepoints to migrate smoothly to the new interface, but this is a secondary concern.

Test Plan

Describe in few sentences how the FLIP will be tested. We are mostly interested in system tests (since unit-tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
