Where to run the enumerator

There was a long discussion about where to run the enumerator, which we documented in the appendix. The final approach is very similar to option 3, with a few differences, and is as follows.

Each SplitEnumerator will be encapsulated in one SourceCoordinator. If there are multiple sources, there will be multiple SourceCoordinators, one per source. The SourceCoordinators will run in the JobMaster, but not as part of the ExecutionGraph. In this FLIP, we propose to fail over the entire execution graph when a SplitEnumerator fails. A finer-grained enumerator failover will be proposed in a later FLIP.
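
The relationship described above can be illustrated with a small, self-contained sketch. It is not Flink code: `Enumerator`, `FailoverTrigger`, `Coordinator` and `runAll` are hypothetical names introduced only for illustration, and the single-threaded executor is an assumption about how a coordinator might host its enumerator. The sketch shows one coordinator per source, and an enumerator failure being escalated through a callback that stands in for failing over the entire execution graph.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CoordinatorSketch {

    /** Hypothetical stand-in for a SplitEnumerator; only a start hook is modeled. */
    interface Enumerator {
        void start() throws Exception;
    }

    /** Hypothetical callback that stands in for a global failover of the execution graph. */
    interface FailoverTrigger {
        void failJob(String sourceName, Throwable cause);
    }

    /** One coordinator wraps exactly one enumerator and runs it on its own thread. */
    static final class Coordinator {
        private final String sourceName;
        private final Enumerator enumerator;
        private final FailoverTrigger failoverTrigger;
        private final ExecutorService executor = Executors.newSingleThreadExecutor();

        Coordinator(String sourceName, Enumerator enumerator, FailoverTrigger failoverTrigger) {
            this.sourceName = sourceName;
            this.enumerator = enumerator;
            this.failoverTrigger = failoverTrigger;
        }

        void start() {
            executor.execute(() -> {
                try {
                    enumerator.start();
                } catch (Throwable t) {
                    // Any enumerator failure is escalated instead of being handled locally.
                    failoverTrigger.failJob(sourceName, t);
                }
            });
        }

        void close() throws InterruptedException {
            // Already submitted work still runs after shutdown(); wait for it to finish.
            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    /** Starts one coordinator per source and returns the sources whose enumerator failed. */
    static List<String> runAll(Map<String, Enumerator> sources) throws InterruptedException {
        List<String> failed = Collections.synchronizedList(new ArrayList<>());
        List<Coordinator> coordinators = new ArrayList<>();
        for (Map.Entry<String, Enumerator> entry : sources.entrySet()) {
            Coordinator coordinator =
                    new Coordinator(entry.getKey(), entry.getValue(), (name, cause) -> failed.add(name));
            coordinators.add(coordinator);
            coordinator.start();
        }
        for (Coordinator coordinator : coordinators) {
            coordinator.close();
        }
        return failed;
    }

    public static void main(String[] args) throws Exception {
        Map<String, Enumerator> sources = new LinkedHashMap<>();
        sources.put("healthy-source", () -> { });
        sources.put("broken-source", () -> { throw new IllegalStateException("enumerator crashed"); });
        System.out.println(runAll(sources)); // prints [broken-source]
    }
}
```

In this sketch the failover callback only records which source failed. In the design proposed here, the corresponding reaction would be a failover of the whole execution graph rather than a restart of the single enumerator.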

Per Split Event Time

With the introduction of the SourceSplit, we can emit per-split event time for the users. To reduce complexity, we plan to propose the solution in a separate FLIP rather than in this one.


Code Block
/**
 * A context class for the {@link SplitEnumerator}. This class serves the following purposes:
 * 1. Host information necessary for the SplitEnumerator to make split assignment decisions.
 * 2. Accept and track the split assignment from the enumerator.
 * 3. Provide a managed threading model so the split enumerators do not need to create their
 *    own internal threads.
 *
 * @param <SplitT> the type of the splits.
 */
public interface SplitEnumeratorContext<SplitT extends SourceSplit> {

	MetricGroup metricGroup();

	/**
	 * Send a source event to a source reader. The source reader is identified by its subtask id.
	 *
	 * @param subtaskId the subtask id of the source reader to send this event to.
	 * @param event the source event to send.
	 */
	void sendEventToSourceReader(int subtaskId, SourceEvent event);

	/**
	 * Get the number of subtasks.
	 *
	 * @return the number of subtasks.
	 */
	int numSubtasks();

	/**
	 * Get the currently registered readers. The mapping is from subtask id to the reader info.
	 *
	 * @return the currently registered readers.
	 */
	Map<Integer, ReaderInfo> registeredReaders();

	/**
	 * Assign the splits.
	 *
	 * @param newSplitAssignments the new split assignments to add.
	 */
	void assignSplits(SplitsAssignment<SplitT> newSplitAssignments);

	/**
	 * Invoke the callable and hand over the return value to the handler, which will be executed
	 * by the source coordinator.
	 *
	 * <p>It is important to make sure that the callable does not modify
	 * any shared state. Otherwise there might be unexpected behavior.
	 *
	 * @param callable a callable to call.
	 * @param handler a handler that handles the return value of or the exception thrown from the callable.
	 */
	<T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler);

	/**
	 * Invoke the callable periodically and hand over the return value to the handler, which will
	 * be executed by the source coordinator.
	 *
	 * <p>It is important to make sure that the callable does not modify
	 * any shared state. Otherwise there might be unexpected behavior.
	 *
	 * @param callable the callable to call.
	 * @param handler a handler that handles the return value of or the exception thrown from the callable.
	 * @param initialDelay the initial delay of calling the callable.
	 * @param period the period between two invocations of the callable.
	 */
	<T> void callAsync(Callable<T> callable,
					   BiConsumer<T, Throwable> handler,
					   long initialDelay,
					   long period);
