...

The design has several key aspects, each discussed in its own section below. The discussion helps in understanding the public interface.

Separating Work Discovery from Reading

...

  • For "bounded input", it uses a SplitEnumerator that lists all partitions and gets the latest offset for each partition and attaches that as the "end offset" to the split.
  • For "continuous input", it uses a SplitEnumerator that lists all partitions and attaches LONG_MAX as the "end offset" to each split.
  • The source may have another option to periodically discover new partitions. That would only be applicable to the "continuous input".
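The difference between the two enumeration modes above can be sketched as follows. This is only an illustration under stated assumptions: `PartitionSplit`, `OffsetEnumeration` and the `latestOffsets` map are hypothetical names that do not appear in this FLIP, and the start offset of 0 is chosen purely for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical split type: one partition plus the offset range to read. */
final class PartitionSplit {
    final String partition;
    final long startOffset;
    final long endOffset;

    PartitionSplit(String partition, long startOffset, long endOffset) {
        this.partition = partition;
        this.startOffset = startOffset;
        this.endOffset = endOffset;
    }

    /** A split is treated as unbounded when its end offset is the LONG_MAX sentinel. */
    boolean isUnbounded() {
        return endOffset == Long.MAX_VALUE;
    }
}

/** Hypothetical helper showing the work-discovery step only. */
final class OffsetEnumeration {

    /**
     * Lists all partitions and attaches an "end offset" to each split.
     *
     * @param latestOffsets assumed snapshot of partition name to latest offset
     * @param bounded true for "bounded input", false for "continuous input"
     */
    static List<PartitionSplit> enumerate(Map<String, Long> latestOffsets, boolean bounded) {
        List<PartitionSplit> splits = new ArrayList<>();
        // A TreeMap gives a deterministic partition order for the example.
        for (Map.Entry<String, Long> entry : new TreeMap<>(latestOffsets).entrySet()) {
            long endOffset = bounded ? entry.getValue() : Long.MAX_VALUE;
            splits.add(new PartitionSplit(entry.getKey(), 0L, endOffset));
        }
        return splits;
    }
}
```

The reader code is identical in both modes: it simply reads each split until the attached end offset is reached.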

Generic enumerator-reader communication mechanism

The SplitEnumerator and SourceReader are both user-implemented classes. It is not rare that an implementation requires some communication between these two components. To facilitate such use cases, this FLIP introduces a generic message passing mechanism between the SplitEnumerator and the SourceReader. This mechanism requires an additional RPC method pair in the JobMasterGateway and TaskExecutorGateway. The message passing stack is illustrated below.

[Image: message passing stack between the SplitEnumerator and the SourceReader]

The SourceEvent is the interface for messages passed between the SplitEnumerator and the SourceReader. The OperatorEvent is the interface for messages passed between the OperatorCoordinator and the Operator. The OperatorCoordinator is a generic coordinator that can be associated with any operator. In this FLIP, the SourceCoordinator will be an implementation of OperatorCoordinator that encapsulates the SplitEnumerator.
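The layering described above can be sketched roughly as follows. This is a minimal illustration, not the actual runtime code: `SourceEventWrapper`, `RequestSplitEvent`, `RecordingEnumerator` and `SketchSourceCoordinator` are hypothetical names, and the RPC hop through the JobMasterGateway is left out entirely.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/** Marker for messages between SplitEnumerator and SourceReader. */
interface SourceEvent extends Serializable {}

/** Marker for messages between OperatorCoordinator and Operator. */
interface OperatorEvent extends Serializable {}

/** Hypothetical wrapper: carries a SourceEvent inside the generic OperatorEvent. */
final class SourceEventWrapper implements OperatorEvent {
    final SourceEvent sourceEvent;

    SourceEventWrapper(SourceEvent sourceEvent) {
        this.sourceEvent = sourceEvent;
    }
}

/** Hypothetical user-defined event: a reader asks the enumerator for more work. */
final class RequestSplitEvent implements SourceEvent {}

/** Reduced stand-in for the enumerator side; only the event handler is shown. */
final class RecordingEnumerator {
    final List<String> received = new ArrayList<>();

    void handleSourceEvent(int subtaskId, SourceEvent event) {
        received.add(subtaskId + ":" + event.getClass().getSimpleName());
    }
}

/** Hypothetical coordinator that encapsulates the enumerator and unwraps events. */
final class SketchSourceCoordinator {
    private final RecordingEnumerator enumerator;

    SketchSourceCoordinator(RecordingEnumerator enumerator) {
        this.enumerator = enumerator;
    }

    /** Called when an OperatorEvent arrives from the operator running as the given subtask. */
    void handleOperatorEvent(int subtaskId, OperatorEvent event) {
        if (event instanceof SourceEventWrapper) {
            enumerator.handleSourceEvent(subtaskId, ((SourceEventWrapper) event).sourceEvent);
        } else {
            throw new IllegalArgumentException("Unexpected event type: " + event.getClass());
        }
    }
}
```

The point of the wrapper is that the generic coordinator layer never needs to know the user-defined event types; only the SplitEnumerator and SourceReader do.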

Reader Interface and Threading Model

The reader needs to fulfill the following properties:

  • No closed work loop, so it does not need to manage locking
  • Non-blocking progress methods, so it supports running in an actor/mailbox/dispatcher style operator
  • All methods are called by the same single thread, so implementors need not deal with concurrency
  • Watermark / event-time handling abstracted to be extensible for split-awareness and alignment (see the sections "Per Split Event Time" and "Event Time Alignment" below)
  • All readers should naturally support state and checkpoints
  • Watermark generation should be circumvented for batch execution

The following core aspects give us these properties:

  • Splits are both the type of work assignment and the type of state held by the source. Assigning a split or restoring a split from a checkpoint is the same to the reader.
  • Advancing the reader is a non-blocking call that returns a future.
  • We build higher-level primitives on top of the main interface (see "High-level Readers" below)
  • We hide event-time / watermarks in the SourceOutput and pass different source contexts for batch (no watermarks) and streaming (with watermarks).
    The SourceOutput also abstracts the per-partition watermark tracking.

The following reader interface is copied from the SourceReader in the public interface section.

Code Block
languagejava
titleSourceReader
linenumberstrue
public interface SourceReader<T, SplitT extends SourceSplit> extends Serializable, AutoCloseable {

	void start();

	Status pollNext(SourceOutput<T> sourceOutput) throws Exception;

	List<SplitT> snapshotState();

	CompletableFuture<Void> isAvailable();

	void addSplits(List<SplitT> splits);

	default void handleSourceEvents(SourceEvent sourceEvent) {
		// Do nothing.
	}

	enum Status {
		/** The next record is available right now. */
		AVAILABLE_NOW,
		/** The next record will be available later. */
		AVAILABLE_LATER,
		/** The source reader has completed all the reading work. */
		FINISHED
	}
}
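To illustrate the point that a split is both the unit of work assignment and the unit of checkpointed state, here is a small self-contained sketch. It does not implement the real interface: `RangeSplit` and `RangeReader` are hypothetical, a plain `List` stands in for the SourceOutput, and the FINISHED status is not modeled because the sketch has no "no more splits" signal.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical split: the numbers in [next, end) that still have to be emitted. */
final class RangeSplit {
    final String id;
    final long next;
    final long end;

    RangeSplit(String id, long next, long end) {
        this.id = id;
        this.next = next;
        this.end = end;
    }
}

enum Status { AVAILABLE_NOW, AVAILABLE_LATER, FINISHED }

/** Reduced reader: only the split handling and snapshot logic are shown. */
final class RangeReader {
    private final Deque<RangeSplit> pending = new ArrayDeque<>();
    private String currentId; // null when no split is being read
    private long position;
    private long end;

    /** Used for fresh assignments and for restored checkpoint state alike. */
    void addSplits(List<RangeSplit> splits) {
        pending.addAll(splits);
    }

    /** Emits at most one record per call and never blocks. */
    Status pollNext(List<Long> output) {
        if (currentId == null) {
            RangeSplit split = pending.poll();
            if (split == null) {
                return Status.AVAILABLE_LATER; // idle until more splits are assigned
            }
            currentId = split.id;
            position = split.next;
            end = split.end;
        }
        if (position < end) {
            output.add(position++);
        }
        if (position >= end) {
            currentId = null; // current split is fully read
        }
        return (currentId != null || !pending.isEmpty())
                ? Status.AVAILABLE_NOW
                : Status.AVAILABLE_LATER;
    }

    /** The checkpointed state is just the splits, with the current read position folded in. */
    List<RangeSplit> snapshotState() {
        List<RangeSplit> state = new ArrayList<>();
        if (currentId != null) {
            state.add(new RangeSplit(currentId, position, end));
        }
        state.addAll(pending);
        return state;
    }
}
```

Restoring is then simply `newReader.addSplits(oldReader.snapshotState())`, and the new reader resumes where the old one stopped.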


The implementation assumes that there is a single thread that drives the source. This single thread calls pollNext(...) when data is available and cedes execution when nothing is available. It also handles checkpoint triggering, timer callbacks, etc., thus making the task lock-free.

...

Code Block
languagejava
titlereader loop pseudo code
linenumberstrue
// The mailbox holds all pending actions of the task. A single thread runs them one by one.
final BlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();

final SourceReader<T, SplitT> reader = ...;

final Runnable readerLoop = new Runnable() {

    public void run() {
        while (true) {
            Status status = reader.pollNext(output);
            if (status == Status.AVAILABLE_NOW) {
                if (mailbox.isEmpty()) {
                    // Nothing else is waiting, keep reading.
                    continue;
                }
                else {
                    // Yield to the waiting actions (e.g. a checkpoint) and resume afterwards.
                    addReaderLoopToMailbox();
                    break;
                }
            }
            else if (status == Status.AVAILABLE_LATER) {
                // Cede execution. The loop is re-enqueued once the reader has data again.
                reader.isAvailable().thenAccept((ignored) -> addReaderLoopToMailbox());
                break;
            }
            else if (status == Status.FINISHED) {
                break;
            }
        }
    }
};

void addReaderLoopToMailbox() {
    mailbox.add(readerLoop);
}

void triggerCheckpoint(CheckpointOptions options) {
    mailbox.add(new CheckpointAction(options));
}

void taskMainLoop() {
    while (taskAlive) {
        mailbox.take().run();
    }
}

...

Base implementation and high-level readers

The core source interface (the lowest level interface) is very generic. That makes it flexible, but hard to implement for contributors, especially for sufficiently complex reader patterns like in Kafka or Kinesis.
In general, most I/O libraries used for connectors are not asynchronous, and would need to spawn an I/O thread to make them non-blocking for the main thread.
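A common way to bridge a blocking client to the non-blocking reader contract is a hand-over queue filled by a dedicated I/O thread. The following is only a sketch under assumptions: `HandoverReader` is a hypothetical class, a plain `Iterator` stands in for the blocking client library, and error handling and shutdown are omitted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical adapter: a blocking client runs in its own thread, the main thread never blocks. */
final class HandoverReader<E> {
    private final Object lock = new Object();
    private final Queue<E> handover = new ArrayDeque<>();
    private CompletableFuture<Void> available = new CompletableFuture<>();
    private final Thread ioThread;

    /** @param blockingClient stand-in for a client whose hasNext()/next() may block */
    HandoverReader(Iterator<E> blockingClient) {
        ioThread = new Thread(() -> {
            while (blockingClient.hasNext()) {
                E element = blockingClient.next();
                synchronized (lock) {
                    handover.add(element);
                    // Wake up whoever waits for data; callbacks are expected to be cheap.
                    available.complete(null);
                }
            }
        });
        ioThread.setDaemon(true);
    }

    void start() {
        ioThread.start();
    }

    /** Non-blocking: returns null when the I/O thread has not handed over anything yet. */
    E pollNext() {
        synchronized (lock) {
            E element = handover.poll();
            if (handover.isEmpty() && available.isDone()) {
                available = new CompletableFuture<>(); // re-arm the availability signal
            }
            return element;
        }
    }

    CompletableFuture<Void> isAvailable() {
        synchronized (lock) {
            return available;
        }
    }

    /** Test helper only: a mailbox would register a callback instead of calling join(). */
    static <E> List<E> drain(HandoverReader<E> reader, int expected) {
        List<E> out = new ArrayList<>();
        while (out.size() < expected) {
            reader.isAvailable().join();
            E element;
            while ((element = reader.pollNext()) != null) {
                out.add(element);
            }
        }
        return out;
    }
}
```

The invariant maintained under the lock is that the availability future is completed exactly when the hand-over queue is non-empty, so a waiting caller cannot miss a wake-up.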

...

Most readers fall into one of the following categories:

  1. One reader, single split. (Some dead simple blocking readers)
  2. One reader, multiple splits.
    1. Sequential single split (File, database query, most bounded splits)
    2. Multi-split multiplexed (Kafka, Pulsar, Pravega, ...)
    3. Multi-split multi-threaded (Kinesis, ...)


Sequential Single Split

Multi-split Multiplexed

Multi-split Multi-threaded

...

Code Block
languagejava
titleSourceReader reading methods
linenumberstrue
public interface SplitReader<E, SplitT extends SourceSplit> {

	RecordsWithSplitIds<E> fetch() throws InterruptedException;

	void handleSplitsChanges(Queue<SplitsChange<SplitT>> splitsChanges) throws IOException;

	void wakeUp();
}

Per Split Event Time

With the introduction of the SourceSplit, we can actually emit per split event time for the users. We plan to propose the solution in a separate FLIP instead of in this FLIP to reduce the complexity.

Event Time Alignment

The event time alignment becomes easier to implement with the generic communication mechanism introduced between SplitEnumerator and SourceReader. In this FLIP we do not include this in the base implementation to reduce the complexity.



...

Personal opinion from Stephan:  If we find an elegant way to abstract the network stack changes, I would lean towards running the Enumerator in a Task, not on the JobManager.

Public Interface

The public interface changes introduced by this FLIP consist of the following parts.

  • The top level public interfaces.
  • The interfaces introduced as a part of the base implementation of the top level public interfaces.
    • The base implementation provides common functionalities required for most Source implementations. See base implementation for details.

Top level public interfaces

  • Source - A factory style class that helps create SplitEnumerator and SourceReader at runtime.
  • SourceSplit - An interface for all the split types.
  • SplitEnumerator - Discovers the splits and assigns them to the SourceReaders.
  • SplitEnumeratorContext - Provides necessary information to the SplitEnumerator to assign splits and send custom events to the SourceReaders.
  • SplitsAssignment - A container class holding the source split assignment for each subtask.
  • SourceReader - Reads the records from the splits assigned by the SplitEnumerator.
  • SourceReaderContext - Provides necessary functions to the SourceReader to communicate with the SplitEnumerator.
  • SourceOutput - A collector style interface to take the records and timestamps emitted by the SourceReader.

Source

Code Block
languagejava
titleSource
linenumberstrue
/**
 * The interface for Source. It acts like a factory class that helps construct
 * the {@link SplitEnumerator} and {@link SourceReader} and corresponding
 * serializers.
 *
 * @param <T>        The type of records produced by the source.
 * @param <SplitT>   The type of splits handled by the source.
 * @param <EnumChkT> The type of the enumerator checkpoints.
 */
public interface Source<T, SplitT extends SourceSplit, EnumChkT> extends Serializable {

	/**
	 * Checks whether the source supports the given boundedness.
	 *
	 * <p>Some sources might only support either continuous unbounded streams, or
	 * bounded streams.
	 *
	 * @param boundedness The boundedness to check.
	 * @return <code>true</code> if the given boundedness is supported, <code>false</code> otherwise.
	 */
	boolean supportsBoundedness(Boundedness boundedness);

	/**
	 * Creates a new reader to read data from the splits it gets assigned.
	 * The reader starts fresh and does not have any state to resume.
	 *
	 * @param config A flat config for this source operator.
	 * @param readerContext The {@link SourceReaderContext context} for the source reader.
	 * @return A new SourceReader.
	 */
	SourceReader<T, SplitT> createReader(
			Configuration config,
			SourceReaderContext readerContext);

	/**
	 * Creates a new SplitEnumerator for this source, starting a new input.
	 *
	 * @param config The configuration for this operator.
	 * @param enumContext The {@link SplitEnumeratorContext context} for the split enumerator.
	 * @return A new SplitEnumerator.
	 */
	SplitEnumerator<SplitT, EnumChkT> createEnumerator(
			Configuration config,
			SplitEnumeratorContext<SplitT> enumContext);

	/**
	 * Restores an enumerator from a checkpoint.
	 *
	 * @param config The configuration of this operator.
	 * @param enumContext The {@link SplitEnumeratorContext context} for the restored split enumerator.
	 * @param checkpoint The checkpoint to restore the SplitEnumerator from.
	 * @return A SplitEnumerator restored from the given checkpoint.
	 */
	SplitEnumerator<SplitT, EnumChkT> restoreEnumerator(
			Configuration config,
			SplitEnumeratorContext<SplitT> enumContext,
			EnumChkT checkpoint) throws IOException;

	// ------------------------------------------------------------------------
	//  serializers for the metadata
	// ------------------------------------------------------------------------

	/**
	 * Creates a serializer for the source splits. Splits are serialized when sending them
	 * from enumerator to reader, and when checkpointing the reader's current state.
	 *
	 * @return The serializer for the split type.
	 */
	SimpleVersionedSerializer<SplitT> getSplitSerializer();

	/**
	 * Creates the serializer for the {@link SplitEnumerator} checkpoint.
	 * The serializer is used for the result of the {@link SplitEnumerator#snapshotState()}
	 * method.
	 *
	 * @return The serializer for the SplitEnumerator checkpoint.
	 */
	SimpleVersionedSerializer<EnumChkT> getEnumeratorCheckpointSerializer();
}


/**
 * The boundedness of the source: "bounded" for the currently available data (batch style),
 * "continuous unbounded" for a continuous streaming style source.
 */
public enum Boundedness {

	/**
	 * A bounded source processes the data that is currently available and will end after that.
	 *
	 * <p>When a source produces a bounded stream, the runtime may activate additional optimizations
	 * that are suitable only for bounded input. Incorrectly producing unbounded data when the source
	 * is set to produce a bounded stream will often result in programs that do not output any results
	 * and may eventually fail due to runtime errors (out of memory or storage).
	 */
	BOUNDED,

	/**
	 * A continuous unbounded source continuously processes all data as it comes.
	 *
	 * <p>The source may run forever (until the program is terminated) or might actually end at some point,
	 * based on some source-specific conditions. Because that is not transparent to the runtime,
	 * the runtime will use an execution mode for continuous unbounded streams whenever this mode
	 * is chosen.
	 */
	CONTINUOUS_UNBOUNDED
}

SourceSplit

Code Block
languagejava
titleSourceSplit
linenumberstrue
/**
 * An interface for all the Split types to implement.
 */
public interface SourceSplit {

	/**
	 * Get the split id of this source split.
	 * @return id of this source split.
	 */
	String splitId();
}

SourceReader

...

Code Block
languagejava
titleSourceReader
linenumberstrue
/**
 * The interface for a source reader which is responsible for reading the records from
 * the source splits assigned by {@link SplitEnumerator}.
 *
 * @param <T>        The type of the record emitted by this source reader.
 * @param <SplitT>   The type of the source splits.
 */
public interface SourceReader<T, SplitT extends SourceSplit> extends Serializable, AutoCloseable {

	/**
	 * Start the reader.
	 */
	void start();

	/**
	 * Poll the next available record into the {@link SourceOutput}.
	 *
	 * <p>The implementation must make sure this method is non-blocking.
	 *
	 * <p>Although the implementation can emit multiple records into the given SourceOutput,
	 * it is recommended not doing so. Instead, emit one record into the SourceOutput
	 * and return a {@link Status#AVAILABLE_NOW} to let the caller thread
	 * know there are more records available.
	 *
	 * @return The {@link Status} of the SourceReader after the method invocation.
	 */
	Status pollNext(SourceOutput<T> sourceOutput) throws Exception;

	/**
	 * Checkpoint on the state of the source.
	 *
	 * @return the state of the source.
	 */
	List<SplitT> snapshotState();

	/**
	 * @return a future that will be completed once there is a record available to poll.
	 */
	CompletableFuture<Void> isAvailable();

	/**
	 * Adds a list of splits for this reader to read.
	 *
	 * @param splits The splits assigned by the split enumerator.
	 */
	void addSplits(List<SplitT> splits);

	/**
	 * Handle a source event sent by the {@link SplitEnumerator}.
	 *
	 * @param sourceEvent the event sent by the {@link SplitEnumerator}.
	 */
	default void handleSourceEvents(SourceEvent sourceEvent) {
		// Do nothing.
	}

	/**
	 * The status of this reader.
	 */
	enum Status {
		/** The next record is available right now. */
		AVAILABLE_NOW,
		/** The next record will be available later. */
		AVAILABLE_LATER,
		/** The source reader has completed all the reading work. */
		FINISHED
	}
}

SourceOutput

Code Block
languagejava
titleSourceOutput
linenumberstrue
public interface SourceOutput<E> extends WatermarkOutput {

	void emitRecord(E record);

	void emitRecord(E record, long timestamp);
}

SourceReaderContext

Code Block
languagejava
titleSourceReaderContext
linenumberstrue
/**
 * A context for the source reader. It allows the source reader to get the context information and
 * allows the SourceReader to send source event to its split enumerator.
 */
public interface SourceReaderContext {

	/**
	 * Returns the metric group for this parallel subtask.
	 *
	 * @return metric group for this parallel subtask.
	 */
	MetricGroup getMetricGroup();

	/**
	 * Send a source event to the corresponding SplitEnumerator.
	 *
	 * @param event The source event to send.
	 */
	void sendEventToEnumerator(SourceEvent event);
}

SplitEnumerator

Code Block
languagejava
titleSplitEnumerator
linenumberstrue

/**
 * A interface of a split enumerator responsible for the followings:
 * 1. discover the splits for the {@link SourceReader} to read.
 * 2. assign the splits to the source reader.
 */
public interface SplitEnumerator<SplitT extends SourceSplit, CheckpointT> extends AutoCloseable {

	/**
	 * Start the split enumerator.
	 *
	 * <p>The default behavior does nothing.
	 */
	default void start() {
		// By default do nothing.
	}

	/**
	 * Handles the source event from the source reader.
	 *
	 * @param subtaskId the subtask id of the source reader who sent the source event.
	 * @param sourceEvent the source event from the source reader.
	 */
	void handleSourceEvent(int subtaskId, SourceEvent sourceEvent);

	/**
	 * Add a split back to the split enumerator. It will only happen when a {@link SourceReader} fails
	 * and there are splits assigned to it after the last successful checkpoint.
	 *
	 * @param splits The split to add back to the enumerator for reassignment.
	 * @param subtaskId The id of the subtask to which the returned splits belong.
	 */
	void addSplitsBack(List<SplitT> splits, int subtaskId);

	/**
	 * Add a new source reader with the given subtask ID.
	 *
	 * @param subtaskId the subtask ID of the new source reader.
	 */
	void addReader(int subtaskId);

	/**
	 * Checkpoints the state of this split enumerator.
	 */
	CheckpointT snapshotState();

	/**
	 * Called to close the enumerator, in case it holds on to any resources, like threads or
	 * network connections.
	 */
	@Override
	void close() throws IOException;
}
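How `addReader`, `addSplitsBack` and `snapshotState` interact can be sketched with a deliberately small enumerator. The sketch makes several assumptions: `PendingSplitsEnumerator` and `RecordingEnumContext` are hypothetical, splits are reduced to their id strings, and the "one pending split per registering reader" policy is just one possible assignment strategy.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Reduced stand-in for the context: it only records which subtask got which split ids. */
final class RecordingEnumContext {
    final Map<Integer, List<String>> assigned = new HashMap<>();

    void assign(int subtaskId, String splitId) {
        assigned.computeIfAbsent(subtaskId, k -> new ArrayList<>()).add(splitId);
    }
}

/** Hypothetical enumerator that hands one pending split to each reader that registers. */
final class PendingSplitsEnumerator {
    private final RecordingEnumContext context;
    private final Deque<String> unassigned = new ArrayDeque<>();

    PendingSplitsEnumerator(RecordingEnumContext context, List<String> initialSplitIds) {
        this.context = context;
        this.unassigned.addAll(initialSplitIds);
    }

    /** A new (or restarted) reader registered: give it work if any is pending. */
    void addReader(int subtaskId) {
        String next = unassigned.poll();
        if (next != null) {
            context.assign(subtaskId, next);
        }
    }

    /** A reader failed: splits assigned to it since the last checkpoint become pending again. */
    void addSplitsBack(List<String> splitIds, int subtaskId) {
        unassigned.addAll(splitIds);
    }

    /** Only the splits that are not yet assigned are part of the enumerator checkpoint. */
    List<String> snapshotState() {
        return new ArrayList<>(unassigned);
    }
}
```

In this sketch, splits that are already assigned are owned by the reader's checkpoint, which is why the enumerator snapshot only contains the unassigned ones.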

...

SplitEnumeratorContext

Code Block
languagejava
titleSplitEnumeratorContext
linenumberstrue
/**
 * A context class for the {@link SplitEnumerator}. This class serves the following purposes:
 * 1. Host information necessary for the SplitEnumerator to make split assignment decisions.
 * 2. Accept and track the split assignment from the enumerator.
 * 3. Provide a managed threading model so the split enumerators do not need to create their
 *    own internal threads.
 *
 * @param <SplitT> the type of the splits.
 */
public interface SplitEnumeratorContext<SplitT extends SourceSplit> {

	MetricGroup metricGroup();

	/**
	 * Send a source event to a source reader. The source reader is identified by its subtask id.
	 *
	 * @param subtaskId the subtask id of the source reader to send this event to.
	 * @param event the source event to send.
	 */
	void sendEventToSourceReader(int subtaskId, SourceEvent event);

	/**
	 * Get the number of subtasks.
	 *
	 * @return the number of subtasks.
	 */
	int numSubtasks();

	/**
	 * Get the currently registered readers. The mapping is from subtask id to the reader info.
	 *
	 * @return the currently registered readers.
	 */
	Map<Integer, ReaderInfo> registeredReaders();

	/**
	 * Assign the splits.
	 *
	 * @param newSplitAssignments the new split assignments to add.
	 */
	void assignSplits(SplitsAssignment<SplitT> newSplitAssignments);

	/**
	 * Invoke the callable and check its return value. If the return value is true then
	 * notify the source coordinator that a new split assignment is available.
	 *
	 * <p>It is important to make sure that the callable and handler do not modify
	 * any shared state. Otherwise there might be unexpected behavior.
	 *
	 * @param callable a callable to call.
	 * @param handler a handler that handles the return value of or the exception thrown from the callable.
	 */
	<T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler);

	/**
	 * Invoke the callable periodically and check its return value. If the return value is
	 * true then notify the source coordinator that a new split assignment is available.
	 *
	 * <p>It is important to make sure that the callable and handler do not modify
	 * any shared state. Otherwise there might be unexpected behavior.
	 *
	 * @param callable the callable to call.
	 * @param handler a handler that handles the return value of or the exception thrown from the callable.
	 * @param initialDelay the initial delay of calling the callable.
	 * @param period the period between two invocations of the callable.
	 */
	<T> void callAsync(Callable<T> callable,
					   BiConsumer<T, Throwable> handler,
					   long initialDelay,
					   long period);
}
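The periodic `callAsync` variant lends itself to partition discovery for continuous input. The sketch below shows the idea with stand-ins: `ManualContext` is a hypothetical test double that runs the registered callable whenever `tick()` is called instead of using real timers, splits are reduced to partition names, and the hash-based owner selection is an assumption. It also assumes the handler runs on the enumerator's own thread, so the `known` set it updates is not shared with the callable.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

/** Hypothetical test double for the context; tick() simulates one elapsed period. */
final class ManualContext {
    private final int numSubtasks;
    private final List<Runnable> periodicCalls = new ArrayList<>();
    final Map<Integer, List<String>> assignments = new TreeMap<>();

    ManualContext(int numSubtasks) {
        this.numSubtasks = numSubtasks;
    }

    int numSubtasks() {
        return numSubtasks;
    }

    <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler,
                       long initialDelay, long period) {
        periodicCalls.add(() -> {
            T result = null;
            Throwable error = null;
            try {
                result = callable.call();
            } catch (Exception e) {
                error = e;
            }
            handler.accept(result, error);
        });
    }

    /** Assignments are incremental: new splits are appended to the existing ones. */
    void assignSplits(Map<Integer, List<String>> newAssignment) {
        newAssignment.forEach((subtask, splits) ->
                assignments.computeIfAbsent(subtask, k -> new ArrayList<>()).addAll(splits));
    }

    void tick() {
        periodicCalls.forEach(Runnable::run);
    }
}

/** Hypothetical enumerator that periodically looks for partitions it has not seen yet. */
final class DiscoveringEnumerator {
    private final ManualContext context;
    private final Supplier<Set<String>> partitionLister; // stand-in for the external system
    private final Set<String> known = new HashSet<>();

    DiscoveringEnumerator(ManualContext context, Supplier<Set<String>> partitionLister) {
        this.context = context;
        this.partitionLister = partitionLister;
    }

    void start() {
        context.callAsync(this::listPartitions, this::assignNewPartitions, 0L, 1000L);
    }

    /** The potentially slow call; it does not touch any enumerator state. */
    private Set<String> listPartitions() {
        return partitionLister.get();
    }

    private void assignNewPartitions(Set<String> all, Throwable error) {
        if (error != null) {
            throw new IllegalStateException("Partition discovery failed", error);
        }
        Map<Integer, List<String>> assignment = new HashMap<>();
        for (String partition : new TreeSet<>(all)) {
            if (known.add(partition)) {
                int owner = Math.floorMod(partition.hashCode(), context.numSubtasks());
                assignment.computeIfAbsent(owner, k -> new ArrayList<>()).add(partition);
            }
        }
        if (!assignment.isEmpty()) {
            context.assignSplits(assignment);
        }
    }
}
```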

SplitsAssignment

Code Block
languagejava
titleSplitsAssignment
linenumberstrue
/**
 * A class containing the splits assignment to the source readers.
 *
 * <p>The assignment is always incremental. In other words, splits in the assignment are simply
 * added to the existing assignment.
 */
public class SplitsAssignment<SplitT extends SourceSplit> {
	private final Map<Integer, List<SplitT>> assignment;

	public SplitsAssignment(Map<Integer, List<SplitT>> assignment) {
		this.assignment = assignment;
	}

	/**
	 * @return A mapping from subtask ID to their split assignment.
	 */
	public Map<Integer, List<SplitT>> assignment() {
		return assignment;
	}

	@Override
	public String toString() {
		return assignment.toString();
	}
}

SourceEvent

Code Block
languagejava
titleSourceEvent
linenumberstrue
/**
 * An event for the SourceReaders and Enumerators.
 */
public interface SourceEvent extends Serializable {}

StreamExecutionEnvironment

Code Block
languagejava
titleStreamExecutionEnvironment
linenumberstrue
public class StreamExecutionEnvironment {
...
    public <T> DataStream<T> continuousSource(Source<T, ?, ?> source) {...}

    public <T> DataStream<T> continuousSource(Source<T, ?, ?> source, TypeInformation<T> type) {...}

    public <T> DataStream<T> boundedSource(Source<T, ?, ?> source) {...}

    public <T> DataStream<T> boundedSource(Source<T, ?, ?> source, TypeInformation<T> type) {...}
...
}

Public interface from base Source implementation

The following interfaces are high level interfaces that are introduced by the base implementation of Source.

  1. SplitReader - The stateless and thread-less high level reader which is only responsible for reading raw records of type <E> from the assigned splits.
  2. SplitChange - The split change to the split reader. Right now there is only one subclass which is SplitAddition.
  3. RecordsWithSplitIds - A container class holding the raw records of type <E> read by SplitReader.
  4. RecordEmitter - A class that takes the raw records of type <E> returned by the SplitReader, convert them into the final record type <T> and emit them into the SourceOutput.

SplitReader

Code Block
languagejava
titleSplitReader
linenumberstrue
/**
 * An interface used to read from splits. The implementation could either read from a single split or from
 * multiple splits.
 *
 * @param <E> the element type.
 * @param <SplitT> the split type.
 */
public interface SplitReader<E, SplitT extends SourceSplit> {

	/**
	 * Fetch elements into the blocking queue for the given splits. The fetch call could be blocking
	 * but it should get unblocked when {@link #wakeUp()} is invoked. In that case, the implementation
	 * may either decide to return without throwing an exception, or it can just throw an interrupted
	 * exception. In either case, this method should be reentrant, meaning that the next fetch call
	 * should just resume from where the last fetch call was waken up or interrupted
Code Block
languagejava
titleStreamExecutionEnvironment
linenumberstrue
/**
 * A context class for the {@link SplitEnumerator}. This class serves the following purposes:
 * 1. Host information necessary for the SplitEnumerator to make split assignment decisions.
 * 2. Accept and track the split assignment from the enumerator.
 * 3. Provide a managed threading model so the split enumerators do not need to create their
 *    own internal threads.
 *
 * @param <SplitT> the type of the splits.
 */
public interface SplitEnumeratorContext<SplitT extends SourceSplit> {

	MetricGroup metricGroup();

	/**
	 * Send a source event to a source reader. The source reader is identified by its subtask id.
	 *
	 * @param subtaskId@return the subtask idIds of the source reader to send this event to.
	 * @param event the source event to send finished splits.
	 *
	 * @throws InterruptedException when interrupted
	 */
	RecordsWithSplitIds<E> fetch() throws InterruptedException;

	/**
	 * Handle the split changes. This call should be non-blocking.
	 * @return
	 * @param splitsChanges a completablequeue futurewith whichsplit willchanges bethat completedhas whennot thebeen eventhandled isby successfullythis sentSplitReader.
	 */
	void sendEventToSourceReader(int subtaskId, SourceEvent eventhandleSplitsChanges(Queue<SplitsChange<SplitT>> splitsChanges);

	/**
	 * Get Wake up the split reader in case the numberfetcher thread of subtasks.
	 *is blocking in
	 * @return the number of subtasks{@link #fetch()}.
	 */
	intvoid numSubtaskswakeUp();

	}

SplitChange

Code Block
languagejava
titleSplitChange
linenumberstrue
/**
	 * GetAn theabstract currentlyclass registeredto readers.host Thesplits mapping is from subtask id to the reader info.
	 *
	 * @return the currently registered readers.
	 */
	Map<Integer, ReaderInfo> registeredReaders();

change.
 */
public abstract class SplitsChange<SplitT> {
	private final List<SplitT> splits;

	SplitsChange(List<SplitT> splits) {
		this.splits = splits;
	}

	/**
	 * Assign@return the list of splits.
	 */
	public *List<SplitT> splits() {
		return Collections.unmodifiableList(splits);
	}
}

/**
 * A change to add splits.
 *
 * @param <SplitT> the split type.
 */
public class SplitsAddition<SplitT> extends SplitsChange<SplitT> {

	public SplitsAddition(List<SplitT> splits) {
		super(splits);
	}
}

RecordsWithSplitIds

Code Block
languagejava
titleRecordWithSplitIds
linenumberstrue
/**
 * An interface for the elements passed from the fetchers to the source reader.
 */
public interface RecordsWithSplitIds<E> {

	/**
	 * Get all the split ids.
	 *
	 * @return a collection of split ids@param newSplitAssignments the new split assignments to add.
	 */
	void assignSplits(SplitsAssignment<SplitT> newSplitAssignments);

	/**
	 * Invoke the callable and checks its return value. If the return value is true then
	 * notify the source coordinator that a new split assignment is available.
	 *
	 * <p>It is important to make sure that the callable and handler does not modify
	 * any shared state. Otherwise the there might be unexpected behavior.
	 *
	 * @param callable a callable to call.
	 * @param handler a handler that handles the return value of or the exception thrown from the callable.
	 */
	<T>Collection<String> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handlergetSplitIds();

	/**
	 * Invoke the callable periodically and checks its return value. If**
	 * Get all the returnrecords valueby isSplits;
	 *
	 true* then@return notifya themapping sourcefrom coordinatorsplit thatids ato new split assignment is available.
	 the records.
	 */
	Map<String, Collection<E>> getRecordsBySplits();

	/**
	 * <p>It is important to make sure thatGet the callable and handler does not modifyfinished splits.
	 *
	 * any@return sharedthe state.finished Otherwisesplits theafter therethis might beRecordsWithSplitIds unexpectedis behaviorreturned.
	 */
	Set<String> getFinishedSplits();
}

RecordEmitter

Code Block
languagejava
titleRecordEmitter
linenumberstrue
/**
 @param* callableEmit thea callablerecord to the calldownstream.
	 *
 * @param handler a handler that handles the return value of or<E> the type of the exceptionrecord thrownemitted fromby the callable.
	 {@link SplitReader}
 * @param initialDelay<T> the initial delay of callingtype of records that are eventually emitted to the callable{@link SourceOutput}.
	 * @param period<SplitStateT> the period between two invocationsmutable type of thesplit callablestate.
	 */
	<T>public void callAsync(Callable<T> callable,
					   BiConsumer<T, Throwable> handler,
					   long initialDelay,
					   long period);
}

SplitAssignment

Code Block
languagejava
titleStreamExecutionEnvironment
linenumberstrue
/**
 * A class containing the splits assignment to the source readers.
 *
 * <p>The assignment is always incremental. In another word, splits in the assignment are simply
 * added to the existing assignment.
 */
public class SplitsAssignment<SplitT extends SourceSplit> {
	private final Map<Integer, List<SplitT>> assignment;

	public SplitsAssignment(Map<Integer, List<SplitT>> assignment) {
		this.assignment = assignment;
	}

	/**
	 * @return A mapping from subtask ID to their split assignment.
	 */
	public Map<Integer, List<SplitT>> assignment() {
		return assignment;
	}

	@Override
	public String toString() {
		return assignment.toString();
	}
}

SourceEvent

Code Block
languagejava
titleStreamExecutionEnvironment
linenumberstrue
/**
 * An event for the SourceReaders and Enumerators.
 */
public interface SourceEvent extends Serializable {}
interface RecordEmitter<E, T, SplitStateT> {

	/**
	 * Process and emit the records to the {@link SourceOutput}. A few recommendations to the implementation
	 * are following:
	 *
	 * <ul>
	 * 	<li>The method maybe interrupted in the middle. In that case, the same set of records will be passed
	 * 	to the record emitter again later. The implementation needs to make sure it reades
	 * 	<li>
	 * </ul>
	 *
	 * @param element The intermediate element read by the SplitReader.
	 * @param output The output to which the final records are emit to.
	 * @param splitState The state of the split.
	 */
	void emitRecord(E element, SourceOutput<T> output, SplitStateT splitState) throws Exception;
}
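
To make the division of labour between the split reader and the record emitter more concrete, the following is a minimal sketch of the emit step: raw records grouped by split id are converted, emitted, and the mutable per-split state is advanced. All names here (`OffsetState`, `RawRecord`, `UpperCaseEmitter`, `EmitLoop`) are hypothetical, the output is a plain list standing in for the SourceOutput, and the upper-casing is only a placeholder conversion.

```java
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/** Hypothetical mutable per-split state: the offset of the next record to read. */
class OffsetState {
    long nextOffset;
}

/** Hypothetical raw record as a split reader might return it: an offset plus a payload. */
class RawRecord {
    final long offset;
    final String payload;

    RawRecord(long offset, String payload) {
        this.offset = offset;
        this.payload = payload;
    }
}

/** Sketch of the emit step: convert the raw record, emit it, then advance the split state. */
class UpperCaseEmitter {
    void emitRecord(RawRecord element, List<String> output, OffsetState splitState) {
        output.add(element.payload.toUpperCase(Locale.ROOT));
        // The state is advanced only after the record was emitted.
        splitState.nextOffset = element.offset + 1;
    }
}

/** Sketch of the loop a source reader could run over one fetch result. */
class EmitLoop {
    static void drain(Map<String, Collection<RawRecord>> recordsBySplit,
                      Map<String, OffsetState> states,
                      UpperCaseEmitter emitter,
                      List<String> output) {
        recordsBySplit.forEach((splitId, records) -> {
            // Each split has its own state object, created lazily in this sketch.
            OffsetState state = states.computeIfAbsent(splitId, id -> new OffsetState());
            for (RawRecord record : records) {
                emitter.emitRecord(record, output, state);
            }
        });
    }
}
```

Because the split state is only touched inside the emitter, a snapshot of the state map taken between two `drain` calls would describe exactly the records emitted so far, under the assumptions of this sketch.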

SourceOutput and Watermarking

Code Block
languagejava
titleSourceOutput and Watermarking
linenumberstrue
public interface SourceOutput<E> extends WatermarkOutput {

	void emitRecord(E record);

	void emitRecord(E record, long timestamp);
}


/**
 * An output for watermarks. The output accepts watermarks and idleness (inactivity) status.
 */
public interface WatermarkOutput {

	/**
	 * Emits the given watermark.
	 *
	 * <p>Emitting a watermark also ends previously marked idleness.
	 */
	void emitWatermark(Watermark watermark);

	/**
	 * Marks this output as idle, meaning that downstream operations do not
	 * wait for watermarks from this output.
	 *
	 * <p>An output becomes active again as soon as the next watermark is emitted.
	 */
	void markIdle();
}

Public interface from RPC gateway

TaskExecutorGateway

...

Code Block
languagejava
titleTaskExecutorGateway
linenumberstrue
public interface TaskExecutorGateway extends RpcGateway {

    ...

	/**
	 * Sends an operator event to an operator in a task executed by this task executor.
	 *
	 * <p>The reception is acknowledged (future is completed) when the event has been dispatched to the
	 * {@link org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable#dispatchOperatorEvent(OperatorID, SerializedValue)}
	 * method. It is not guaranteed that the event is processed successfully within the implementation.
	 * These cases are up to the task and event sender to handle (for example with an explicit response
	 * message upon success, or by triggering failure/recovery upon exception).
	 */
	CompletableFuture<Acknowledge> sendOperatorEvent(
			ExecutionAttemptID task,
			OperatorID operator,
			SerializedValue<OperatorEvent> evt);

    ...
}

JobMasterGateway

Code Block
languagejava
titleJobMasterGateway
linenumberstrue
public interface JobMasterGateway {
    ...
	CompletableFuture<Acknowledge> sendOperatorEventToCoordinator(
			ExecutionAttemptID task,
			OperatorID operatorID,
			SerializedValue<OperatorEvent> event);
    ...
}
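
Both gateway methods return a future that only acknowledges that the event was dispatched, so the sender has to decide what to do on success and on failure. The sketch below shows one way a sender might react to such a future. It is an illustration under assumptions: `OperatorEventSender` and `SendOutcome` are hypothetical, the `rpc` function stands in for a gateway call, and a counter stands in for triggering failure/recovery.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical outcome a sender might track for each event it sends. */
enum SendOutcome { DISPATCHED, FAILED }

/** Sketch: react to the acknowledgement future of an operator-event RPC. */
class OperatorEventSender {
    private int failuresTriggered = 0;

    /**
     * @param event hypothetical event payload
     * @param rpc hypothetical stand-in for a gateway call that returns an acknowledgement future
     */
    CompletableFuture<SendOutcome> send(String event, Function<String, CompletableFuture<Void>> rpc) {
        return rpc.apply(event).handle((ack, error) -> {
            if (error != null) {
                failuresTriggered++; // stand-in for triggering failure/recovery
                return SendOutcome.FAILED;
            }
            // Dispatched only: successful processing would still need an explicit response event.
            return SendOutcome.DISPATCHED;
        });
    }

    int failuresTriggered() {
        return failuresTriggered;
    }
}
```

The point of the sketch is that a completed acknowledgement is treated as "delivered", not "processed"; confirming processing is left to an explicit response message, as the Javadoc of sendOperatorEvent suggests.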

Implementation Plan

The implementation should proceed in the following steps, some of which can proceed concurrently.

  1. Validate the interface proposal by implementing popular connectors of different patterns:
    1. FileSource
      1. For a row-wise format (splittable within files, checkpoint offset within a split)
      2. For a bulk format like Parquet / Orc.
      3. Bounded and unbounded split enumerator
    2. KafkaSource
      1. Unbounded without dynamic partition discovery
      2. Unbounded with dynamic partition discovery
      3. Bounded
    3. Kinesis
      1. Unbounded

  2. Implement test harnesses for the high-level reader patterns
  3. Test the functionality of the readers implemented in (1)

  4. Implement a new SourceReaderTask and implement the single-threaded mailbox logic

  5. Implement SourceEnumeratorTask

  6. Implement the changes to network channels and scheduler, or to RPC service and checkpointing, to handle split assignment and checkpoints and re-adding splits.

...

We do not touch the DataSet API, which will eventually be subsumed by the DataStream API anyway.

Test Plan

Unit tests and integration tests for each implementation.


...

Appendix - Previous Versions

Public Interfaces

We propose a new Source interface along with two companion interfaces SplitEnumerator and SplitReader:

...

A naive implementation prototype that implements this in user space atop the existing Flink operations is given here: https://github.com/aljoscha/flink/commits/refactor-source-interface. This also comes with a complete Kafka source implementation that already supports checkpointing.

Proposed Changes

As an MVP, we propose to add the new interfaces and a runtime implementation that uses the existing SourceFunction for running the enumerator, along with a special operator implementation for running the split reader. As a next step, we can add a dedicated StreamTask implementation for both the enumerator and the reader to take advantage of the additional optimization potential, for example more efficient handling of the checkpoint lock.

...