...

Code Block
languagejava
titleSourceReader reading methods
linenumberstrue
public interface SplitReader<E, SplitT extends SourceSplit> {

	RecordsWithSplitIds<E> fetch() throws InterruptedException;

	void handleSplitsChanges(Queue<SplitsChange<SplitT>> splitsChanges);

	void wakeUp();
}

Where to run the enumerator

There has been a long discussion about where to run the enumerator, which we have documented in the appendix. The final approach we took is very similar to option 3.


Per Split Event Time

With the introduction of the SourceSplit, we can actually emit per-split event time for the users. To reduce the complexity of this FLIP, we plan to propose the solution in a separate FLIP.

...

The event time alignment becomes easier to implement with the generic communication mechanism introduced between SplitEnumerator and SourceReader. In this FLIP we do not include this in the base implementation to reduce the complexity.


Public Interface

The public interface changes introduced by this FLIP consist of three parts.

  • The top level public interfaces.
  • The interfaces introduced as a part of the base implementation of the top level public interfaces.
    • The base implementation provides common functionalities required for most Source implementations. See base implementation for details.
  • The RPC gateway interface change for the generic message passing mechanism.

Top level public interfaces

  • Source - A factory-style class that helps create the SplitEnumerator and SourceReader at runtime.
  • SourceSplit - An interface for all the split types.
  • SplitEnumerator - Discovers the splits and assigns them to the SourceReaders.
  • SplitEnumeratorContext - Provides the necessary information to the SplitEnumerator to assign splits and send custom events to the SourceReaders.
  • SplitAssignment - A container class holding the source split assignment for each subtask.
  • SourceReader - Reads the records from the splits assigned by the SplitEnumerator.
  • SourceReaderContext - Provides the necessary functions to the SourceReader to communicate with the SplitEnumerator.
  • SourceOutput - A collector-style interface to take the records and timestamps emitted by the SourceReader.

Source

Code Block
languagejava
titleSource
linenumberstrue
/**
 * The interface for Source. It acts like a factory class that helps construct
 * the {@link SplitEnumerator} and {@link SourceReader} and corresponding
 * serializers.
 *
 * @param <T>        The type of records produced by the source.
 * @param <SplitT>   The type of splits handled by the source.
 * @param <EnumChkT> The type of the enumerator checkpoints.
 */
public interface Source<T, SplitT extends SourceSplit, EnumChkT> extends Serializable {

	/**
	 * Checks whether the source supports the given boundedness.
	 *
	 * <p>Some sources might only support either continuous unbounded streams, or
	 * bounded streams.
	 *
	 * @param boundedness The boundedness to check.
	 * @return <code>true</code> if the given boundedness is supported, <code>false</code> otherwise.
	 */
	boolean supportsBoundedness(Boundedness boundedness);

	/**
	 * Creates a new reader to read data from the splits it gets assigned.
	 * The reader starts fresh and does not have any state to resume.
	 *
	 * @param config A flat config for this source operator.
	 * @param readerContext The {@link SourceReaderContext context} for the source reader.
	 * @return A new SourceReader.
	 */
	SourceReader<T, SplitT> createReader(
			Configuration config,
			SourceReaderContext readerContext);

	/**
	 * Creates a new SplitEnumerator for this source, starting a new input.
	 *
	 * @param config The configuration for this operator.
	 * @param enumContext The {@link SplitEnumeratorContext context} for the split enumerator.
	 * @return A new SplitEnumerator.
	 */
	SplitEnumerator<SplitT, EnumChkT> createEnumerator(
			Configuration config,
			SplitEnumeratorContext<SplitT> enumContext);

	/**
	 * Restores an enumerator from a checkpoint.
	 *
	 * @param config The configuration of this operator.
	 * @param enumContext The {@link SplitEnumeratorContext context} for the restored split enumerator.
	 * @param checkpoint The checkpoint to restore the SplitEnumerator from.
	 * @return A SplitEnumerator restored from the given checkpoint.
	 */
	SplitEnumerator<SplitT, EnumChkT> restoreEnumerator(
			Configuration config,
			SplitEnumeratorContext<SplitT> enumContext,
			EnumChkT checkpoint) throws IOException;

	// ------------------------------------------------------------------------
	//  serializers for the metadata
	// ------------------------------------------------------------------------

	/**
	 * Creates a serializer for the source splits. Splits are serialized when sending them
	 * from enumerator to reader, and when checkpointing the reader's current state.
	 *
	 * @return The serializer for the split type.
	 */
	SimpleVersionedSerializer<SplitT> getSplitSerializer();

	/**
	 * Creates the serializer for the {@link SplitEnumerator} checkpoint.
	 * The serializer is used for the result of the {@link SplitEnumerator#snapshotState()}
	 * method.
	 *
	 * @return The serializer for the SplitEnumerator checkpoint.
	 */
	SimpleVersionedSerializer<EnumChkT> getEnumeratorCheckpointSerializer();
}


/**
 * The boundedness of the source: "bounded" for the currently available data (batch style),
 * "continuous unbounded" for a continuous streaming style source.
 */
public enum Boundedness {

	/**
	 * A bounded source processes the data that is currently available and will end after that.
	 *
	 * <p>When a source produces a bounded stream, the runtime may activate additional optimizations
	 * that are suitable only for bounded input. Incorrectly producing unbounded data when the source
	 * is set to produce a bounded stream will often result in programs that do not output any results
	 * and may eventually fail due to runtime errors (out of memory or storage).
	 */
	BOUNDED,

	/**
	 * A continuous unbounded source continuously processes all data as it comes.
	 *
	 * <p>The source may run forever (until the program is terminated) or might actually end at some point,
	 * based on some source-specific conditions. Because that is not transparent to the runtime,
	 * the runtime will use an execution mode for continuous unbounded streams whenever this mode
	 * is chosen.
	 */
	CONTINUOUS_UNBOUNDED
}
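
To make the factory role of the Source concrete, the following is a minimal sketch of what an implementation could look like. MySource, MySplit, MyEnumeratorCheckpoint, MySplitEnumerator, MySourceReader and the serializers are hypothetical placeholder classes used only for illustration; they are not part of this proposal.

Code Block
languagejava
titleExample: a hypothetical Source implementation (sketch)
linenumberstrue
// A hypothetical source wiring the proposed factory methods together.
// MySplit, MyEnumeratorCheckpoint, MySplitEnumerator, MySourceReader and the
// serializers are placeholders for whatever a concrete connector would provide.
public class MySource implements Source<String, MySplit, MyEnumeratorCheckpoint> {

	@Override
	public boolean supportsBoundedness(Boundedness boundedness) {
		// This example source can serve both bounded and continuous unbounded streams.
		return true;
	}

	@Override
	public SourceReader<String, MySplit> createReader(
			Configuration config,
			SourceReaderContext readerContext) {
		return new MySourceReader(config, readerContext);
	}

	@Override
	public SplitEnumerator<MySplit, MyEnumeratorCheckpoint> createEnumerator(
			Configuration config,
			SplitEnumeratorContext<MySplit> enumContext) {
		return new MySplitEnumerator(config, enumContext);
	}

	@Override
	public SplitEnumerator<MySplit, MyEnumeratorCheckpoint> restoreEnumerator(
			Configuration config,
			SplitEnumeratorContext<MySplit> enumContext,
			MyEnumeratorCheckpoint checkpoint) {
		return new MySplitEnumerator(config, enumContext, checkpoint);
	}

	@Override
	public SimpleVersionedSerializer<MySplit> getSplitSerializer() {
		return new MySplitSerializer();
	}

	@Override
	public SimpleVersionedSerializer<MyEnumeratorCheckpoint> getEnumeratorCheckpointSerializer() {
		return new MyEnumeratorCheckpointSerializer();
	}
}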

SourceSplit

Code Block
languagejava
titleSourceSplit
linenumberstrue
/**
 * An interface for all the Split types to implement.
 */
public interface SourceSplit {

	/**
	 * Get the split id of this source split.
	 * @return id of this source split.
	 */
	String splitId();
}
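
For illustration, a split for a file-based source could look like the following. The class and its fields are hypothetical and only show how an implementation might carry its own metadata next to the required splitId().

Code Block
languagejava
titleExample: a hypothetical split type (sketch)
linenumberstrue
// A hypothetical split describing a region of a file. Only splitId() is required
// by the interface; path, offset and length are connector-specific metadata.
public class FileRegionSplit implements SourceSplit {

	private final String path;
	private final long offset;
	private final long length;

	public FileRegionSplit(String path, long offset, long length) {
		this.path = path;
		this.offset = offset;
		this.length = length;
	}

	@Override
	public String splitId() {
		// The id must be stable and unique, e.g. for checkpointing and re-assignment.
		return path + "@" + offset;
	}

	public String path() { return path; }
	public long offset() { return offset; }
	public long length() { return length; }
}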

SourceReader

Code Block
languagejava
titleSourceReader
linenumberstrue
/**
 * The interface for a source reader which is responsible for reading the records from
 * the source splits assigned by {@link SplitEnumerator}.
 *
 * @param <T> The type of the record emitted by this source reader.
 * @param <SplitT> The type of the source splits.
 */
public interface SourceReader<T, SplitT extends SourceSplit> extends Serializable, AutoCloseable {

	/**
	 * Start the reader.
	 */
	void start();

	/**
	 * Poll the next available record into the {@link SourceOutput}.
	 *
	 * <p>The implementation must make sure this method is non-blocking.
	 *
	 * <p>Although the implementation can emit multiple records into the given SourceOutput,
	 * it is recommended not to do so. Instead, emit one record into the SourceOutput
	 * and return a {@link Status#AVAILABLE_NOW} to let the caller thread
	 * know there are more records available.
	 *
	 * @return The {@link Status} of the SourceReader after the method invocation.
	 */
	Status pollNext(SourceOutput<T> sourceOutput) throws Exception;

	/**
	 * Checkpoint on the state of the source.
	 *
	 * @return the state of the source.
	 */
	List<SplitT> snapshotState();

	/**
	 * @return a future that will be completed once there is a record available to poll.
	 */
	CompletableFuture<Void> isAvailable();

	/**
	 * Adds a list of splits for this reader to read.
	 *
	 * @param splits The splits assigned by the split enumerator.
	 */
	void addSplits(List<SplitT> splits);

	/**
	 * Handle a source event sent by the {@link SplitEnumerator}.
	 *
	 * @param sourceEvent the event sent by the {@link SplitEnumerator}.
	 */
	default void handleSourceEvents(SourceEvent sourceEvent) {
		// Do nothing.
	}

	/**
	 * The status of this reader.
	 */
	enum Status {
		/** The next record is available right now. */
		AVAILABLE_NOW,
		/** The next record will be available later. */
		AVAILABLE_LATER,
		/** The source reader has completed all the reading work. */
		FINISHED
	}
}
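
The contract between pollNext(), isAvailable() and Status is easiest to see from the caller's side. The loop below is a simplified sketch of how a task might drive a reader; it is an illustration of the protocol, not the actual task implementation proposed here, and the reader and output are assumed to be provided by the surrounding task.

Code Block
languagejava
titleExample: driving a SourceReader (sketch)
linenumberstrue
// Simplified driver loop illustrating the non-blocking reading protocol.
void runSimplifiedDriverLoop(SourceReader<String, ?> reader, SourceOutput<String> output) throws Exception {
	reader.start();
	while (true) {
		SourceReader.Status status = reader.pollNext(output);
		if (status == SourceReader.Status.FINISHED) {
			return; // all assigned splits have been fully consumed
		}
		if (status == SourceReader.Status.AVAILABLE_LATER) {
			// Nothing to emit right now: wait until the reader signals availability
			// instead of busy-polling. A real task would go back to its mailbox here.
			reader.isAvailable().get();
		}
		// AVAILABLE_NOW: simply loop and poll again.
	}
}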

SourceOutput

Code Block
languagejava
titleSourceOutput
linenumberstrue
public interface SourceOutput<E> extends WatermarkOutput {

	void emitRecord(E record);

	void emitRecord(E record, long timestamp);
}

SourceReaderContext

Code Block
languagejava
titleSourceReaderContext
linenumberstrue
/**
 * A context for the source reader. It allows the source reader to get the context information and
 * allows the SourceReader to send source event to its split enumerator.
 */
public interface SourceReaderContext {

	/**
	 * Returns the metric group for this parallel subtask.
	 *
	 * @return metric group for this parallel subtask.
	 */
	MetricGroup getMetricGroup();

	/**
	 * Send a source event to the corresponding SplitEnumerator.
	 *
	 * @param event The source event to send.
	 */
	void sendEventToEnumerator(SourceEvent event);
}


SplitEnumerator

Code Block
languagejava
titleSplitEnumerator
linenumberstrue
/**
 * An interface for a split enumerator responsible for the following:
 * 1. discover the splits for the {@link SourceReader} to read.
 * 2. assign the splits to the source reader.
 */
public interface SplitEnumerator<SplitT extends SourceSplit, CheckpointT> extends AutoCloseable {

	/**
	 * Start the split enumerator.
	 *
	 * <p>The default behavior does nothing.
	 */
	default void start() {
		// By default do nothing.
	}

	/**
	 * Handles the source event from the source reader.
	 *
	 * @param subtaskId the subtask id of the source reader who sent the source event.
	 * @param sourceEvent the source event from the source reader.
	 */
	void handleSourceEvent(int subtaskId, SourceEvent sourceEvent);

	/**
	 * Add a split back to the split enumerator. It will only happen when a {@link SourceReader} fails
	 * and there are splits assigned to it after the last successful checkpoint.
	 *
	 * @param splits The splits to add back to the enumerator for reassignment.
	 * @param subtaskId The id of the subtask to which the returned splits belong.
	 */
	void addSplitsBack(List<SplitT> splits, int subtaskId);

	/**
	 * Add a new source reader with the given subtask ID.
	 *
	 * @param subtaskId the subtask ID of the new source reader.
	 */
	void addReader(int subtaskId);

	/**
	 * Checkpoints the state of this split enumerator.
	 */
	CheckpointT snapshotState();

	/**
	 * Called to close the enumerator, in case it holds on to any resources, like threads or
	 * network connections.
	 */
	@Override
	void close() throws IOException;
}

SplitEnumeratorContext

Code Block
languagejava
titleSplitEnumeratorContext
linenumberstrue
/**
 * A context class for the {@link SplitEnumerator}. This class serves the following purposes:
 * 1. Host information necessary for the SplitEnumerator to make split assignment decisions.
 * 2. Accept and track the split assignment from the enumerator.
 * 3. Provide a managed threading model so the split enumerators do not need to create their
 *    own internal threads.
 *
 * @param <SplitT> the type of the splits.
 */
public interface SplitEnumeratorContext<SplitT extends SourceSplit> {

	MetricGroup metricGroup();

	/**
	 * Send a source event to a source reader. The source reader is identified by its subtask id.
	 *
	 * @param subtaskId the subtask id of the source reader to send this event to.
	 * @param event the source event to send.
	 */
	void sendEventToSourceReader(int subtaskId, SourceEvent event);

	/**
	 * Get the number of subtasks.
	 *
	 * @return the number of subtasks.
	 */
	int numSubtasks();

	/**
	 * Get the currently registered readers. The mapping is from subtask id to the reader info.
	 *
	 * @return the currently registered readers.
	 */
	Map<Integer, ReaderInfo> registeredReaders();

	/**
	 * Assign the splits.
	 *
	 * @param newSplitAssignments the new split assignments to add.
	 */
	void assignSplits(SplitsAssignment<SplitT> newSplitAssignments);

	/**
	 * Invoke the callable and check its return value. If the return value is true then
	 * notify the source coordinator that a new split assignment is available.
	 *
	 * <p>It is important to make sure that the callable and handler do not modify
	 * any shared state. Otherwise there might be unexpected behavior.
	 *
	 * @param callable a callable to call.
	 * @param handler a handler that handles the return value of or the exception thrown from the callable.
	 */
	<T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler);

	/**
	 * Invoke the callable periodically and check its return value. If the return value is
	 * true then notify the source coordinator that a new split assignment is available.
	 *
	 * <p>It is important to make sure that the callable and handler do not modify
	 * any shared state. Otherwise there might be unexpected behavior.
	 *
	 * @param callable the callable to call.
	 * @param handler a handler that handles the return value of or the exception thrown from the callable.
	 * @param initialDelay the initial delay of calling the callable.
	 * @param period the period between two invocations of the callable.
	 */
	<T> void callAsync(Callable<T> callable,
					   BiConsumer<T, Throwable> handler,
					   long initialDelay,
					   long period);
}
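
To illustrate the managed threading model, here is a sketch of how a hypothetical enumerator might use callAsync() for periodic split discovery and assignSplits() for lazy, event-driven assignment. PeriodicDiscoveryEnumerator, MySplit, discoverNewSplits() and the event-based request protocol are all made up for this example and are not part of the proposal.

Code Block
languagejava
titleExample: enumerator using the managed threading model (sketch)
linenumberstrue
// Sketch of a hypothetical enumerator. The checkpoint type is simply the list of
// splits that have been discovered but not yet assigned.
public class PeriodicDiscoveryEnumerator implements SplitEnumerator<MySplit, List<MySplit>> {

	private final SplitEnumeratorContext<MySplit> context;
	private final List<MySplit> pendingSplits = new ArrayList<>();

	public PeriodicDiscoveryEnumerator(SplitEnumeratorContext<MySplit> context) {
		this.context = context;
	}

	@Override
	public void start() {
		// Periodically discover new splits on a context-managed thread; the handler
		// runs back in the enumerator's single-threaded context.
		context.callAsync(
				this::discoverNewSplits,
				(splits, failure) -> {
					if (failure != null) {
						throw new RuntimeException("Split discovery failed", failure);
					}
					pendingSplits.addAll(splits);
				},
				0L,         // initial delay
				60_000L);   // discovery period in milliseconds

	}

	@Override
	public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
		// Hypothetical protocol: a reader asks for one more split via a custom event.
		if (!pendingSplits.isEmpty()) {
			MySplit split = pendingSplits.remove(0);
			context.assignSplits(new SplitsAssignment<>(
					Collections.singletonMap(subtaskId, Collections.singletonList(split))));
		}
	}

	@Override
	public void addSplitsBack(List<MySplit> splits, int subtaskId) {
		// Splits assigned after the last successful checkpoint come back here on reader failure.
		pendingSplits.addAll(splits);
	}

	@Override
	public void addReader(int subtaskId) {
		// Nothing to do in this sketch; a real enumerator might assign initial splits here.
	}

	@Override
	public List<MySplit> snapshotState() {
		return new ArrayList<>(pendingSplits);
	}

	@Override
	public void close() {
		// No resources to release in this sketch.
	}

	private List<MySplit> discoverNewSplits() {
		// Placeholder for connector-specific discovery logic.
		return Collections.emptyList();
	}
}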

SplitAssignment

Code Block
languagejava
titleSplitAssignment
linenumberstrue
/**
 * A class containing the splits assignment to the source readers.
 *
 * <p>The assignment is always incremental. In other words, splits in the assignment are simply
 * added to the existing assignment.
 */
public class SplitsAssignment<SplitT extends SourceSplit> {
	private final Map<Integer, List<SplitT>> assignment;

	public SplitsAssignment(Map<Integer, List<SplitT>> assignment) {
		this.assignment = assignment;
	}

	/**
	 * @return A mapping from subtask ID to their split assignment.
	 */
	public Map<Integer, List<SplitT>> assignment() {
		return assignment;
	}

	@Override
	public String toString() {
		return assignment.toString();
	}
}
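
A short example of such an incremental assignment follows; "context" and the splits are placeholders, and MySplit is the hypothetical split type used in the earlier sketches.

Code Block
languagejava
titleExample: creating an incremental SplitsAssignment (sketch)
linenumberstrue
// Example of an incremental assignment: one new split for subtask 0, two for subtask 2.
void assignSomeSplits(
		SplitEnumeratorContext<MySplit> context,
		MySplit splitA, MySplit splitB, MySplit splitC) {

	Map<Integer, List<MySplit>> newAssignment = new HashMap<>();
	newAssignment.put(0, Collections.singletonList(splitA));
	newAssignment.put(2, Arrays.asList(splitB, splitC));

	// The assignment is incremental: it only adds splits to what the subtasks already have.
	context.assignSplits(new SplitsAssignment<>(newAssignment));
}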

SourceEvent

Code Block
languagejava
titleSourceEvent
linenumberstrue
/**
 * An event for the SourceReaders and Enumerators.
 */
public interface SourceEvent extends Serializable {}

StreamExecutionEnvironment

Code Block
languagejava
titleStreamExecutionEnvironment
linenumberstrue
public class StreamExecutionEnvironment {
...
    public <T> DataStream<T> continuousSource(Source<T, ?, ?> source) {...}

    public <T> DataStream<T> continuousSource(Source<T, ?, ?> source, TypeInformation<T> type) {...}

    public <T> DataStream<T> boundedSource(Source<T, ?, ?> source) {...}

    public <T> DataStream<T> boundedSource(Source<T, ?, ?> source, TypeInformation<T> type) {...}
...
}
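
Usage from the DataStream API would then look roughly like the following; MySource is the hypothetical implementation from the earlier Source sketch.

Code Block
languagejava
titleExample: using the new methods (sketch)
linenumberstrue
// Hypothetical usage of the proposed methods; MySource is a placeholder implementation.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// An unbounded, continuously running stream from the source.
DataStream<String> continuous = env.continuousSource(new MySource());

// The same source consumed as a bounded input, e.g. for a backfill job.
DataStream<String> bounded = env.boundedSource(new MySource());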

Public interface from base Source implementation

The following interfaces are high level interfaces that are introduced by the base implementation of Source.

  1. SplitReader - The stateless and thread-less high-level reader which is only responsible for reading raw records of type <E> from the assigned splits.
  2. SplitsChange - The split change to the split reader. Right now there is only one subclass, SplitsAddition.
  3. RecordsWithSplitIds - A container class holding the raw records of type <E> read by the SplitReader.
  4. RecordEmitter - A class that takes the raw records of type <E> returned by the SplitReader, converts them into the final record type <T> and emits them into the SourceOutput.

SplitReader

Code Block
languagejava
titleSplitReader
linenumberstrue
/**
 * An interface used to read from splits. The implementation could either read from a single split or from
 * multiple splits.
 *
 * @param <E> the element type.
 * @param <SplitT> the split type.
 */
public interface SplitReader<E, SplitT extends SourceSplit> {

	/**
	 * Fetch elements into the blocking queue for the given splits. The fetch call could be blocking
	 * but it should get unblocked when {@link #wakeUp()} is invoked. In that case, the implementation
	 * may either decide to return without throwing an exception, or it can just throw an interrupted
	 * exception. In either case, this method should be reentrant, meaning that the next fetch call
	 * should just resume from where the last fetch call was woken up or interrupted.
	 *
	 * @return the Ids of the finished splits.
	 *
	 * @throws InterruptedException when interrupted
	 */
	RecordsWithSplitIds<E> fetch() throws InterruptedException;

	/**
	 * Handle the split changes. This call should be non-blocking.
	 *
	 * @param splitsChanges a queue with split changes that have not been handled by this SplitReader.
	 */
	void handleSplitsChanges(Queue<SplitsChange<SplitT>> splitsChanges);

	/**
	 * Wake up the split reader in case the fetcher thread is blocking in
	 * {@link #fetch()}.
	 */
	void wakeUp();
}

SplitChange

Code Block
languagejava
titleSplitChange
linenumberstrue
/**
 * An abstract class to host splits changes.
 */
public abstract class SplitsChange<SplitT> {
	private final List<SplitT> splits;

	SplitsChange(List<SplitT> splits) {
		this.splits = splits;
	}

	/**
	 * @return the list of splits.
	 */
	public List<SplitT> splits() {
		return Collections.unmodifiableList(splits);
	}
}

/**
 * A change to add splits.
 *
 * @param <SplitT> the split type.
 */
public class SplitsAddition<SplitT> extends SplitsChange<SplitT> {

	public SplitsAddition(List<SplitT> splits) {
		super(splits);
	}
}

RecordsWithSplitIds

Code Block
languagejava
titleRecordWithSplitIds
linenumberstrue
/**
 * An interface for the elements passed from the fetchers to the source reader.
 */
public interface RecordsWithSplitIds<E> {

	/**
	 * Get all the split ids.
	 *
	 * @return a collection of split ids.
	 */
	Collection<String> getSplitIds();

	/**
	 * Get all the records by splits.
	 *
	 * @return a mapping from split ids to the records.
	 */
	Map<String, Collection<E>> getRecordsBySplits();

	/**
	 * Get the finished splits.
	 *
	 * @return the finished splits after this RecordsWithSplitIds is returned.
	 */
	Set<String> getFinishedSplits();
}
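
For illustration, a trivial map-backed implementation of this container could look as follows; it only shows what the container carries and is not necessarily the representation the base implementation will use.

Code Block
languagejava
titleExample: map-backed RecordsWithSplitIds (sketch)
linenumberstrue
// A minimal container implementation backed by a map, for illustration only.
public class SimpleRecordsWithSplitIds<E> implements RecordsWithSplitIds<E> {

	private final Map<String, Collection<E>> recordsBySplit;
	private final Set<String> finishedSplits;

	public SimpleRecordsWithSplitIds(Map<String, Collection<E>> recordsBySplit, Set<String> finishedSplits) {
		this.recordsBySplit = recordsBySplit;
		this.finishedSplits = finishedSplits;
	}

	@Override
	public Collection<String> getSplitIds() {
		return recordsBySplit.keySet();
	}

	@Override
	public Map<String, Collection<E>> getRecordsBySplits() {
		return recordsBySplit;
	}

	@Override
	public Set<String> getFinishedSplits() {
		return finishedSplits;
	}
}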

RecordEmitter

Code Block
languagejava
titleRecordEmitter
linenumberstrue
/**
 * Emit a record to the downstream.
 *
 * @param <E> the type of the record emitted by the {@link SplitReader}
 * @param <T> the type of records that are eventually emitted to the {@link SourceOutput}.
 * @param <SplitStateT> the mutable type of split state.
 */
public interface RecordEmitter<E, T, SplitStateT> {

	/**
	 * Process and emit the records to the {@link SourceOutput}. A few recommendations to the implementation
	 * are the following:
	 *
	 * <ul>
	 * 	<li>The method may be interrupted in the middle. In that case, the same set of records will be passed
	 * 	to the record emitter again later. The implementation needs to make sure it can handle seeing the
	 * 	same records again, for example by tracking its progress in the split state.</li>
	 * </ul>
	 *
	 * @param element The intermediate element read by the SplitReader.
	 * @param output The output to which the final records are emitted.
	 * @param splitState The state of the split.
	 */
	void emitRecord(E element, SourceOutput<T> output, SplitStateT splitState) throws Exception;
}
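
To show how the pieces of the base implementation fit together, here is a simplified sketch of the hand-off from a fetcher to the reader: the fetcher calls SplitReader.fetch(), and the reader-side loop feeds each raw record through the RecordEmitter into the SourceOutput. This is only an illustration of the intended data flow under assumed plumbing (the split-state map and the helper method are made up), not the actual base implementation.

Code Block
languagejava
titleExample: SplitReader to RecordEmitter hand-off (sketch)
linenumberstrue
// Simplified hand-off between a SplitReader and a RecordEmitter. The split-state
// map is a placeholder for what the base implementation would manage internally.
<E, T, SplitStateT> void emitFetchedRecords(
		SplitReader<E, ?> splitReader,
		RecordEmitter<E, T, SplitStateT> recordEmitter,
		SourceOutput<T> output,
		Map<String, SplitStateT> splitStates) throws Exception {

	// In the base implementation this fetch happens on a dedicated fetcher thread and
	// the result is handed to the single-threaded reader, e.g. via a queue.
	RecordsWithSplitIds<E> fetched = splitReader.fetch();

	for (Map.Entry<String, Collection<E>> entry : fetched.getRecordsBySplits().entrySet()) {
		SplitStateT splitState = splitStates.get(entry.getKey());
		for (E rawRecord : entry.getValue()) {
			// Convert the raw record and emit it, updating the per-split state.
			recordEmitter.emitRecord(rawRecord, output, splitState);
		}
	}

	// Splits reported as finished can be dropped from the tracked state.
	for (String finishedSplitId : fetched.getFinishedSplits()) {
		splitStates.remove(finishedSplitId);
	}
}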

Public interface from RPC gateway

TaskExecutorGateway

Code Block
languagejava
titleTaskExecutorGateway
linenumberstrue
public interface TaskExecutorGateway extends RpcGateway {

    ...

	/**
	 * Sends an operator event to an operator in a task executed by this task executor.
	 *
	 * <p>The reception is acknowledged (future is completed) when the event has been dispatched to the
	 * {@link org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable#dispatchOperatorEvent(OperatorID, SerializedValue)}
	 * method. It is not guaranteed that the event is processed successfully by the implementation.
	 * These cases are up to the task and event sender to handle (for example with an explicit response
	 * message upon success, or by triggering failure/recovery upon exception).
	 */
	CompletableFuture<Acknowledge> sendOperatorEvent(
			ExecutionAttemptID task,
			OperatorID operator,
			SerializedValue<OperatorEvent> evt);
    
    ...
}

JobMasterGateway

Code Block
languagejava
titleJobMasterGateway
linenumberstrue
public interface JobMasterGateway {
    ...
	CompletableFuture<Acknowledge> sendOperatorEventToCoordinator(
			ExecutionAttemptID task,
			OperatorID operatorID,
			SerializedValue<OperatorEvent> event);
    ...
}


Implementation Plan

The implementation should proceed in the following steps, some of which can proceed concurrently.

  1. Validate the interface proposal by implementing popular connectors of different patterns:
    1. FileSource
      1. For a row-wise format (splittable within files, checkpoint offset within a split)
      2. For a bulk format like Parquet / Orc.
      3. Bounded and unbounded split enumerator
    2. KafkaSource
      1. Unbounded without dynamic partition discovery
      2. Unbounded with dynamic partition discovery
      3. Bounded
    3. Kinesis
      1. Unbounded

  2. Implement test harnesses for the high-level reader patterns
  3. Test the functionality of the readers implemented in (1)

  4. Implement a new SourceReaderTask and implement the single-threaded mailbox logic

  5. Implement SourceEnumeratorTask

  6. Implement the changes to network channels and scheduler, or to RPC service and checkpointing, to handle split assignment and checkpoints and re-adding splits.

Compatibility, Deprecation, and Migration Plan

In the DataStream API, we mark the existing source interface as deprecated but keep it for a few releases.
The new source interface is supported by different stream operators, so the two source interfaces can easily co-exist for a while.

We do not touch the DataSet API, which will be eventually subsumed by the DataStream API anyways.

Test Plan

Unit tests and integration tests for each implementation.


...

Appendix - Where to run the enumerator

The communication of splits between the Enumerator and the SourceReader has specific requirements:

  • Lazy / pull-based assignment: Only when a reader requests the next split should the enumerator send a split. That results in better load-balancing.
  • Payload on the "pull" message, to communicate information like "location" from the SourceReader to SplitEnumerator, thus supporting features like locality-aware split assignment.
  • Exactly-once fault tolerant with checkpointing: A split is sent to the reader once. A split is either still part of the enumerator (and its checkpoint) or part of the reader or already complete.
  • Exactly-once between checkpoints (and without checkpointing): Between checkpoints (and in the absence of checkpoints), the splits that were assigned to readers must be re-added to the enumerator upon failure / recovery.
  • The communication channel must not connect tasks into a single failover region.

Given these requirements, there are three options to implement this communication.


Option 1: Enumerator on the TaskManager

The SplitEnumerator runs as a task with parallelism one. Downstream of the enumerator are the SourceReader tasks, which run in parallel. Communication goes through the regular data streams.

The readers request splits by sending "backwards events", similar to "request partition" or the "superstep synchronization" in the batch iterations. These are not exposed in operators, but tasks have access to them. 
The task reacts to the backwards events: Only upon an event will it send a split. That gives us lazy/pull-based assignment. Payloads on the request backwards event messages (for example for locality awareness) are possible.

Checkpoints and splits are naturally aligned, because splits go through the data channels. The enumerator is effectively the only entry task from the source, and the only one that receives the "trigger checkpoint" RPC call.


The network connection between enumerator and split reader is treated by the scheduler as a boundary of a failover region.

To decouple the enumerator and reader restart, we need one of the following mechanisms:

  1. Pipelined persistent channels: The contents of a channel are persistent between checkpoints. A receiving task requests the data "after checkpoint X". The data is pruned when checkpoint X+1 is completed.
    When a reader fails, the recovered reader task can reconnect to the stream after the checkpoint and will get the previously assigned splits. Batch is a special case: if there are no checkpoints, the channel holds all data since the beginning.
    • Pro: The "pipelined persistent channel" has also applications beyond the enumerator to reader connection.
    • Con: Splits always go to the same reader and cannot be distributed across multiple readers upon recovery. Especially for batch programs, this may create bad stragglers during recovery.

  2. Reconnects and task notifications on failures: The enumerator task needs to remember the splits assigned to each result partition until the next checkpoint completes. The enumerator task would have to be notified of the failure of a downstream task and add the splits back to the enumerator. Recovered reader tasks would simply reconnect and get a new stream.
    • Pro: Re-distribution of splits across all readers upon failure/recovery (no stragglers).
    • Con: Breaks abstraction that separates task and network stack.


Option 2: Enumerator on the JobManager

Similar to the current batch (DataSet) input split assigner, the SplitEnumerator code runs in the JobManager, as part of an ExecutionJobVertex. To support periodic split discovery, the enumerator has to be periodically called from an additional thread.

The readers request splits via an RPC message and the enumerator responds via RPC. RPC messages carry payload for information like location.

Extra care needs to be taken to align the split assignment messages with checkpoint barriers. If we start to support metadata-based watermarks (to handle event time consistently when dealing with collections of bounded splits), we need to support that as well through RPC and align it with the input split assignment.

The enumerator creates its own piece of checkpoint state when a checkpoint is triggered.

Critical parts here are the added complexity on the master (ExecutionGraph) and the checkpoints. Aligning them properly with RPC messages is possible when going through the now single threaded execution graph dispatcher thread, but to support asynchronous checkpoint writing requires more complexity.


Anchor
Option3
Option3
Option 3: Introduce an independent component named SourceCoordinator; the Enumerator runs on the SourceCoordinator

The SourceCoordinator is an independent component, not a part of the ExecutionGraph. The SourceCoordinator could run on the JobMaster or as an independent process; there is no restriction by design. Communication with the SourceCoordinator (Enumerator) is through RPC, and split assignment through RPC supports the pull-based model. A SourceReader needs to register with the SourceCoordinator (whose address is in the TaskDeploymentDescriptor or is updated by the JobMaster through RPC) and then sends split requests with payload information.

Each job has at most one SourceCoordinator, which is started by the JobMaster. There might be several Enumerators in one job since there might be several different sources; all Enumerators run on this SourceCoordinator.

Split assignment needs to satisfy the checkpointing mode semantics. The Enumerator has its own state (the split assignment), which is a part of the global checkpoint. When a new checkpoint is triggered, the CheckpointCoordinator sends barriers to the SourceCoordinator first. The SourceCoordinator snapshots the states of all Enumerators and then sends barriers to the SourceReaders through RPC. Since splits and barriers sent through RPC are FIFO, Flink can align the split assignment with the checkpoint naturally.

If the user specifies RestartAllStrategy as the failover strategy, Flink restarts all tasks and the SourceCoordinator when a task fails. All tasks and Enumerators are restarted and restored from the last successful checkpoint.

If the user specifies RestartPipelinedRegionStrategy as the failover strategy, it is a little more complicated. There is no failover region problem in this model, since there is no execution edge between the Enumerator and the SourceReaders (the SourceCoordinator is not a part of the ExecutionGraph). We need to explain it separately.

  • When a SourceReader task fails, the JobMaster does not restart the SourceCoordinator or the Enumerators on it. The JobMaster cancels the other tasks in the same failover region as the failed task, as usual. Then the JobMaster notifies the Enumerator of the failure or cancelation of the SourceReader tasks (there might be multiple SourceReader tasks in the same failover region) and of the checkpoint version that will be restored from. The notification happens before restarting the new tasks. When the Enumerator becomes aware of the task failures, it restores the state related to the failed tasks from the specific checkpoint version. That means the SourceCoordinator needs to support partial restoring. The Enumerator also keeps a two-level map of SourceReader, checkpoint version and split assignment in memory. This map helps to find the splits that should be reassigned or added back to the Enumerator. There could be different strategies to handle these failed splits; in some event-time based jobs, reassignment of failed splits to other tasks may break the watermark semantics. After restoring the split assignment state, reconstructing the map in memory and handling the failed splits, the Enumerator returns an acknowledgement back to the JobMaster, and the JobMaster then restarts the tasks of the failed region. As an optimization, the Enumerator could return an acknowledgement immediately without waiting for the restore, so that the scheduling of the failed region's tasks and the restoring of the Enumerator can proceed at the same time. Another important point is that while the Enumerator is restoring, the other running SourceReaders should keep working normally, including pulling the next split.

  • When the Enumerator or the SourceCoordinator fails, and a write-ahead log is available (mentioned below), the JobMaster restarts the Enumerator or SourceCoordinator but does not restart the SourceReader tasks. After restarting, the Enumerator restores its state, replays the write-ahead log, and then starts working. In the meantime, the SourceReaders wait to reconnect; no more splits are assigned until they re-register successfully. The re-registration is necessary: after replaying the write-ahead log there should be an alignment between the Enumerator and the SourceReaders, because the Enumerator cannot be sure whether the last split assignments to each SourceReader were successful. The reconnection information is updated by the JobMaster if needed (for example when the process has crashed). If there is no write-ahead log available, the failover falls back to a global failover: all tasks and Enumerators are restarted and restored from the last successful checkpoint.

The CheckpointCoordinator should notify the Enumerator when a checkpoint has been completed, so that the Enumerator can prune the map kept in memory and the write-ahead log.


Open Questions

In all of these options, the enumerator is a point of failure that requires a restart of the entire dataflow.
To circumvent that, we probably need an additional mechanism, like a write-ahead log for split assignment.


Comparison between Options

Criterion: Encapsulation of Enumerator
  • Enumerate on Task: Encapsulation in separate Task
  • Enumerate on JobManager: Additional complexity in ExecutionGraph
  • Enumerate on SourceCoordinator: New component SourceCoordinator

Criterion: Network Stack Changes
  • Enumerate on Task: Significant changes. Some are more clear, like reconnecting. Some seem to break abstractions, like notifying tasks of downstream failures.
  • Enumerate on JobManager: No changes necessary
  • Enumerate on SourceCoordinator: No changes necessary

Criterion: Scheduler / Failover Region
  • Enumerate on Task: Minor changes
  • Enumerate on JobManager: No changes necessary
  • Enumerate on SourceCoordinator: Minor changes

Criterion: Checkpoint alignment
  • Enumerate on Task: No changes necessary (splits are data messages, naturally align with barriers)
  • Enumerate on JobManager: Careful coordination between split assignment and checkpoint triggering. Might be simple if both actions are run in the single-threaded ExecutionGraph thread.
  • Enumerate on SourceCoordinator: No changes necessary (splits are sent through RPC, naturally align with barriers)

Criterion: Watermarks
  • Enumerate on Task: No changes necessary (splits are data messages, watermarks naturally flow)
  • Enumerate on JobManager: Watermarks would go through ExecutionGraph and RPC.
  • Enumerate on SourceCoordinator: Watermarks would go through RPC.

Criterion: Checkpoint State
  • Enumerate on Task: No additional mechanism (only regular task state)
  • Enumerate on JobManager: Need to add support for asynchronous non-metadata state on the JobManager / ExecutionGraph
  • Enumerate on SourceCoordinator: Need to add support for asynchronous state on the SourceCoordinator

Criterion: Supporting graceful Enumerator recovery (avoid full restarts)
  • Enumerate on Task: Network reconnects (like above), plus write-ahead of split assignment between checkpoints.
  • Enumerate on JobManager: Tracking split assignment between checkpoints, plus write-ahead of split assignment between checkpoints.
  • Enumerate on SourceCoordinator: Tracking split assignment between checkpoints, plus write-ahead of split assignment between checkpoints.


Personal opinion from Stephan: If we find an elegant way to abstract the network stack changes, I would lean towards running the Enumerator in a Task, not on the JobManager.


Appendix - Previous Versions

...