...

Page properties

...


Discussion thread

...

...

...

JIRA: FLINK-10740

...

Release: 1.12


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Motivation

This FLIP aims to solve several problems/shortcomings in the current streaming source interface (SourceFunction) and simultaneously to unify the source interfaces between the batch and streaming APIs. The shortcomings or points that we want to address are:

...

Each source should be able to work as a bounded (batch) and as an unbounded (continuous streaming) source.

Boundedness is an intrinsic property of the source instance itself. In most cases, only the SplitEnumerators need to know the Boundedness, while the SplitReaders are agnostic to it: they simply read the splits assigned to them.

That way, we can also make the API type safe in the future, when we can explicitly model bounded streams.
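A minimal sketch of this division of responsibility follows. All names in it (ToyEnumerator, the discover supplier, drain) are hypothetical, and the Boundedness enum is a local copy rather than Flink's. It only shows that the enumerator alone consults the boundedness: a bounded enumerator hands out the splits it discovered once and then reports that no more will come, while a continuous one keeps discovering. The splits themselves look the same in both modes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical sketch: only the enumerator looks at the boundedness. */
public class BoundednessSketch {

    enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

    /** Toy enumerator; 'discover' stands in for listing files, partitions, etc. */
    static final class ToyEnumerator {
        private final Boundedness boundedness;
        private final Supplier<List<String>> discover;
        private final Deque<String> pending = new ArrayDeque<>();
        private boolean discoveredOnce = false;

        ToyEnumerator(Boundedness boundedness, Supplier<List<String>> discover) {
            this.boundedness = boundedness;
            this.discover = discover;
        }

        /** Returns the next split, or empty if none is available right now. */
        Optional<String> nextSplit() {
            boolean mayDiscover = !discoveredOnce || boundedness == Boundedness.CONTINUOUS_UNBOUNDED;
            if (pending.isEmpty() && mayDiscover) {
                pending.addAll(discover.get()); // a bounded source discovers only once
                discoveredOnce = true;
            }
            return Optional.ofNullable(pending.poll());
        }

        /** Only a bounded enumerator ever runs out of splits. */
        boolean noMoreSplits() {
            return boundedness == Boundedness.BOUNDED && discoveredOnce && pending.isEmpty();
        }
    }

    /** Hands out up to maxRequests splits; the split type is the same in both modes. */
    static List<String> drain(ToyEnumerator enumerator, int maxRequests) {
        List<String> assigned = new ArrayList<>();
        for (int i = 0; i < maxRequests && !enumerator.noMoreSplits(); i++) {
            enumerator.nextSplit().ifPresent(assigned::add);
        }
        return assigned;
    }

    public static void main(String[] args) {
        int[] round = {0};
        Supplier<List<String>> discover = () -> List.of("split-" + round[0]++);
        System.out.println(drain(new ToyEnumerator(Boundedness.BOUNDED, discover), 5)); // [split-0]
        round[0] = 0;
        System.out.println(drain(new ToyEnumerator(Boundedness.CONTINUOUS_UNBOUNDED, discover), 3));
        // [split-0, split-1, split-2]
    }
}
```

In the bounded case the enumerator stops after its single discovery round; in the continuous case every request may trigger another discovery.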

Code Block
languagejava
titleDataStream API
linenumberstrue
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

FileSource<MyType> theSource = new ParquetFileSource("fs:///path/to/dir", AvroParquet.forSpecific(MyType.class));

// The returned stream will be a DataStream if theSource is unbounded.
// After we add BoundedDataStream which extends DataStream, the returned stream will be a BoundedDataStream.
// This allows users to write programs working in both batch and stream execution mode.
DataStream<MyType> stream = env.source(theSource);

// Once we add bounded streams to the DataStream API, we will also add the following API.
// The parameter has to be bounded, otherwise an exception will be thrown.
BoundedDataStream<MyType> batch = env.boundedSource(theSource);

Examples

FileSource

...

  • Splits are both the type of work assignment and the type of state held by the source. Assigning a split or restoring a split from a checkpoint is the same to the reader.
  • Advancing the reader is a non-blocking call that returns a future.
  • We build higher-level primitives on top of the main interface (see below "High-level Readers")
  • We hide event-time / watermarks in the SourceOutput and pass different source contexts for batch (no watermarks) and streaming (with watermarks).
    The SourceOutput also abstracts the per-partition watermark tracking.

The following reader interface is copied from the SourceReader in the public interface section.

Code Block
languagejava
titleSourceReader
linenumberstrue
public interface SourceReader<T, SplitT extends SourceSplit> extends Serializable, AutoCloseable {

	void start();

	Status pollNext(SourceOutput<T> sourceOutput) throws Exception;

	List<SplitT> snapshotState();

	CompletableFuture<Void> isAvailable();

	void addSplits(List<SplitT> splits);

	default void handleSourceEvents(SourceEvent sourceEvent) {
		// Do nothing.
	}

	enum Status {
		/** The next record is available right now. */
		AVAILABLE_NOW,
		/** The next record will be available later. */
		AVAILABLE_LATER,
		/** The source reader has completed all the reading work. */
		FINISHED
	}
}

The implementation assumes that there is a single thread that drives the source. This single thread calls pollNext(...) when data is available and cedes execution when nothing is available. It also handles checkpoint triggering, timer callbacks, etc., thus making the task lock-free.

The thread is expected to work on some form of mailbox, in which the data emitting loop is one possible task (next to timers, checkpoints, ...). Below is a very simple pseudo code for the driver loop.

...

The SourceReader will run as a PushingAsyncDataInput which works well with the new mailbox threading model in the tasks, similar to the network inputs.
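The single-threaded mailbox model described above can be sketched in plain Java as follows. This is a hypothetical, heavily simplified illustration, not Flink's task implementation: ToyReader mimics the non-blocking pollNext()/isAvailable() pair of the SourceReader interface with the same three statuses, records arrive in two batches to simulate waiting on I/O, and a single checkpoint mail shows how other actions interleave with the reader loop without any locking.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical sketch: one task thread drives a non-blocking reader through a mailbox. */
public class MailboxDriverSketch {

    enum Status { AVAILABLE_NOW, AVAILABLE_LATER, FINISHED }

    /** Toy reader: records arrive in batches; the gap between batches simulates waiting on I/O. */
    static final class ToyReader {
        private final Deque<List<String>> batches;
        private final Deque<String> buffer = new ArrayDeque<>();

        ToyReader(List<List<String>> batches) {
            this.batches = new ArrayDeque<>(batches);
        }

        /** Never blocks: emits at most one record and reports the status after the call. */
        Status pollNext(Consumer<String> output) {
            if (buffer.isEmpty() && !batches.isEmpty()) {
                buffer.addAll(batches.poll());
            }
            if (!buffer.isEmpty()) {
                output.accept(buffer.poll());
            }
            if (!buffer.isEmpty()) {
                return Status.AVAILABLE_NOW;
            }
            return batches.isEmpty() ? Status.FINISHED : Status.AVAILABLE_LATER;
        }

        /** Completes after a short delay, standing in for "the next batch has arrived". */
        CompletableFuture<Void> isAvailable() {
            return CompletableFuture.runAsync(
                    () -> { }, CompletableFuture.delayedExecutor(10, TimeUnit.MILLISECONDS));
        }
    }

    private final BlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();
    private final List<String> log = new ArrayList<>(); // only touched by the task thread
    private final ToyReader reader;
    private final Runnable readerLoop = this::runReaderLoop;
    private boolean finished = false;

    MailboxDriverSketch(ToyReader reader) {
        this.reader = reader;
    }

    private void runReaderLoop() {
        while (true) {
            Status status = reader.pollNext(log::add);
            if (status == Status.AVAILABLE_NOW) {
                if (mailbox.isEmpty()) {
                    continue;            // nothing else is waiting: keep emitting
                }
                mailbox.add(readerLoop); // yield to checkpoints/timers, then resume
                return;
            } else if (status == Status.AVAILABLE_LATER) {
                // Never block the task thread; re-enqueue the loop once data is available.
                reader.isAvailable().thenRun(() -> mailbox.add(readerLoop));
                return;
            } else {
                finished = true;
                return;
            }
        }
    }

    /** May be called from any thread; the checkpoint itself runs on the task thread. */
    void triggerCheckpoint() {
        mailbox.add(() -> log.add("checkpoint@" + log.size()));
    }

    /** The task main loop: run mails one at a time until the reader is finished. */
    List<String> run() throws InterruptedException {
        mailbox.add(readerLoop);
        triggerCheckpoint();
        while (!finished) {
            mailbox.take().run();
        }
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        ToyReader reader = new ToyReader(List.of(List.of("a", "b"), List.of("c")));
        System.out.println(new MailboxDriverSketch(reader).run()); // [a, checkpoint@1, b, c]
    }
}
```

The checkpoint mail runs between "a" and "b" because the reader loop yields whenever the mailbox is non-empty; when the reader has nothing to emit, the thread waits in mailbox.take() instead of spinning.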

Base implementation and high-level readers

The core source interface (the lowest level interface) is very generic. That makes it flexible, but hard to implement for contributors, especially for sufficiently complex reader patterns like in Kafka or Kinesis.
In general, most I/O libraries used for connectors are not asynchronous, and would need to spawn an I/O thread to make them non-blocking for the main thread.

We propose to solve this by building higher level source abstractions that offer simpler interfaces that allow for blocking calls.
These higher level abstractions would also solve the issue of sources that handle multiple splits concurrently, and the per-split event time logic.

Most readers fall into one of the following categories:

  1. One reader, single split. (Some dead simple blocking readers)
  2. One reader multiple splits.
    1. Sequential Single Split (File, database query, most bounded splits)
    2. Multi-split multiplexed (Kafka, Pulsar, Pravega, ...)
    3. Multi-split multi-threaded (Kinesis, ...)


Figure: Sequential Single Split

Figure: Multi-split Multiplexed

Figure: Multi-split Multi-threaded


Most of the readers implemented against these higher-level building blocks would only need to implement an interface similar to this. The contract would also be that all methods except wakeUp() are called by the same thread, obviating the need for any concurrency handling in the connector.

Code Block
languagejava
titleSplitReader
linenumberstrue
public interface SplitReader<E, SplitT extends SourceSplit> {

	RecordsWithSplitIds<E> fetch() throws InterruptedException;

	void handleSplitsChanges(Queue<SplitsChange<SplitT>> splitsChanges);

	void wakeUp();
}
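To make this threading contract concrete, here is a minimal sketch of a split reader in which fetch() may block and wakeUp() is the only method called from another thread. It assumes a simplified shape: split changes are plain split ids, and fetched records are a map from split id to values instead of RecordsWithSplitIds. ToySplitReader, deliver() and the WAKEUP marker are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch of the blocking fetch() / wakeUp() contract of a SplitReader. */
public class SplitReaderSketch {

    static final class ToySplitReader {
        /** Marker used by wakeUp() to unblock a pending fetch(). */
        private static final String[] WAKEUP = new String[0];

        /** Stands in for the external system's client buffer: {splitId, value} pairs. */
        private final BlockingQueue<String[]> incoming = new LinkedBlockingQueue<>();

        /** Touched only by the thread running fetch() and handleSplitsChanges(). */
        private final Set<String> assignedSplits = new HashSet<>();

        /** Blocks until data (or a wake-up) arrives; returns records grouped by split id. */
        Map<String, List<String>> fetch() throws InterruptedException {
            List<String[]> items = new ArrayList<>();
            items.add(incoming.take());   // blocking call, allowed in a SplitReader
            incoming.drainTo(items);      // batch everything else that is already there
            Map<String, List<String>> bySplit = new HashMap<>();
            for (String[] item : items) {
                if (item == WAKEUP || !assignedSplits.contains(item[0])) {
                    continue;             // wake-up marker, or data for a split we do not read
                }
                bySplit.computeIfAbsent(item[0], k -> new ArrayList<>()).add(item[1]);
            }
            return bySplit;
        }

        /** Called on the fetching thread with newly assigned split ids. */
        void handleSplitsChanges(Queue<String> newSplitIds) {
            while (!newSplitIds.isEmpty()) {
                assignedSplits.add(newSplitIds.poll());
            }
        }

        /** The only method that may be called from another thread: unblocks fetch(). */
        void wakeUp() {
            incoming.add(WAKEUP);
        }

        /** Test hook simulating the external system delivering a record. */
        void deliver(String splitId, String value) {
            incoming.add(new String[] {splitId, value});
        }
    }

    public static void main(String[] args) throws Exception {
        ToySplitReader reader = new ToySplitReader();
        reader.handleSplitsChanges(new ArrayDeque<>(List.of("p0")));
        reader.deliver("p0", "x");
        reader.deliver("p1", "y");
        System.out.println(reader.fetch()); // {p0=[x]}

        // A fetch() blocked on another thread returns (empty) once wakeUp() is called.
        CompletableFuture<Map<String, List<String>>> blocked = CompletableFuture.supplyAsync(() -> {
            try {
                return reader.fetch();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
        reader.wakeUp();
        System.out.println(blocked.get()); // {}
    }
}
```

Because wakeUp() only enqueues a marker, it is safe to call from any thread and also works if it happens to run before fetch() starts blocking.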

The RecordsWithSplitIds returned by the SplitReader will be passed to a RecordEmitter record by record. The RecordEmitter is responsible for the following:

  • Convert the raw record type <E> into the eventual record type <T>
  • Provide an event time timestamp for the record that it processes.

With the base implementation, users writing their own source can focus on:

  1. Fetching records from the external system.
  2. Parsing and converting records.
  3. Extracting timestamps and optionally dealing with watermarks. A follow-up FLIP will provide default behaviors for dealing with watermarks.

The base implementation can be roughly illustrated below:

Figure: Base implementation overview

Some brief explanations:

  1. When the SplitEnumerator adds a new split to the SourceReader, the initial state of that split is put into a state map maintained by the SourceReaderBase before the split is assigned to a SplitReader.
  2. The records are passed from the SplitReaders to the RecordEmitter in RecordsWithSplitIds. This allows the SplitReader to enqueue records in batches, which benefits performance.
  3. The SourceReaderBase iterates over each record and looks up its corresponding split state. The record and its corresponding split state are passed to the RecordEmitter.
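The record path in points 1 to 3 can be sketched as follows. This is a minimal, hypothetical model rather than the actual SourceReaderBase: RecordsWithSplitIds is reduced to a map from split id to raw records, the split state is just a mutable offset, and CsvEmitter parses a made-up "timestamp,value" line into a value plus an event-time timestamp.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the SourceReaderBase record path: split state map + RecordEmitter. */
public class ReaderBaseSketch {

    /** Mutable per-split state; here just the next offset to read. */
    static final class SplitState {
        long offset;

        SplitState(long offset) {
            this.offset = offset;
        }
    }

    /** Receives converted records together with their event-time timestamps. */
    interface Output<T> {
        void emitRecord(T record, long timestamp);
    }

    /** Converts a raw record E into T, assigns its timestamp, and updates the split state. */
    interface RecordEmitter<E, T> {
        void emitRecord(E raw, Output<T> output, SplitState state);
    }

    /** Parses a hypothetical "timestamp,value" line. */
    static final class CsvEmitter implements RecordEmitter<String, String> {
        @Override
        public void emitRecord(String raw, Output<String> output, SplitState state) {
            int comma = raw.indexOf(',');
            long timestamp = Long.parseLong(raw.substring(0, comma));
            output.emitRecord(raw.substring(comma + 1), timestamp);
            state.offset++; // the state now points past the emitted record
        }
    }

    private final Map<String, SplitState> splitStates = new LinkedHashMap<>();
    private final RecordEmitter<String, String> emitter = new CsvEmitter();

    /** Point 1: register the split's initial state before a SplitReader sees the split. */
    void addSplit(String splitId, long startOffset) {
        splitStates.put(splitId, new SplitState(startOffset));
    }

    /** Points 2 and 3: walk a fetched batch, pairing each record with its split state. */
    void handleBatch(Map<String, List<String>> recordsBySplitId, Output<String> output) {
        for (Map.Entry<String, List<String>> entry : recordsBySplitId.entrySet()) {
            SplitState state = splitStates.get(entry.getKey());
            for (String raw : entry.getValue()) {
                emitter.emitRecord(raw, output, state);
            }
        }
    }

    /** What a checkpoint would capture: the current offset of every split. */
    Map<String, Long> snapshotState() {
        Map<String, Long> snapshot = new HashMap<>();
        splitStates.forEach((id, state) -> snapshot.put(id, state.offset));
        return snapshot;
    }

    public static void main(String[] args) {
        ReaderBaseSketch reader = new ReaderBaseSketch();
        reader.addSplit("file-0", 0);
        reader.addSplit("file-1", 10);
        Map<String, List<String>> batch = new LinkedHashMap<>();
        batch.put("file-0", List.of("1000,a", "2000,b"));
        batch.put("file-1", List.of("1500,c"));
        List<String> emitted = new ArrayList<>();
        reader.handleBatch(batch, (value, ts) -> emitted.add(value + "@" + ts));
        System.out.println(emitted); // [a@1000, b@2000, c@1500]
        System.out.println(reader.snapshotState().get("file-1")); // 11
    }
}
```

Because the emitter updates the split state as it emits, a snapshot taken between two batches reflects exactly the records that have been handed downstream.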

Note that the abstraction of this base implementation does not specify where deserialization is performed. Because the RecordEmitter is driven by the main mailbox thread of the task, deserialization should ideally be done in the split reader, which scales better. It is also possible to introduce a deserialization thread pool for that. However, the detailed implementation of deserialization is not the focus of this FLIP and will be covered by follow-up FLIPs.

The interfaces used by the base implementation are covered in the section on the interfaces for the base implementation.

Failover

The state of the SplitEnumerator includes the following:

  • The unassigned splits
  • The splits that have been assigned but not successfully checkpointed yet.
    • The assigned but uncheckpointed splits will be associated with the checkpoint ID they belong to.

The state of the SourceReader includes:

  • The assigned splits
  • The state of the splits (e.g. Kafka offsets, HDFS file offsets, etc.)

When the SplitEnumerator fails, a full failover will be performed. While it is possible to have a finer-grained failover that only restores the state of the SplitEnumerator, we would like to address this in a separate FLIP.

When a SourceReader fails, the failed SourceReader will be restored to its last successful checkpoint. The SplitEnumerator will partially reset its state by adding the splits that were assigned to that reader but not yet checkpointed back to the SplitEnumerator. In this case, only the failed subtask and its connected nodes will have to reset their state.
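The enumerator-side bookkeeping described above can be sketched as follows. This is a hypothetical, single-threaded model with splits as plain strings and made-up method names; it assumes that once a checkpoint completes, every split assigned before that checkpoint's snapshot is contained in the readers' checkpointed state, so only later assignments need to be taken back when a reader fails.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical sketch of the enumerator-side bookkeeping used for partial failover. */
public class EnumeratorStateSketch {

    private final Deque<String> unassigned = new ArrayDeque<>();

    /** Splits assigned since the last snapshot: subtask -> splits. */
    private Map<Integer, List<String>> sinceLastSnapshot = new HashMap<>();

    /** Assignments not yet covered by a completed checkpoint: checkpointId -> (subtask -> splits). */
    private final NavigableMap<Long, Map<Integer, List<String>>> uncheckpointed = new TreeMap<>();

    EnumeratorStateSketch(List<String> splits) {
        unassigned.addAll(splits);
    }

    /** Assigns the next unassigned split (if any) to the given subtask and remembers it. */
    String assignNext(int subtask) {
        String split = unassigned.poll();
        if (split != null) {
            sinceLastSnapshot.computeIfAbsent(subtask, k -> new ArrayList<>()).add(split);
        }
        return split;
    }

    /** On a checkpoint, the recent assignments become associated with that checkpoint id. */
    void snapshotState(long checkpointId) {
        uncheckpointed.put(checkpointId, sinceLastSnapshot);
        sinceLastSnapshot = new HashMap<>();
    }

    /** Once a checkpoint completes, the readers' own state covers those splits. */
    void notifyCheckpointComplete(long checkpointId) {
        uncheckpointed.headMap(checkpointId, true).clear();
    }

    /** A failed reader rewinds to its last completed checkpoint, so later assignments come back. */
    void handleReaderFailure(int subtask) {
        List<String> returned = new ArrayList<>();
        for (Map<Integer, List<String>> bySubtask : uncheckpointed.values()) { // oldest first
            List<String> splits = bySubtask.remove(subtask);
            if (splits != null) {
                returned.addAll(splits);
            }
        }
        List<String> recent = sinceLastSnapshot.remove(subtask);
        if (recent != null) {
            returned.addAll(recent);
        }
        // Re-queue at the front, in their original order, so they are handed out again first.
        for (int i = returned.size() - 1; i >= 0; i--) {
            unassigned.addFirst(returned.get(i));
        }
    }

    List<String> unassignedSplits() {
        return new ArrayList<>(unassigned);
    }

    public static void main(String[] args) {
        EnumeratorStateSketch e = new EnumeratorStateSketch(List.of("s0", "s1", "s2", "s3"));
        e.assignNext(0);               // s0 -> subtask 0
        e.assignNext(1);               // s1 -> subtask 1
        e.snapshotState(1);
        e.notifyCheckpointComplete(1); // s0 and s1 are now in the readers' checkpointed state
        e.assignNext(0);               // s2 -> subtask 0
        e.snapshotState(2);            // checkpoint 2 never completes
        e.assignNext(0);               // s3 -> subtask 0
        e.handleReaderFailure(0);
        System.out.println(e.unassignedSplits()); // [s2, s3]
    }
}
```

Subtask 0 gets s0 back from its own restored state, while s2 and s3 return to the enumerator; subtask 1 is unaffected.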

Where to run the enumerator

There was a long discussion about where to run the enumerator, which we documented in the appendix. The final approach we took is very similar to option 3, with a few differences. The approach is as follows.

Each SplitEnumerator will be encapsulated in one SourceCoordinator. If there are multiple sources, there will be multiple SourceCoordinators. The SourceCoordinators will run in the JobMaster, but not as part of the ExecutionGraph. In this FLIP, we propose to fail over the entire execution graph when the SplitEnumerator fails. A finer-grained enumerator failover will be proposed in a later FLIP.

Per Split Event Time

With the introduction of the SourceSplit, we can actually emit per split event time for the users. We plan to propose the solution in a separate FLIP instead of in this FLIP to reduce the complexity.

Event Time Alignment

The event time alignment becomes easier to implement with the generic communication mechanism introduced between SplitEnumerator and SourceReader. In this FLIP we do not include this in the base implementation to reduce the complexity.

Public Interface

The public interface changes introduced by this FLIP consist of three parts.

  • The top level public interfaces.
  • The interfaces introduced as a part of the base implementation of the top level public interfaces.
    • The base implementation provides common functionality required by most Source implementations. See the base implementation section for details.
  • The RPC gateway interface change for the generic message passing mechanism.

It is worth noting that while we will try our best to maintain stable interfaces, the interfaces introduced as part of the base implementation (e.g. SplitReader) are more likely to change than the top-level public interfaces such as SplitEnumerator / SourceReader. This is primarily because we expect to add more functionality to the base implementation over time.

Top level public interfaces

  • Source - A factory-style class that helps create the SplitEnumerator and SourceReader at runtime.
  • SourceSplit - An interface for all the split types.
  • SplitEnumerator - Discovers the splits and assigns them to the SourceReaders.
  • SplitEnumeratorContext - Provides the necessary information to the SplitEnumerator to assign splits and send custom events to the SourceReaders.
  • SplitAssignment - A container class holding the source split assignment for each subtask.
  • SourceReader - Reads the records from the splits assigned by the SplitEnumerator.
  • SourceReaderContext - Provides the necessary functions for the SourceReader to communicate with the SplitEnumerator.
  • SourceOutput - A collector-style interface to take the records and timestamps emitted by the SourceReader.
  • WatermarkOutput - An interface for emitting watermarks and indicating idleness of the source.
  • Watermark - A new Watermark class will be created in the package org.apache.flink.api.common.eventtime. This class will eventually replace the existing Watermark in org.apache.flink.streaming.api.watermark. This change allows flink-core to remain independent of other modules. Given that we will eventually put all the watermark generation into the Source, this change will be necessary. Note that this FLIP does not intend to change the existing way that watermarks can be overridden in the DataStream after they are emitted by the source.

Source

Code Block
languagejava
titleSource
linenumberstrue
/**
 * The interface for Source. It acts like a factory class that helps construct
 * the {@link SplitEnumerator} and {@link SourceReader} and corresponding
 * serializers.
 *
 * @param <T>        The type of records produced by the source.
 * @param <SplitT>   The type of splits handled by the source.
 * @param <EnumChkT> The type of the enumerator checkpoints.
 */
public interface Source<T, SplitT extends SourceSplit, EnumChkT> extends Serializable {

	/**
	 * Get the boundedness of this source.
	 *
	 * @return the boundedness of this source.
	 */
	Boundedness getBoundedness();

	/**
	 * Creates a new reader to read data from the splits it gets assigned.
	 * The reader starts fresh and does not have any state to resume.
	 *
	 * @param readerContext The {@link SourceReaderContext context} for the source reader.
	 * @return A new SourceReader.
	 */
	SourceReader<T, SplitT> createReader(SourceReaderContext readerContext);

	/**
	 * Creates a new SplitEnumerator for this source, starting a new input.
	 *
	 * @param enumContext The {@link SplitEnumeratorContext context} for the split enumerator.
	 * @return A new SplitEnumerator.
	 */
	SplitEnumerator<SplitT, EnumChkT> createEnumerator(SplitEnumeratorContext<SplitT> enumContext);

	/**
	 * Restores an enumerator from a checkpoint.
	 *
	 * @param enumContext The {@link SplitEnumeratorContext context} for the restored split enumerator.
	 * @param checkpoint The checkpoint to restore the SplitEnumerator from.
	 * @return A SplitEnumerator restored from the given checkpoint.
	 */
	SplitEnumerator<SplitT, EnumChkT> restoreEnumerator(
			SplitEnumeratorContext<SplitT> enumContext,
			EnumChkT checkpoint) throws IOException;

	// ------------------------------------------------------------------------
	//  serializers for the metadata
	// ------------------------------------------------------------------------

	/**
	 * Creates a serializer for the source splits. Splits are serialized when sending them
	 * from enumerator to reader, and when checkpointing the reader's current state.
	 *
	 * @return The serializer for the split type.
	 */
	SimpleVersionedSerializer<SplitT> getSplitSerializer();

	/**
	 * Creates the serializer for the {@link SplitEnumerator} checkpoint.
	 * The serializer is used for the result of the {@link SplitEnumerator#snapshotState()}
	 * method.
	 *
	 * @return The serializer for the SplitEnumerator checkpoint.
	 */
	SimpleVersionedSerializer<EnumChkT> getEnumeratorCheckpointSerializer();
}


/**
 * The boundedness of the source: "bounded" for the currently available data (batch style),
 * "continuous unbounded" for a continuous streaming style source.
 */
public enum Boundedness {

	/**
	 * A bounded source processes the data that is currently available and will end after that.
	 *
	 * <p>When a source produces a bounded stream, the runtime may activate additional optimizations
	 * that are suitable only for bounded input. Incorrectly producing unbounded data when the source
	 * is set to produce a bounded stream will often result in programs that do not output any results
	 * and may eventually fail due to runtime errors (out of memory or storage).
	 */
	BOUNDED,

	/**
	 * A continuous unbounded source continuously processes all data as it comes.
	 *
	 * <p>The source may run forever (until the program is terminated) or might actually end at some point,
	 * based on some source-specific conditions. Because that is not transparent to the runtime,
	 * the runtime will use an execution mode for continuous unbounded streams whenever this mode
	 * is chosen.
	 */
	CONTINUOUS_UNBOUNDED
}

SourceSplit

Code Block
languagejava
titleSourceSplit
linenumberstrue
/**
 * An interface for all the Split types to implement.
 */
public interface SourceSplit {

	/**
	 * Get the split id of this source split.
	 * @return id of this source split.
	 */
	String splitId();
}
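As an illustration, a file-based split and its serializer might look like the sketch below. FileSplit, FileSplitSerializer and the local VersionedSerializer interface are hypothetical; the latter only mirrors the getVersion() / serialize() / deserialize() shape of the SimpleVersionedSerializer returned by Source#getSplitSerializer(). The caller is assumed to store the version number next to the bytes, so the deserializer can reject (or migrate) splits written by an unknown version.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Hypothetical sketch of a concrete split type and its versioned serializer. */
public class FileSplitSketch {

    /** Mirrors the SourceSplit interface above. */
    interface SourceSplit {
        String splitId();
    }

    /** Local stand-in with the same shape as Flink's SimpleVersionedSerializer. */
    interface VersionedSerializer<E> {
        int getVersion();

        byte[] serialize(E obj) throws IOException;

        E deserialize(int version, byte[] serialized) throws IOException;
    }

    /** Hypothetical file split: a path plus the reader's current position within it. */
    static final class FileSplit implements SourceSplit {
        final String path;
        final long offset;

        FileSplit(String path, long offset) {
            this.path = path;
            this.offset = offset;
        }

        @Override
        public String splitId() {
            return path; // must stay stable across checkpoints and restores
        }
    }

    static final class FileSplitSerializer implements VersionedSerializer<FileSplit> {
        @Override
        public int getVersion() {
            return 1;
        }

        @Override
        public byte[] serialize(FileSplit split) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                out.writeUTF(split.path);
                out.writeLong(split.offset);
            }
            return bytes.toByteArray();
        }

        @Override
        public FileSplit deserialize(int version, byte[] serialized) throws IOException {
            if (version != 1) {
                throw new IOException("Unknown split serializer version: " + version);
            }
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
                return new FileSplit(in.readUTF(), in.readLong());
            }
        }
    }

    public static void main(String[] args) throws IOException {
        FileSplitSerializer serializer = new FileSplitSerializer();
        byte[] bytes = serializer.serialize(new FileSplit("/data/part-0", 42L));
        FileSplit restored = serializer.deserialize(serializer.getVersion(), bytes);
        System.out.println(restored.splitId() + " @ " + restored.offset); // /data/part-0 @ 42
    }
}
```

Keeping the reading position inside the split is what lets "assigning a split" and "restoring a split from a checkpoint" look the same to the reader.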

SourceReader

Code Block
languagejava
titleSourceReader
linenumberstrue
/**
 * The interface for a source reader which is responsible for reading the records from
 * the source splits assigned by {@link SplitEnumerator}.
 *
 * @param <T> The type of the record emitted by this source reader.
 * @param <SplitT> The type of the source splits.
 */
public interface SourceReader<T, SplitT extends SourceSplit> extends Serializable, AutoCloseable {

	/**
	 * Start the reader.
	 */
	void start();

	/**
	 * Poll the next available record into the {@link SourceOutput}.
	 *
	 * <p>The implementation must make sure this method is non-blocking.
	 *
	 * <p>Although the implementation can emit multiple records into the given SourceOutput,
	 * it is recommended not to do so. Instead, emit one record into the SourceOutput
	 * and return {@link Status#AVAILABLE_NOW} to let the caller thread
	 * know there are more records available.
	 *
	 * @return The {@link Status} of the SourceReader after the method invocation.
	 */
	Status pollNext(SourceOutput<T> sourceOutput) throws Exception;

	/**
	 * Checkpoint on the state of the source.
	 *
	 * @return the state of the source.
	 */
	List<SplitT> snapshotState();

	/**
	 * @return a future that will be completed once there is a record available to poll.
	 */
	CompletableFuture<Void> isAvailable();

	/**
	 * Adds a list of splits for this reader to read.
	 *
	 * @param splits The splits assigned by the split enumerator.
	 */
	void addSplits(List<SplitT> splits);

	/**
	 * Handle a source event sent by the {@link SplitEnumerator}.
	 *
	 * @param sourceEvent the event sent by the {@link SplitEnumerator}.
	 */
	void handleSourceEvents(SourceEvent sourceEvent);

	/**
	 * The status of this reader.
	 */
	enum Status {
		/** The next record is available right now. */
		AVAILABLE_NOW,
		/** The next record will be available later. */
		AVAILABLE_LATER,
		/** The source reader has completed all the reading work. */
		FINISHED
	}
}

SourceOutput

Code Block
languagejava
titleSourceOutput
linenumberstrue
public interface SourceOutput<E> extends WatermarkOutput {

	void emitRecord(E record);

	void emitRecord(E record, long timestamp);
}

WatermarkOutput

Code Block
languagejava
titleWatermarkOutput
linenumberstrue
/**
 * An output for watermarks. The output accepts watermarks and idleness (inactivity) status.
 */
@Public
public interface WatermarkOutput {

	/**
	 * Emits the given watermark.
	 *
	 * <p>Emitting a watermark also implicitly marks the stream as <i>active</i>, ending
	 * previously marked idleness.
	 */
	void emitWatermark(Watermark watermark);

	/**
	 * Marks this output as idle, meaning that downstream operations do not
	 * wait for watermarks from this output.
	 *
	 * <p>An output becomes active again as soon as the next watermark is emitted.
	 */
	void markIdle();
}
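One possible way for a SourceOutput to hide per-split watermark tracking on top of this contract is sketched below: the combined watermark is the minimum over all non-idle splits, idle splits do not hold it back, and the combined output goes idle only when every split is idle. This is a hypothetical illustration, not the design this FLIP commits to (per-split event time is deferred to a separate FLIP); the combiner class and the long-based WatermarkOutput are our own simplifications.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch: combining per-split watermarks into one output, honoring idleness. */
public class WatermarkCombinerSketch {

    /** Mirrors WatermarkOutput above, with a watermark reduced to its long timestamp. */
    interface WatermarkOutput {
        void emitWatermark(long watermark);

        void markIdle();
    }

    private static final class SplitWatermark {
        long watermark = Long.MIN_VALUE; // a stream's time starts at Long.MIN_VALUE
        boolean idle = false;
    }

    private final Map<String, SplitWatermark> splits = new LinkedHashMap<>();
    private final WatermarkOutput downstream;
    private long lastEmitted = Long.MIN_VALUE;
    private boolean downstreamIdle = false;

    WatermarkCombinerSketch(WatermarkOutput downstream) {
        this.downstream = downstream;
    }

    /** Returns the output handed to the code that reads the given split. */
    WatermarkOutput forSplit(String splitId) {
        SplitWatermark state = splits.computeIfAbsent(splitId, k -> new SplitWatermark());
        return new WatermarkOutput() {
            @Override
            public void emitWatermark(long watermark) {
                state.watermark = Math.max(state.watermark, watermark);
                state.idle = false; // emitting a watermark marks the output active again
                update();
            }

            @Override
            public void markIdle() {
                state.idle = true;
                update();
            }
        };
    }

    /** Combined watermark = minimum over active splits; idle splits do not hold it back. */
    private void update() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (SplitWatermark s : splits.values()) {
            if (!s.idle) {
                anyActive = true;
                min = Math.min(min, s.watermark);
            }
        }
        if (!anyActive) {
            if (!downstreamIdle) {
                downstreamIdle = true;
                downstream.markIdle();
            }
            return;
        }
        boolean wasIdle = downstreamIdle;
        downstreamIdle = false;
        if (min > lastEmitted || wasIdle) {
            // Watermarks never move backwards; re-emitting also ends downstream idleness.
            lastEmitted = Math.max(lastEmitted, min);
            downstream.emitWatermark(lastEmitted);
        }
    }

    public static void main(String[] args) {
        WatermarkOutput printer = new WatermarkOutput() {
            @Override
            public void emitWatermark(long watermark) {
                System.out.println("watermark " + watermark);
            }

            @Override
            public void markIdle() {
                System.out.println("idle");
            }
        };
        WatermarkCombinerSketch combiner = new WatermarkCombinerSketch(printer);
        WatermarkOutput a = combiner.forSplit("a");
        WatermarkOutput b = combiner.forSplit("b");
        a.emitWatermark(100); // held back by b, which has not emitted anything yet
        b.emitWatermark(50);  // prints "watermark 50"
        a.emitWatermark(200); // still held back by b at 50
        b.markIdle();         // prints "watermark 200"
        a.markIdle();         // prints "idle"
        b.emitWatermark(300); // prints "watermark 300"
    }
}
```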

Watermark

Code Block
languagejava
titleWatermark
linenumberstrue
/**
 * Watermarks are the progress indicators in the data streams. A watermark signifies
 * that no events with a timestamp smaller than or equal to the watermark's time will occur after the
 * watermark. A watermark with timestamp <i>T</i> indicates that the stream's event time has progressed
 * to time <i>T</i>.
 *
 * <p>Watermarks are created at the sources and propagate through the streams and operators.
 *
 * <p>In some cases a watermark is only a heuristic, meaning some events with a lower timestamp
 * may still follow. In that case, it is up to the logic of the operators to decide what to do
 * with the "late events". Operators can for example ignore these late events, route them to a
 * different stream, or send updates to their previously emitted results.
 *
 * <p>When a source reaches the end of the input, it emits a final watermark with timestamp
 * {@code Long.MAX_VALUE}, indicating the "end of time".
 *
 * <p>Note: A stream's time starts with a watermark of {@code Long.MIN_VALUE}. That means that all records
 * in the stream with a timestamp of {@code Long.MIN_VALUE} are immediately late.
 */
@Public
public final class Watermark implements Serializable {

	private static final long serialVersionUID = 1L;

	/** Thread local formatter for stringifying the timestamps. */
	private static final ThreadLocal<SimpleDateFormat> TS_FORMATTER = ThreadLocal.withInitial(
		() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"));

	// ------------------------------------------------------------------------

	/** The watermark that signifies end-of-event-time. */
	public static final Watermark MAX_WATERMARK = new Watermark(Long.MAX_VALUE);

	// ------------------------------------------------------------------------

	/** The timestamp of the watermark in milliseconds. */
	private final long timestamp;

	/**
	 * Creates a new watermark with the given timestamp in milliseconds.
	 */
	public Watermark(long timestamp) {
		this.timestamp = timestamp;
	}

	/**
	 * Returns the timestamp associated with this Watermark.
	 */
	public long getTimestamp() {
		return timestamp;
	}

	/**
	 * Formats the timestamp of this watermark, assuming it is a millisecond timestamp.
	 * The returned format is "yyyy-MM-dd HH:mm:ss.SSS".
	 */
	public String getFormattedTimestamp() {
		return TS_FORMATTER.get().format(new Date(timestamp));
	}

	// ------------------------------------------------------------------------

	@Override
	public boolean equals(Object o) {
		return this == o ||
				o != null &&
				o.getClass() == Watermark.class &&
				((Watermark) o).timestamp == this.timestamp;
	}

	@Override
	public int hashCode() {
		return Long.hashCode(timestamp);
	}

	@Override
	public String toString() {
		return "Watermark @ " + timestamp + " (" + getFormattedTimestamp() + ')';
	}
}

SourceReaderContext

Code Block
languagejava
titleSourceReaderContext
linenumberstrue
/**
 * A context for the source reader. It allows the source reader to get the context information and
 * allows the SourceReader to send source events to its split enumerator.
 */
public interface SourceReaderContext {

	/**
	 * Returns the metric group for this parallel subtask.
	 *
	 * @return metric group for this parallel subtask.
	 */
	MetricGroup getMetricGroup();

	/**
	 * Send a source event to the corresponding SplitEnumerator.
	 *
	 * @param event The source event to send.
	 */
	void sendEventToEnumerator(SourceEvent event);
}
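The sketch below shows one way a reader could use `sendEventToEnumerator` for a pull-based protocol: once all of its splits are finished, it asks the enumerator for more work. `RequestSplitEvent` and `PullingReader` are hypothetical. `SourceEvent` and `SourceReaderContext` are simplified local copies, with the metric group left out to avoid a Flink dependency.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class ReaderContextExample {

	/** Local copy of the SourceEvent marker interface. */
	public interface SourceEvent extends Serializable {}

	/** Simplified SourceReaderContext: getMetricGroup() is omitted. */
	public interface SourceReaderContext {
		void sendEventToEnumerator(SourceEvent event);
	}

	/** Hypothetical event with which a reader asks its enumerator for more splits. */
	public static final class RequestSplitEvent implements SourceEvent {
		private static final long serialVersionUID = 1L;
	}

	/** Hypothetical reader fragment that requests new work once all of its splits are finished. */
	public static final class PullingReader {
		private final SourceReaderContext context;
		private int unfinishedSplits;

		public PullingReader(SourceReaderContext context, int assignedSplits) {
			this.context = context;
			this.unfinishedSplits = assignedSplits;
		}

		/** Called whenever one split has been read completely. */
		public void onSplitFinished() {
			unfinishedSplits--;
			if (unfinishedSplits == 0) {
				// The enumerator learns the sender's subtask id from handleSourceEvent(subtaskId, event).
				context.sendEventToEnumerator(new RequestSplitEvent());
			}
		}
	}

	public static void main(String[] args) {
		List<SourceEvent> sent = new ArrayList<>();
		PullingReader reader = new PullingReader(sent::add, 2);
		reader.onSplitFinished();
		reader.onSplitFinished();
		System.out.println(sent.size()); // prints 1
	}
}
```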

SplitEnumerator

Code Block
languagejava
titleSplitEnumerator
linenumberstrue
/**
 * An interface for a split enumerator, which is responsible for the following:
 * 1. discover the splits for the {@link SourceReader} to read.
 * 2. assign the splits to the source reader.
 */
public interface SplitEnumerator<SplitT extends SourceSplit, CheckpointT> extends AutoCloseable {

	/**
	 * Start the split enumerator.
	 */
	void start();

	/**
	 * Handles the source event from the source reader.
	 *
	 * @param subtaskId the subtask id of the source reader who sent the source event.
	 * @param sourceEvent the source event from the source reader.
	 */
	void handleSourceEvent(int subtaskId, SourceEvent sourceEvent);

	/**
	 * Add splits back to the split enumerator. This only happens when a {@link SourceReader} fails
	 * and there are splits assigned to it after the last successful checkpoint.
	 *
	 * @param splits The splits to add back to the enumerator for reassignment.
	 * @param subtaskId The id of the subtask to which the returned splits belong.
	 */
	void addSplitsBack(List<SplitT> splits, int subtaskId);

	/**
	 * Add a new source reader with the given subtask ID.
	 *
	 * @param subtaskId the subtask ID of the new source reader.
	 */
	void addReader(int subtaskId);

	/**
	 * Checkpoints the state of this split enumerator.
	 */
	CheckpointT snapshotState();

	/**
	 * Called to close the enumerator, in case it holds on to any resources, like threads or
	 * network connections.
	 */
	@Override
	void close() throws IOException;
}
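A minimal enumerator for a bounded source whose splits are all known up front could look like the sketch below. It uses simplified local types: splits are plain `String` ids, the event parameter is an `Object`, and `AssignmentContext` is a hypothetical stand-in for the `assignSplits` method of `SplitEnumeratorContext` that takes a raw map instead of a `SplitsAssignment`. The one-split-per-request policy is an assumption chosen for brevity.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Map;

public class StaticEnumeratorExample {

	/** Hypothetical stand-in for SplitEnumeratorContext#assignSplits, taking a raw assignment map. */
	public interface AssignmentContext {
		void assignSplits(Map<Integer, List<String>> assignment);
	}

	/** Enumerator over a fixed list of split ids: one split per registered reader or request. */
	public static final class StaticEnumerator implements AutoCloseable {
		private final AssignmentContext context;
		private final Deque<String> pending;

		public StaticEnumerator(AssignmentContext context, List<String> splits) {
			this.context = context;
			this.pending = new ArrayDeque<>(splits);
		}

		public void start() {
			// Nothing to discover: all splits are known when the enumerator is created.
		}

		public void handleSourceEvent(int subtaskId, Object sourceEvent) {
			// This sketch treats every event from a reader as a request for another split.
			assignNext(subtaskId);
		}

		public void addSplitsBack(List<String> splits, int subtaskId) {
			// Returned splits go to the head of the queue, keeping their original order.
			for (int i = splits.size() - 1; i >= 0; i--) {
				pending.addFirst(splits.get(i));
			}
		}

		public void addReader(int subtaskId) {
			assignNext(subtaskId);
		}

		public List<String> snapshotState() {
			// In this sketch the checkpoint holds only the splits that are not assigned yet.
			return new ArrayList<>(pending);
		}

		@Override
		public void close() {
			// No resources to release.
		}

		private void assignNext(int subtaskId) {
			String next = pending.pollFirst();
			if (next != null) {
				context.assignSplits(Collections.singletonMap(subtaskId, Collections.singletonList(next)));
			}
		}
	}

	public static void main(String[] args) {
		StaticEnumerator enumerator = new StaticEnumerator(
				assignment -> System.out.println("assign " + assignment),
				Arrays.asList("s0", "s1", "s2"));
		enumerator.start();
		enumerator.addReader(0);                                      // assign {0=[s0]}
		enumerator.addReader(1);                                      // assign {1=[s1]}
		enumerator.addSplitsBack(Collections.singletonList("s0"), 0); // reader 0 failed
		System.out.println(enumerator.snapshotState());               // prints [s0, s2]
	}
}
```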

...

SplitEnumeratorContext

Code Block
languagejava
titleSplitEnumeratorContext
linenumberstrue
/**
 * A context class for the {@link SplitEnumerator}. This class serves the following purposes:
 * 1. Host information necessary for the SplitEnumerator to make split assignment decisions.
 * 2. Accept and track the split assignment from the enumerator.
 * 3. Provide a managed threading model so the split enumerators do not need to create their
 *    own internal threads.
 *
 * @param <SplitT> the type of the splits.
 */
public interface SplitEnumeratorContext<SplitT extends SourceSplit> {

	MetricGroup metricGroup();

	/**
	 * Send a source event to a source reader. The source reader is identified by its subtask id.
	 *
	 * @param subtaskId the subtask id of the source reader to send this event to.
	 * @param event the source event to send.
	 */
	void sendEventToSourceReader(int subtaskId, SourceEvent event);

	/**
	 * Get the number of subtasks.
	 *
	 * @return the number of subtasks.
	 */
	int numSubtasks();

	/**
	 * Get the currently registered readers. The mapping is from subtask id to the reader info.
	 *
	 * @return the currently registered readers.
	 */
	Map<Integer, ReaderInfo> registeredReaders();

	/**
	 * Assign the splits.
	 *
	 * @param newSplitAssignments the new split assignments to add.
	 */
	void assignSplits(SplitsAssignment<SplitT> newSplitAssignments);

	/**
	 * Invoke the callable and hand over the return value to the handler, which will be executed
	 * by the source coordinator.
	 *
	 * <p>It is important to make sure that the callable does not modify
	 * any shared state. Otherwise there might be unexpected behavior.
	 *
	 * @param callable a callable to call.
	 * @param handler a handler that handles the return value of or the exception thrown from the callable.
	 */
	<T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler);

	/**
	 * Periodically invoke the callable and hand over the return value to the handler, which will be
	 * executed by the source coordinator.
	 *
	 * <p>It is important to make sure that the callable does not modify
	 * any shared state. Otherwise there might be unexpected behavior.
	 *
	 * @param callable the callable to call.
	 * @param handler a handler that handles the return value of or the exception thrown from the callable.
	 * @param initialDelay the initial delay of calling the callable.
	 * @param period the period between two invocations of the callable.
	 */
	<T> void callAsync(Callable<T> callable,
					   BiConsumer<T, Throwable> handler,
					   long initialDelay,
					   long period);
}
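The contract of `callAsync` (the callable may run elsewhere, the handler always runs on the source coordinator) can be sketched with standard `java.util.concurrent` executors. This is one possible threading model under stated assumptions: the worker pool size, the single thread named `source-coordinator` and the millisecond unit for `initialDelay`/`period` are choices of this sketch, not something the interface specifies.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

public class CallAsyncSketch implements AutoCloseable {

	// All handlers run on this single thread, so state touched only by handlers needs no locks.
	private final ExecutorService coordinatorThread =
			Executors.newSingleThreadExecutor(r -> new Thread(r, "source-coordinator"));
	// Callables (e.g. listing a directory) run here so they cannot block the coordinator thread.
	private final ScheduledExecutorService workers = Executors.newScheduledThreadPool(2);

	public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler) {
		workers.execute(() -> runAndHandOver(callable, handler));
	}

	public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler,
			long initialDelay, long period) {
		// Assumption: initialDelay and period are in milliseconds.
		workers.scheduleAtFixedRate(
				() -> runAndHandOver(callable, handler), initialDelay, period, TimeUnit.MILLISECONDS);
	}

	private <T> void runAndHandOver(Callable<T> callable, BiConsumer<T, Throwable> handler) {
		T result = null;
		Throwable error = null;
		try {
			result = callable.call();
		} catch (Throwable t) {
			error = t;
		}
		final T value = result;
		final Throwable failure = error;
		try {
			// Hand the outcome over to the coordinator thread instead of calling the handler here.
			coordinatorThread.execute(() -> handler.accept(value, failure));
		} catch (RejectedExecutionException closing) {
			// The context is being closed; the result is dropped.
		}
	}

	@Override
	public void close() {
		workers.shutdownNow();
		coordinatorThread.shutdown();
		try {
			coordinatorThread.awaitTermination(5, TimeUnit.SECONDS);
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		}
	}

	public static void main(String[] args) throws InterruptedException {
		CallAsyncSketch context = new CallAsyncSketch();
		// Hypothetical periodic discovery: the callable "discovers" a timestamp every 100 ms.
		context.callAsync(
				System::currentTimeMillis,
				(now, error) -> System.out.println(Thread.currentThread().getName() + " handled " + now),
				0L, 100L);
		Thread.sleep(350);
		context.close();
	}
}
```

Running the callable off the coordinator thread keeps slow I/O from delaying other enumerator work, while funnelling all handlers through one thread is what lets an enumerator avoid creating its own internal threads or locks.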

SplitAssignment

Code Block
languagejava
titleSplitAssignment
linenumberstrue
/**
 * A class containing the splits assignment to the source readers.
 *
 * <p>The assignment is always incremental. In other words, splits in the assignment are simply
 * added to the existing assignment.
 */
public class SplitsAssignment<SplitT extends SourceSplit> {
	private final Map<Integer, List<SplitT>> assignment;

	public SplitsAssignment(Map<Integer, List<SplitT>> assignment) {
		this.assignment = assignment;
	}

	/**
	 * @return A mapping from subtask ID to their split assignment.
	 */
	public Map<Integer, List<SplitT>> assignment() {
		return assignment;
	}

	@Override
	public String toString() {
		return assignment.toString();
	}
}
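Because assignments are incremental, whoever receives `SplitsAssignment` objects has to merge them into the current state rather than replace it. The hypothetical `AssignmentTracker` below shows that merge. It takes the map returned by `assignment()` so that it compiles without the class itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical tracker that accumulates incremental split assignments per subtask. */
public class AssignmentTracker<SplitT> {

	private final Map<Integer, List<SplitT>> current = new HashMap<>();

	/** Adds the splits of one incremental assignment; nothing assigned earlier is removed. */
	public void record(Map<Integer, List<SplitT>> incremental) {
		incremental.forEach((subtask, splits) ->
				current.computeIfAbsent(subtask, k -> new ArrayList<>()).addAll(splits));
	}

	public List<SplitT> splitsOf(int subtaskId) {
		return Collections.unmodifiableList(current.getOrDefault(subtaskId, Collections.emptyList()));
	}

	public static void main(String[] args) {
		AssignmentTracker<String> tracker = new AssignmentTracker<>();
		Map<Integer, List<String>> first = new HashMap<>();
		first.put(0, Arrays.asList("s0"));
		first.put(1, Arrays.asList("s1"));
		tracker.record(first);
		tracker.record(Collections.singletonMap(0, Arrays.asList("s2")));
		System.out.println(tracker.splitsOf(0)); // prints [s0, s2]
	}
}
```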

SourceEvent

Code Block
languagejava
titleSourceEvent
linenumberstrue
/**
 * An interface for the events passed between the SourceReaders and Enumerators.
 */
public interface SourceEvent extends Serializable {}

StreamExecutionEnvironment

Code Block
languagejava
titleStreamExecutionEnvironment
linenumberstrue
public class StreamExecutionEnvironment {
...
    public <T> DataStream<T> continuousSource(Source<T, ?, ?> source) {...}

    public <T> DataStream<T> continuousSource(Source<T, ?, ?> source, TypeInformation<T> type) {...}

    public <T> DataStream<T> boundedSource(Source<T, ?, ?> source) {...}

    public <T> DataStream<T> boundedSource(Source<T, ?, ?> source, TypeInformation<T> type) {...}
...
}

Public interface from base Source implementation

The following interfaces are high level interfaces that are introduced by the base implementation of Source.

  • SourceReaderBase - The base implementation for SourceReader. It uses the following interfaces.
  • SplitReader - The stateless and thread-less high-level reader, which is only responsible for reading raw records of type <E> from the assigned splits.
  • SplitChange - The split change to the split reader. Right now there is only one subclass, SplitAddition.
  • RecordsWithSplitIds - A container class holding the raw records of type <E> read by the SplitReader. It allows the SplitReader to fetch and pass the records in batches.
  • RecordEmitter - A class that takes the raw records of type <E> returned by the SplitReader, converts them into the final record type <T> and emits them into the SourceOutput.
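How these pieces fit together can be sketched as a single, synchronous poll step: a split reader returns a batch of raw records grouped by split id, and the record emitter converts each raw record, writes it to the output and updates the mutable state of the split it came from. All types here are simplified local stand-ins (a `Map` for `RecordsWithSplitIds`, a `List` for `SourceOutput`), and the hand-over of batches between the fetcher threads and the main thread that `SourceReaderBase` performs is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BaseImplFlowSketch {

	/** Simplified SplitReader: one fetch returns raw records grouped by split id. */
	public interface SplitReader<E> {
		Map<String, List<E>> fetch();
	}

	/** Simplified RecordEmitter: the output is a plain List standing in for SourceOutput. */
	public interface RecordEmitter<E, T, S> {
		void emitRecord(E element, List<T> output, S splitState);
	}

	/** One poll step: fetch a batch, then emit every record together with its split's state. */
	public static <E, T, S> List<T> pollOnce(
			SplitReader<E> reader, RecordEmitter<E, T, S> emitter, Map<String, S> splitStates) {
		List<T> output = new ArrayList<>();
		Map<String, List<E>> batch = reader.fetch(); // stands in for RecordsWithSplitIds<E>
		for (Map.Entry<String, List<E>> entry : batch.entrySet()) {
			S state = splitStates.get(entry.getKey());
			for (E element : entry.getValue()) {
				emitter.emitRecord(element, output, state);
			}
		}
		return output;
	}

	public static void main(String[] args) {
		Map<String, List<String>> batch = new LinkedHashMap<>();
		batch.put("s0", Arrays.asList("a", "bbb"));
		Map<String, long[]> states = new HashMap<>();
		states.put("s0", new long[1]); // per-split state: number of records emitted so far
		List<Integer> out = pollOnce(
				() -> batch,
				(String element, List<Integer> output, long[] state) -> {
					output.add(element.length()); // raw type E = String becomes final type T = Integer
					state[0]++;
				},
				states);
		System.out.println(out + " " + states.get("s0")[0]); // prints [1, 3] 2
	}
}
```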

SourceReaderBase

Code Block
languagejava
titleSourceReaderBase
linenumberstrue
/**
 * An abstract implementation of {@link SourceReader} which provides some synchronization between
 * the mailbox main thread and the SourceReader internal threads. This class allows users to implement
 * a SourceReader by just providing the following:
 * <ul>
 *     <li>A {@link SplitReader}.</li>
 *     <li>A {@link RecordEmitter}</li>
 *     <li>The logic to clean up a split state after it is finished.</li>
 *     <li>The logic to get the state from a {@link SourceSplit}.</li>
 *     <li>The logic to restore a {@link SourceSplit} from its state.</li>
 * </ul>
 *
 * @param <E> The rich element type that contains information for split state update or timestamp extraction.
 * @param <T> The final element type to emit.
 * @param <SplitT> the immutable split type.
 * @param <SplitStateT> the mutable type of split state.
 */
public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT>
		implements SourceReader<T, SplitT> {
	
	// -------------------- Abstract method to allow different implementations ------------------
	/**
	 * Handles the finished splits to clean the state if needed.
	 */
	protected abstract void onSplitFinished(Collection<String> finishedSplitIds);

	/**
	 * Called when new splits are added to the reader, to initialize the state of the new splits.
	 *
	 * @param split a newly added split.
	 */
	protected abstract SplitStateT initializedState(SplitT split);

	/**
	 * Convert a mutable SplitStateT to immutable SplitT.
	 *
	 * @param splitId the id of the split.
	 * @param splitState the mutable state of the split.
	 * @return an immutable Split state.
	 */
	protected abstract SplitT toSplitType(String splitId, SplitStateT splitState);
}
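The three abstract methods are mostly about converting between the immutable split and its mutable state. The sketch below implements them for a hypothetical offset-based split. `MiniReaderBase` is a cut-down stand-in for `SourceReaderBase` that keeps only this bookkeeping, and `addSplit`, `stateOf`, `finish` and `snapshotState` are helpers invented for the example.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReaderBaseSketch {

	/** Hypothetical immutable split: an id plus the offset to start reading from. */
	public static final class OffsetSplit {
		public final String id;
		public final long offset;

		public OffsetSplit(String id, long offset) {
			this.id = id;
			this.offset = offset;
		}
	}

	/** Hypothetical mutable split state, advanced by the RecordEmitter while records are emitted. */
	public static final class OffsetState {
		public long offset;

		public OffsetState(long offset) {
			this.offset = offset;
		}
	}

	/** Cut-down stand-in for SourceReaderBase that keeps only the split state bookkeeping. */
	public abstract static class MiniReaderBase<SplitT, SplitStateT> {
		private final Map<String, SplitStateT> states = new HashMap<>();

		protected abstract void onSplitFinished(Collection<String> finishedSplitIds);

		protected abstract SplitStateT initializedState(SplitT split);

		protected abstract SplitT toSplitType(String splitId, SplitStateT splitState);

		public void addSplit(String splitId, SplitT split) {
			states.put(splitId, initializedState(split));
		}

		public SplitStateT stateOf(String splitId) {
			return states.get(splitId);
		}

		public void finish(Collection<String> finishedSplitIds) {
			finishedSplitIds.forEach(states::remove);
			onSplitFinished(finishedSplitIds);
		}

		/** On checkpoint, every mutable state is turned back into an immutable split. */
		public List<SplitT> snapshotState() {
			List<SplitT> splits = new ArrayList<>();
			states.forEach((id, state) -> splits.add(toSplitType(id, state)));
			return splits;
		}
	}

	public static final class OffsetReader extends MiniReaderBase<OffsetSplit, OffsetState> {
		public final List<String> finished = new ArrayList<>();

		@Override
		protected void onSplitFinished(Collection<String> finishedSplitIds) {
			finished.addAll(finishedSplitIds);
		}

		@Override
		protected OffsetState initializedState(OffsetSplit split) {
			return new OffsetState(split.offset);
		}

		@Override
		protected OffsetSplit toSplitType(String splitId, OffsetState splitState) {
			return new OffsetSplit(splitId, splitState.offset);
		}
	}

	public static void main(String[] args) {
		OffsetReader reader = new OffsetReader();
		reader.addSplit("s0", new OffsetSplit("s0", 10));
		reader.stateOf("s0").offset = 42;                         // what a RecordEmitter would do
		System.out.println(reader.snapshotState().get(0).offset); // prints 42
	}
}
```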

...

SplitReader

Code Block
languagejava
titleSplitReader
linenumberstrue
/**
 * An interface used to read from splits. The implementation could either read from a single split or from
 * multiple splits.
 *
 * @param <E> the element type.
 * @param <SplitT> the split type.
 */
public interface SplitReader<E, SplitT extends SourceSplit> {

	/**
	 * Fetch elements from the assigned splits. The fetch call could be blocking,
	 * but it should get unblocked when {@link #wakeUp()} is invoked. In that case, the implementation
	 * may either decide to return without throwing an exception, or it can just throw an interrupted
	 * exception. In either case, this method should be reentrant, meaning that the next fetch call
	 * should just resume from where the last fetch call was woken up or interrupted.
	 *
	 * @return A RecordsWithSplitIds that contains the fetched records grouped by the split ids.
	 *
	 * @throws InterruptedException when interrupted
	 */
	RecordsWithSplitIds<E> fetch() throws InterruptedException;

	/**
	 * Handle the split changes. This call should be non-blocking.
	 *
	 * @param splitsChanges a queue with split changes that have not been handled by this SplitReader.
	 */
	void handleSplitsChanges(Queue<SplitsChange<SplitT>> splitsChanges);

	/**
	 * Wake up the split reader in case the fetcher thread is blocking in
	 * {@link #fetch()}.
	 */
	void wakeUp();
}
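A split reader that waits on an external data source can satisfy the `fetch()`/`wakeUp()` contract with a blocking queue and a marker element, as in the sketch below. `QueueSplitReader` is hypothetical: it serves a single split, returns a plain `Map` instead of `RecordsWithSplitIds`, leaves out `handleSplitsChanges`, and chooses to return (rather than throw) when woken up.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueSplitReader {

	// Marker put into the queue by wakeUp(); it is never returned to callers.
	private static final Object WAKE_UP = new Object();

	private final String splitId;
	private final LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>();

	public QueueSplitReader(String splitId) {
		this.splitId = splitId;
	}

	/** Producer side, for example a callback of an external client library. */
	public void offer(String record) {
		queue.add(record);
	}

	/** Blocks until records or a wake-up arrive; returns the records grouped by split id. */
	public Map<String, List<String>> fetch() throws InterruptedException {
		List<Object> drained = new ArrayList<>();
		drained.add(queue.take()); // blocking wait for the first element
		queue.drainTo(drained);    // then take whatever else is already buffered, without blocking
		List<String> records = new ArrayList<>();
		for (Object element : drained) {
			if (element != WAKE_UP) {
				records.add((String) element);
			}
		}
		// Nothing is consumed beyond what is returned, so the next call simply continues.
		if (records.isEmpty()) {
			return Collections.emptyMap();
		}
		return Collections.singletonMap(splitId, records);
	}

	/** Unblocks a waiting fetch(), which then returns whatever is buffered, possibly nothing. */
	public void wakeUp() {
		queue.add(WAKE_UP);
	}

	public static void main(String[] args) throws InterruptedException {
		QueueSplitReader reader = new QueueSplitReader("s0");
		reader.offer("a");
		reader.offer("b");
		System.out.println(reader.fetch()); // prints {s0=[a, b]}
		reader.wakeUp();
		System.out.println(reader.fetch()); // prints {}
	}
}
```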

...

Code Block
languagejava
titleRecordsWithSplitIds
linenumberstrue
/**
 * An interface for the elements passed from the SplitReader to the source reader.
 */
public interface RecordsWithSplitIds<E> {

	/**
	 * Get all the split ids.
	 *
	 * @return a collection of split ids.
	 */
	Collection<String> getSplitIds();

	/**
	 * Get all the records grouped by split.
	 *
	 * @return a mapping from split ids to the records.
	 */
	Map<String, Collection<E>> getRecordsBySplits();

	/**
	 * Get the finished splits.
	 *
	 * @return the finished splits after this RecordsWithSplitIds is returned.
	 */
	Set<String> getFinishedSplits();
}
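A straightforward implementation of this interface is a mutable batch that the split reader fills during one `fetch()` call. The `Batch` class below is a hypothetical sketch. It assumes that `getSplitIds()` lists only the splits that have records in this batch, which the interface above does not pin down.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class RecordsBatchSketch {

	/** Local copy of the RecordsWithSplitIds interface above. */
	public interface RecordsWithSplitIds<E> {
		Collection<String> getSplitIds();

		Map<String, Collection<E>> getRecordsBySplits();

		Set<String> getFinishedSplits();
	}

	/** Hypothetical batch that a SplitReader fills during a single fetch() call. */
	public static final class Batch<E> implements RecordsWithSplitIds<E> {
		// LinkedHashMap + ArrayList keep the per-split read order for the emitter.
		private final Map<String, Collection<E>> records = new LinkedHashMap<>();
		private final Set<String> finished = new HashSet<>();

		public Batch<E> add(String splitId, E record) {
			records.computeIfAbsent(splitId, id -> new ArrayList<>()).add(record);
			return this;
		}

		public Batch<E> markFinished(String splitId) {
			finished.add(splitId);
			return this;
		}

		@Override
		public Collection<String> getSplitIds() {
			return records.keySet();
		}

		@Override
		public Map<String, Collection<E>> getRecordsBySplits() {
			return records;
		}

		@Override
		public Set<String> getFinishedSplits() {
			return finished;
		}
	}

	public static void main(String[] args) {
		Batch<String> batch = new Batch<String>()
				.add("s0", "x").add("s1", "y").add("s0", "z").markFinished("s1");
		System.out.println(batch.getRecordsBySplits()); // prints {s0=[x, z], s1=[y]}
		System.out.println(batch.getFinishedSplits());  // prints [s1]
	}
}
```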

RecordEmitter

Code Block
languagejava
titleRecordEmitter
linenumberstrue
/**
 * Emit a record to the downstream.
 *
 * @param <E> the type of the record emitted by the {@link SplitReader}
 * @param <T> the type of records that are eventually emitted to the {@link SourceOutput}.
 * @param <SplitStateT> the mutable type of split state.
 */
public interface RecordEmitter<E, T, SplitStateT> {

	/**
	 * Process and emit the records to the {@link SourceOutput}. A typical implementation will do the
	 * following:
	 *
	 * <ul>
	 *     <li>
	 *         Convert the element emitted by the {@link SplitReader} to the target type taken by the
	 *         {@link SourceOutput}.
	 *     </li>
	 *     <li>
	 *         Extract timestamp from the passed in element and emit the timestamp along with the record.
	 *     </li>
	 *     <li>
	 *         Emit watermarks for the source.
	 *     </li>
	 * </ul>
	 *
	 * @param element The intermediate element read by the SplitReader.
	 * @param output The output to which the final records are emitted.
	 * @param splitState The state of the split where the given element was from.
	 */
	void emitRecord(E element, SourceOutput<T> output, SplitStateT splitState) throws Exception;
}
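A record emitter for a source whose raw elements carry an offset and an event-time timestamp could look like the following sketch. `RawRecord`, `OffsetState` and `ValueEmitter` are hypothetical, and `SourceOutput` is re-declared locally without its `WatermarkOutput` methods, so watermark emission is not shown.

```java
public class RecordEmitterSketch {

	/** Simplified local SourceOutput: the WatermarkOutput methods are omitted. */
	public interface SourceOutput<T> {
		void emitRecord(T record);

		void emitRecord(T record, long timestamp);
	}

	/** Local copy of the RecordEmitter interface above. */
	public interface RecordEmitter<E, T, SplitStateT> {
		void emitRecord(E element, SourceOutput<T> output, SplitStateT splitState) throws Exception;
	}

	/** Hypothetical raw element produced by a SplitReader. */
	public static final class RawRecord {
		public final long offset;
		public final long timestamp;
		public final String value;

		public RawRecord(long offset, long timestamp, String value) {
			this.offset = offset;
			this.timestamp = timestamp;
			this.value = value;
		}
	}

	/** Hypothetical mutable split state: the offset of the next record to read. */
	public static final class OffsetState {
		public long nextOffset;
	}

	/** Emits the payload with its event timestamp, then advances the split position. */
	public static final class ValueEmitter implements RecordEmitter<RawRecord, String, OffsetState> {
		@Override
		public void emitRecord(RawRecord element, SourceOutput<String> output, OffsetState splitState) {
			output.emitRecord(element.value, element.timestamp);
			// Updating the state right after emitting keeps it pointing past the last emitted record.
			splitState.nextOffset = element.offset + 1;
		}
	}

	public static void main(String[] args) {
		SourceOutput<String> printing = new SourceOutput<String>() {
			@Override
			public void emitRecord(String record) {
				System.out.println(record);
			}

			@Override
			public void emitRecord(String record, long timestamp) {
				System.out.println(record + " @ " + timestamp);
			}
		};
		OffsetState state = new OffsetState();
		new ValueEmitter().emitRecord(new RawRecord(7, 1000L, "hello"), printing, state);
		System.out.println(state.nextOffset); // prints 8
	}
}
```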

...

Comparison between Options

| Criterion | Enumerate on Task | Enumerate on JobManager | Enumerate on SourceCoordinator |
|---|---|---|---|
| Encapsulation of Enumerator | Encapsulation in separate Task | Additional complexity in ExecutionGraph | New component SourceCoordinator |
| Network Stack Changes | Significant changes. Some are more clear, like reconnecting. Some seem to break abstractions, like notifying tasks of downstream failures. | No changes necessary | No changes necessary |
| Scheduler / Failover Region | Minor changes | No changes necessary | Minor changes |
| Checkpoint alignment | No changes necessary (splits are data messages, naturally align with barriers) | Careful coordination between split assignment and checkpoint triggering. Might be simple if both actions are run in the single-threaded ExecutionGraph thread. | No changes necessary (splits are sent through RPC, naturally align with barriers) |
| Watermarks | No changes necessary (splits are data messages, watermarks naturally flow) | Watermarks would go through ExecutionGraph and RPC. | Watermarks would go through RPC |
| Checkpoint State | No additional mechanism (only regular task state) | Need to add support for asynchronous non-metadata state on the JobManager / ExecutionGraph | Need to add support for asynchronous state on the SourceCoordinator |
| Supporting graceful Enumerator recovery (avoid full restarts) | Network reconnects (like above), plus write-ahead of split assignment between checkpoints. | Tracking split assignment between checkpoints, plus write-ahead of split assignment between checkpoints. | Tracking split assignment between checkpoints, plus write-ahead of split assignment between checkpoints. |


Personal opinion from Stephan:  If we find an elegant way to abstract the network stack changes, I would lean towards running the Enumerator in a Task, not on the JobManager.

...