
Status

Current state: Under Discussion

Discussion thread: https://lists.apache.org/thread.html/70484d6aa4b8e7121181ed8d5857a94bfb7d5a76334b9c8fcc59700c@%3Cdev.flink.apache.org%3E

JIRA:

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This FLIP aims at resolving several problems/shortcomings in the current streaming source interface (SourceFunction) with a possible end-goal of unifying the source interfaces between the batch and streaming APIs. The shortcomings or points that we want to address are:

  • One currently implements different sources for batch and streaming execution.

  • The logic for "work discovery" (splits, partitions, etc.) and actually "reading" the data is intermingled in the DataStream API, leading to complex implementations like the Kafka and Kinesis sources.

  • Partitions/shards/splits are not explicit in the interface. This makes it hard to implement certain functionalities in a source-independent way, for example event-time alignment, per-partition watermarks, dynamic split assignment, work stealing. For example, the Kafka source supports per-partition watermarks, the Kinesis source doesn't. Neither source supports event-time alignment (selectively reading from splits to make sure that we advance evenly in event time).

  • The checkpoint lock is "owned" by the source function. The implementation has to make sure that element emission and state updates happen under the lock. There is no way for Flink to optimize how it deals with that lock.
    This also stands in the way of a lock-free actor/mailbox style threading model for operators.

  • There are no common building blocks, meaning every source implements a complex threading model by itself. That makes implementing and testing new sources hard, and adds a high bar to contributing to existing sources.

Overall Design

Separating Work Discovery from Reading

The sources have two main components:

  • SplitEnumerator: Discovers and assigns splits (files, partitions, etc.)
  • Reader: Reads the actual data from the splits.

The SplitEnumerator is similar to the old batch source interface's functionality of creating and assigning splits. It runs only once, not in parallel (though it could conceivably be parallelized in the future, if necessary).
It might run on the JobManager or in a single task on a TaskManager (see below, "Where to Run the Enumerator").

The Reader reads the data from the assigned splits and encompasses most of the functionality of the current source interface. Some readers may read a sequence of bounded splits one after another, some may read multiple (unbounded) splits in parallel.

The main Source interface itself is only a factory for creating split enumerators and readers. This separation between enumerator and reader allows mixing and matching different enumeration strategies with split readers. For example, the current Kafka connector has different strategies for partition discovery that are intermingled with the rest of the code. With the new interfaces in place, we would only need one split reader implementation and there could be several split enumerators for the different partition discovery strategies.

Example:

  • In the File Source, the SplitEnumerator lists all files (possibly sub-dividing them into blocks/ranges).
  • For the Kafka Source, the SplitEnumerator finds all Kafka Partitions that the source should read from.
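
To make the notion of a split concrete for the two examples above, here is a minimal sketch of what such split types could look like. The class and field names are illustrative assumptions, not part of the proposed interfaces.

Example split types (sketch)
// Sketch only: possible split types for the file and Kafka examples above.
// They would implement the split marker interface of the proposal (SourceSplit
// in the Source interface further below).
class FileSourceSplit {

	private final String path;   // the file to read
	private final long offset;   // start of the block/range within the file
	private final long length;   // length of the block/range

	FileSourceSplit(String path, long offset, long length) {
		this.path = path;
		this.offset = offset;
		this.length = length;
	}
}

class KafkaPartitionSplit {

	private final String topic;
	private final int partition;
	private final long startOffset; // where the reader (re)starts, e.g. after a checkpoint
	private final long endOffset;   // Long.MAX_VALUE when reading is unbounded

	KafkaPartitionSplit(String topic, int partition, long startOffset, long endOffset) {
		this.topic = topic;
		this.partition = partition;
		this.startOffset = startOffset;
		this.endOffset = endOffset;
	}
}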

Batch and Streaming Unification

Each source should be able to work as a bounded (batch) and as an unbounded (continuous streaming) source.

The actual decision whether it becomes bounded or unbounded is made in the DataStream API when creating the source stream.

The Boundedness is a property that is passed to the source when creating the SplitEnumerator. The readers should be agnostic to this distinction; they simply read the splits assigned to them.

That way, we can also make the API type safe in the future, when we can explicitly model bounded streams.

DataStream API
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

FileSource<MyType> theSource = new ParquetFileSource("fs:///path/to/dir", AvroParquet.forSpecific(MyType.class));

DataStream<MyType> stream = env.continuousSource(theSource);

DataStream<MyType> boundedStream = env.boundedSource(theSource);

// this would be an option once we add bounded streams to the DataStream API
BoundedDataStream<MyType> batch = env.boundedSource(theSource);

Examples

FileSource

  • For "bounded input", it uses a SplitEnumerator that enumerates once all files under the given path.
  • For "continuous input", it uses a SplitEnumerator that periodically enumerates once all files under the given path and assigns the new ones. 

KafkaSource

  • For "bounded input", it uses a SplitEnumerator that lists all partitions and gets the latest offset for each partition and attaches that as the "end offset" to the split.
  • For "continuous input", it uses a SplitEnumerator that lists all partitions and attaches LONG_MAX as the "end offset" to each split.
  • The source may have another option to periodically discover new partitions. That would only be applicable to the "continuous input".
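
As a sketch of how the bounded and continuous variants above could differ in code: this assumes the hypothetical KafkaPartitionSplit from the earlier sketch, with listPartitions() and latestOffset() standing in for the corresponding Kafka client calls.

Kafka split enumeration (sketch)
// Sketch only: the bounded variant freezes the end offset at enumeration time,
// the continuous variant uses Long.MAX_VALUE to mean "no end offset".
abstract class KafkaSplitEnumerationSketch {

	abstract List<TopicPartition> listPartitions();  // assumed helper around the Kafka client
	abstract long latestOffset(TopicPartition tp);   // assumed helper around the Kafka client

	List<KafkaPartitionSplit> enumerateBounded(long startOffset) {
		List<KafkaPartitionSplit> splits = new ArrayList<>();
		for (TopicPartition tp : listPartitions()) {
			splits.add(new KafkaPartitionSplit(tp.topic(), tp.partition(), startOffset, latestOffset(tp)));
		}
		return splits;
	}

	List<KafkaPartitionSplit> enumerateContinuous(long startOffset) {
		List<KafkaPartitionSplit> splits = new ArrayList<>();
		for (TopicPartition tp : listPartitions()) {
			splits.add(new KafkaPartitionSplit(tp.topic(), tp.partition(), startOffset, Long.MAX_VALUE));
		}
		return splits;
	}
}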

Reader Interface and Threading Model

The reader needs to fulfill the following properties:

  • No closed work loop, so it does not need to manage locking
  • Non-blocking progress methods, so it supports running in an actor/mailbox/dispatcher style operator
  • All methods are called by the same single thread, so implementors need not deal with concurrency
  • Watermark / event time handling is abstracted to be extensible for split-awareness and alignment (see the sections "Per Split Event Time" and "Event Time Alignment" below)
  • All readers should naturally support state and checkpoints
  • Watermark generation should be circumvented for batch execution


The following core aspects give us these properties:

  • Splits are both the type of work assignment and the type of state held by the source. Assigning a split to a reader or restoring a split from a checkpoint looks the same to the reader.
  • Advancing the reader is a non-blocking call that returns a future.
  • We build higher-level primitives on top of the main interface (see below, "High-level Readers").
  • We hide event-time / watermarks in the SourceOutput and pass different source contexts for batch (no watermarks) and streaming (with watermarks).
    The SourceOutput also abstracts the per-partition watermark tracking.


SourceReader reading methods
interface SourceReader<E, SplitT> {

	void start() throws IOException;

	CompletableFuture<?> available() throws IOException;

	ReaderStatus emitNext(SourceOutput<E> output) throws IOException;

	void addSplits(List<SplitT> splits) throws IOException;

	List<SplitT> snapshotState();
}
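
The ReaderStatus returned by emitNext(...) is not spelled out in this document. Judging from the driver loop below, it would presumably be a simple enum along these lines (a sketch, not a proposed interface):

ReaderStatus (sketch)
enum ReaderStatus {

	/** A record was emitted and more data is available right now; call emitNext(...) again. */
	MORE_AVAILABLE,

	/** Nothing is available at the moment; wait for the future returned by available() to complete. */
	NOTHING_AVAILABLE,

	/** All assigned splits are exhausted; the reader is done. */
	END_OF_SPLIT_DATA
}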


The implementation assumes that there is a single thread that drives the source. This single thread calls emitNext(...) when data is available and cedes execution when nothing is available. It also handles checkpoint triggering, timer callbacks, etc. thus making the task lock free.

The thread is expected to work on some form of mailbox, in which the data emitting loop is one possible task (next to timers, checkpoints, ...). Below is a very simple pseudo code for the driver loop.

This type of hot loop makes sure we do not use the future unless the reader is temporarily out of data, and we opportunistically bypass the mailbox for performance (the mailbox will need some amount of synchronization).

reader loop pseudo code
final BlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();


final SourceReader<T> reader = ...;


final Runnable readerLoop = new Runnable() {


    public void run() {
        while (true) {
            ReaderStatus status = reader.emitNext(output);
            if (status == MORE_AVAILABLE) {
                if (mailbox.isEmpty()) {
                    continue;
                }
                else {
                    addReaderLoopToMailbox();
                    break;
                }
            }
            else if (status == NOTHING_AVAILABLE) {
                reader.available().thenAccept((ignored) -> addReaderLoopToMailbox());
                break;
            }
            else if (status == END_OF_SPLIT_DATA) {
                break;
            }
        }
    };


    void addReaderLoopToMailbox() {
        mailbox.add(readerLoop);
    }


    void triggerCheckpoint(CheckpointOptions options) {
        mailbox.add(new CheckpointAction(options));
    }


    void taskMainLoop() {
        while (taskAlive) {
            mailbox.take().run();
        }
    }
};
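
The CheckpointAction used above is not defined in this document. A minimal sketch of what such a mailbox task could look like, with the actual checkpoint logic left as a placeholder:

CheckpointAction (sketch)
// Sketch only: a mailbox task for checkpoint triggering. Because it runs from the
// mailbox, it never executes concurrently with reader.emitNext(...), so no
// checkpoint lock is needed.
final class CheckpointAction implements Runnable {

	private final CheckpointOptions options;

	CheckpointAction(CheckpointOptions options) {
		this.options = options;
	}

	@Override
	public void run() {
		// The task's actual checkpoint logic (snapshotting reader state via
		// snapshotState(), emitting barriers, ...) would go here.
	}
}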

High-level Readers

The core source interface (the lowest level interface) is very generic. That makes it flexible, but hard to implement for contributors, especially for sufficiently complex reader patterns like in Kafka or Kinesis.
In general, most I/O libraries used for connectors are not asynchronous, and would need to spawn an I/O thread to make them non-blocking for the main thread.

We propose to solve this by building higher level source abstractions that offer simpler interfaces that allow for blocking calls.
These higher level abstractions would also solve the issue of sources that handle multiple splits concurrently, and the per-split event time logic.

Most readers fall into one of the following categories:

  1. Sequential Single Split (File, database query, most bounded splits)
  2. Multi-split multiplexed (Kafka, Pulsar, Pravega, ...)
  3. Multi-split multi-threaded (Kinesis, ...)





Most of the readers implemented against these higher level building blocks would only need to implement an interface similar to this. The contract would also be that all methods except wakeup() would be called by the same thread, obviating the need for any concurrency handling in the connector.

SplitReader interface
interface SplitReader<RecordsT> extends Closeable {

	@Nullable
	RecordsT fetchNextRecords(Duration timeout) throws IOException;

	void wakeup();
}
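
As an illustration of how little a connector would have to implement against this interface, below is a sketch of a sequential single-split reader that reads text lines from a file. Everything apart from the SplitReader interface itself (the class name, the batch size, the wake-up handling) is an assumption; the higher-level building block would own the I/O thread and the hand-over to the main thread.

Sequential single-split reader (sketch)
// Sketch only: a blocking SplitReader over one text file. All methods except
// wakeup() are called from the same (fetcher) thread, so blocking I/O is fine here.
class TextLineSplitReader implements SplitReader<List<String>> {

	private final BufferedReader in;
	private volatile boolean wakenUp;

	TextLineSplitReader(Path file) throws IOException {
		this.in = Files.newBufferedReader(file, StandardCharsets.UTF_8);
	}

	@Nullable
	@Override
	public List<String> fetchNextRecords(Duration timeout) throws IOException {
		// Returns a small batch of lines, or null if nothing was read.
		// A real implementation would also respect the timeout.
		wakenUp = false;
		List<String> batch = new ArrayList<>();
		String line;
		while (!wakenUp && batch.size() < 100 && (line = in.readLine()) != null) {
			batch.add(line);
		}
		return batch.isEmpty() ? null : batch;
	}

	@Override
	public void wakeup() {
		// The only method called from another thread; asks the fetch loop to return early.
		wakenUp = true;
	}

	@Override
	public void close() throws IOException {
		in.close();
	}
}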


Per Split Event Time

TBD.

Event Time Alignment

TBD.

Where to Run the Enumerator

TBD.

Core Public Interfaces


Source

Source
public interface Source<T, SplitT extends SourceSplit, EnumChkT> extends Serializable {

	/**
	 * Checks whether the source supports the given boundedness.
	 *
	 * <p>Some sources might only support either continuous unbounded streams, or
	 * bounded streams.
	 */
	boolean supportsBoundedness(Boundedness boundedness);

	/**
	 * Creates a new reader to read data from the splits it gets assigned.
	 * The reader starts fresh and does not have any state to resume.
	 */
	SourceReader<T, SplitT> createReader(SourceContext ctx) throws IOException;

	/**
	 * Creates a new SplitEnumerator for this source, starting a new input.
	 */
	SplitEnumerator<SplitT, EnumChkT> createEnumerator(Boundedness mode) throws IOException;

	/**
	 * Restores an enumerator from a checkpoint.
	 */
	SplitEnumerator<SplitT, EnumChkT> restoreEnumerator(Boundedness mode, EnumChkT checkpoint) throws IOException;

	// ------------------------------------------------------------------------
	//  serializers for the metadata
	// ------------------------------------------------------------------------

	/**
	 * Creates a serializer for the input splits. Splits are serialized when sending them
	 * from enumerator to reader, and when checkpointing the reader's current state.
	 */
	SimpleVersionedSerializer<SplitT> getSplitSerializer();

	/**
	 * Creates the serializer for the {@link SplitEnumerator} checkpoint.
	 * The serializer is used for the result of the {@link SplitEnumerator#snapshotState()}
	 * method.
	 */
	SimpleVersionedSerializer<EnumChkT> getEnumeratorCheckpointSerializer();
}


/**
 * The boundedness of the source: "bounded" for the currently available data (batch style),
 * "continuous unbounded" for a continuous streaming style source.
 */
public enum Boundedness {

	/**
	 * A bounded source processes the data that is currently available and will end after that.
	 *
	 * <p>When a source produces a bounded stream, the runtime may activate additional optimizations
	 * that are suitable only for bounded input. Incorrectly producing unbounded data when the source
	 * is set to produce a bounded stream will often result in programs that do not output any results
	 * and may eventually fail due to runtime errors (out of memory or storage).
	 */
	BOUNDED,

	/**
	 * A continuous unbounded source continuously processes all data as it comes.
	 *
	 * <p>The source may run forever (until the program is terminated) or might actually end at some point,
	 * based on some source-specific conditions. Because that is not transparent to the runtime,
	 * the runtime will use an execution mode for continuous unbounded streams whenever this mode
	 * is chosen.
	 */
	CONTINUOUS_UNBOUNDED
}


Reader

(see above)


Split Enumerator

SplitEnumerator
public interface SplitEnumerator<SplitT, CheckpointT> extends Closeable {

	/**
	 * Returns true when the input is bounded and no more splits are available.
	 * This signals the definite end of the input, which is only possible for bounded sources.
	 */
	boolean isEndOfInput();

	/**
	 * Returns the next split, if it is available. If nothing is currently available, this returns
	 * an empty Optional.
	 * More may be available later, if the {@link #isEndOfInput()} is false.
	 */
	Optional<SplitT> nextSplit(ReaderLocation reader);

	/**
	 * Adds splits back to the enumerator. This happens when a reader failed and restarted,
	 * and the splits assigned to that reader since the last checkpoint need to be made
	 * available again.
	 */
	void addSplitsBack(List<SplitT> splits);

	/**
	 * Checkpoints the state of this split enumerator.
	 */
	CheckpointT snapshotState();

	/**
	 * Called to close the enumerator, in case it holds on to any resources, like threads or
	 * network connections.
	 */
	@Override
	void close() throws IOException;
}


public interface PeriodicSplitEnumerator<SplitT, CheckpointT> extends SplitEnumerator<SplitT, CheckpointT> {

	/**
	 * Called periodically to discover further splits.
	 *
	 * @return True if further splits were discovered, false if not.
	 */
	boolean discoverMoreSplits() throws IOException;

	/**
	 * Continuous enumeration is only applicable to unbounded sources.
	 */
	default boolean isEndOfInput() {
		return false;
	}
}
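
As a minimal sketch of how this contract could be implemented for a bounded source, the following hypothetical enumerator hands out a pre-computed list of splits (for example the result of a one-time file listing) and uses the remaining splits as its checkpoint state:

Static split enumerator (sketch)
// Sketch only: a bounded enumerator over a fixed, pre-computed list of splits.
class StaticSplitEnumerator<SplitT> implements SplitEnumerator<SplitT, List<SplitT>> {

	private final ArrayDeque<SplitT> remainingSplits;

	StaticSplitEnumerator(Collection<SplitT> splits) {
		this.remainingSplits = new ArrayDeque<>(splits);
	}

	@Override
	public boolean isEndOfInput() {
		return remainingSplits.isEmpty();
	}

	@Override
	public Optional<SplitT> nextSplit(ReaderLocation reader) {
		// This sketch ignores locality; a real enumerator could use the reader location.
		return Optional.ofNullable(remainingSplits.poll());
	}

	@Override
	public void addSplitsBack(List<SplitT> splits) {
		// Splits from a failed reader become assignable again.
		remainingSplits.addAll(splits);
	}

	@Override
	public List<SplitT> snapshotState() {
		// The checkpoint is simply the set of splits that have not been handed out yet.
		return new ArrayList<>(remainingSplits);
	}

	@Override
	public void close() {
		// No resources to release.
	}
}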


StreamExecutionEnvironment

StreamExecutionEnvironment
public class StreamExecutionEnvironment {
...
    public <T> DataStream<T> continuousSource(Source<T, ?, ?> source) {...}

    public <T> DataStream<T> continuousSource(Source<T, ?, ?> source, TypeInformation<T> type) {...}

    public <T> DataStream<T> boundedSource(Source<T, ?, ?> source) {...}

    public <T> DataStream<T> boundedSource(Source<T, ?, ?> source, TypeInformation<T> type) {...}
...
}


SourceOutput and Watermarking

SourceOutput and Watermarking
public interface SourceOutput<E> extends WatermarkOutput {

	void emitRecord(E record);

	void emitRecord(E record, long timestamp);
}


/**
 * An output for watermarks. The output accepts watermarks and idleness (inactivity) status.
 */
public interface WatermarkOutput {

	/**
	 * Emits the given watermark.
	 *
	 * <p>Emitting a watermark also ends previously marked idleness.
	 */
	void emitWatermark(Watermark watermark);

	/**
	 * Marks this output as idle, meaning that downstream operations do not
	 * wait for watermarks from this output.
	 *
	 * <p>An output becomes active again as soon as the next watermark is emitted.
	 */
	void markIdle();
}
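
A sketch of how a reader's emitNext(...) could use this output; MyRecord, getEventTimestamp(), and pollNextRecord() are assumptions standing in for connector-specific code.

Using SourceOutput (sketch)
// Sketch only: emitting a record together with its event timestamp.
// For batch execution the runtime can pass a SourceOutput whose watermark-related
// methods are no-ops, so the reader code is the same for bounded and unbounded runs.
public ReaderStatus emitNext(SourceOutput<MyRecord> output) throws IOException {
	MyRecord record = pollNextRecord();  // assumed connector-specific fetch, null if nothing is buffered
	if (record == null) {
		return ReaderStatus.NOTHING_AVAILABLE;
	}
	output.emitRecord(record, record.getEventTimestamp());
	return ReaderStatus.MORE_AVAILABLE;
}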

Implementation Plan

The implementation should proceed in the following steps, some of which can proceed concurrently.

  1. Validate the interface proposal by implementing popular connectors of different patterns:

    1. FileSource
      1. For a row-wise format (splittable within files, checkpoint offset within a split)
      2. For a bulk format like Parquet / Orc.
      3. Bounded and unbounded split enumerator
    2. KafkaSource
      1. Unbounded without dynamic partition discovery
      2. Unbounded with dynamic partition discovery
      3. Bounded
    3. Kinesis
      1. Unbounded
  2. Implement test harnesses for the high-level reader patterns
  3. Test the functionality of the readers implemented in (1)

  4. Implement a new SourceReaderTask and implement the single-threaded mailbox logic

  5. Implement SourceEnumeratorTask

  6. Implement the changes to network channels and scheduler, or to RPC service and checkpointing, to handle split assignment and checkpoints and re-adding splits.

Compatibility, Deprecation, and Migration Plan

In the DataStream API, we mark the existing source interface as deprecated but keep it for a few releases.
The new source interface is supported by different stream operators, so the two source interfaces can easily co-exist for a while.

We do not touch the DataSet API, which will eventually be subsumed by the DataStream API anyway.

Test Plan

TBD.




Previous Versions

Public Interfaces

We propose a new Source interface along with two companion interfaces SplitEnumerator and SplitReader:

Source
public interface Source<T, SplitT, EnumeratorCheckpointT> extends Serializable {
   TypeSerializer<SplitT> getSplitSerializer();

   TypeSerializer<T> getElementSerializer();

   TypeSerializer<EnumeratorCheckpointT> getEnumeratorCheckpointSerializer();

   EnumeratorCheckpointT createInitialEnumeratorCheckpoint();

   SplitEnumerator<SplitT, EnumeratorCheckpointT> createSplitEnumerator(EnumeratorCheckpointT checkpoint);

   SplitReader<T, SplitT> createSplitReader(SplitT split);
}
SplitEnumerator
public interface SplitEnumerator<SplitT, CheckpointT> {
   Iterable<SplitT> discoverNewSplits();

   CheckpointT checkpoint();
}
SplitReader
public interface SplitReader<T, SplitT> {

   /**
    * Initializes the reader and advances to the first record. Returns true
    * if a record was read. If no record was read, records might still be
    * available for reading in the future.
    */
   boolean start() throws IOException;

   /**
    * Advances to the next record. Returns true if a record was read. If no
    * record was read, records might still be available for reading in the future.
    *
    * <p>This method must return as fast as possible and not block if no records
    * are available.
    */
   boolean advance() throws IOException;

   /**
    * Returns the current record.
    */
   T getCurrent() throws NoSuchElementException;

   long getCurrentTimestamp() throws NoSuchElementException;

   long getWatermark();

   /**
    * Returns a checkpoint that represents the current reader state. Checkpointing the
    * current record is not the responsibility of the reader; it is assumed that the
    * component that uses the reader takes care of that.
    */
   SplitT checkpoint();

   /**
    * Returns true if reading of this split is done, i.e. there will never be
    * any available records in the future.
    */
   boolean isDone() throws IOException;

   /**
    * Shuts down the reader.
    */
   void close() throws IOException;
}
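
For reference, a sketch of how a caller was expected to drive this earlier SplitReader; the loop structure is inferred from the method contracts above, and emit(...) is an assumed hand-off to the downstream operator.

Driving the SplitReader (sketch)
// Sketch only: the caller owns the read loop and, as noted in the checkpoint()
// contract above, is responsible for checkpointing the current record.
abstract class SplitReaderDriverSketch<T, SplitT> {

	// Assumed hand-off of a record and its timestamp to the downstream operator.
	abstract void emit(T record, long timestamp);

	void drive(SplitReader<T, SplitT> reader) throws IOException {
		boolean hasRecord = reader.start();
		while (true) {
			if (hasRecord) {
				emit(reader.getCurrent(), reader.getCurrentTimestamp());
			} else if (reader.isDone()) {
				break; // no record now, and none will ever become available
			}
			// A real driver would yield or back off here instead of spinning
			// when no record was read but the split is not done yet.
			hasRecord = reader.advance();
		}
		reader.close();
	}
}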

The Source interface itself is really only a factory for creating split enumerators and split readers. A split enumerator is responsible for detecting new partitions/shards/splits while a split reader is responsible for reading from one split. This separates the concerns, allows putting the enumeration into a parallelism-one operation or outside the execution graph, and gives Flink more freedom in deciding how the processing of splits should be scheduled.

This also potentially allows mixing and matching different enumeration strategies with split readers. For example, the current Kafka connector has different strategies for partition discovery that are intermingled with the rest of the code. With the new interfaces in place, we would only need one split reader implementation and there could be several split enumerators for the different partition discovery strategies.

A naive implementation prototype that implements this in user space atop the existing Flink operations is given here: https://github.com/aljoscha/flink/commits/refactor-source-interface. This also comes with a complete Kafka source implementation that already supports checkpointing.

Proposed Changes

As an MVP, we propose to add the new interfaces and a runtime implementation using the existing SourceFunction for running the enumerator along with a special operator implementation for running the split reader. As a next step, we can add a dedicated StreamTask implementation for both the enumerator and reader to take advantage of the additional optimization potential. For example, more efficient handling of the checkpoint lock.

The next steps would be to implement event-time alignment.

Compatibility, Deprecation, and Migration Plan

  • The new interface and new source implementations will be provided side-by-side to the existing sources, thus not breaking existing programs.
  • We can think about allowing migrating existing jobs/savepoints smoothly to the new interface but it is a secondary concern.


