...

Code Block
languagejava
titleSourceReader
linenumberstrue
interface SourceReader<E, SplitT> {

    void start() throws IOException;

    CompletableFuture<?> isAvailable() throws IOException;

    Status pollNext(SourceOutput<E> output) throws IOException;

    void addSplits(List<SplitT> splits) throws IOException;

    List<SplitT> snapshotState();
}

...

The implementation assumes that a single thread drives the source. This thread calls pollNext(...) when data is available and cedes execution when nothing is available. It also handles checkpoint triggering, timer callbacks, etc., which makes the task lock-free.
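To make the threading model concrete, here is a minimal sketch of such a driving loop. It uses simplified stand-ins rather than the real interfaces: Reader, Status and SourceDriver are hypothetical names, a Consumer replaces SourceOutput, and the loop simply blocks on the isAvailable() future where the actual task would keep processing checkpoints, timers and other actions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Simplified, hypothetical stand-ins for the reader contract above.
enum Status { AVAILABLE_NOW, AVAILABLE_LATER, FINISHED }

interface Reader<E> {
    Status pollNext(Consumer<E> output) throws Exception;

    CompletableFuture<?> isAvailable();
}

final class SourceDriver {
    // Drives the reader on the calling thread until it reports FINISHED.
    static <E> void run(Reader<E> reader, Consumer<E> output) throws Exception {
        while (true) {
            Status status = reader.pollNext(output);
            if (status == Status.FINISHED) {
                return;
            }
            if (status == Status.AVAILABLE_LATER) {
                // Nothing to read right now: wait until the reader signals new data.
                // A real task would run checkpoints and timers here instead of blocking.
                reader.isAvailable().get();
            }
        }
    }
}
```

Because only this one thread ever calls into the reader, the reader itself needs no locking; concurrency is confined to whatever completes the availability future.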

...

Personal opinion from Stephan:  If we find an elegant way to abstract the network stack changes, I would lean towards running the Enumerator in a Task, not on the JobManager.

...

Public Interfaces

...

Changes

The public interface changes introduced by this FLIP consist of two parts.

  • The top level public interfaces.
  • The interfaces introduced as part of the base implementation of the top-level public interfaces. The base implementation provides common functionality required by most Source implementations. See base implementation for details.

Top level public interfaces

  • Source - A factory-style class that helps create the SplitEnumerator and SourceReader at runtime.
  • SourceSplit - An interface for all the split types.
  • SplitEnumerator - Discovers the splits and assigns them to the SourceReaders.
  • SplitEnumeratorContext - Provides the information the SplitEnumerator needs to assign splits and to send custom events to the SourceReaders.
  • SplitsAssignment - A container class holding the source split assignment for each subtask.
  • SourceReader - Reads the records from the splits assigned by the SplitEnumerator.
  • SourceReaderContext - Provides the functions the SourceReader needs to communicate with the SplitEnumerator.
  • SourceOutput - A collector-style interface that takes the records and timestamps emitted by the SourceReader.

Source

Code Block
languagejava
titleSource
linenumberstrue
/**
 * The interface for Source. It acts like a factory class that helps construct
 * the {@link SplitEnumerator} and {@link SourceReader} and corresponding
 * serializers.
 *
 * @param <T>        The type of records produced by the source.
 * @param <SplitT>   The type of splits handled by the source.
 * @param <EnumChkT> The type of the enumerator checkpoints.
 */
public interface Source<T, SplitT extends SourceSplit, EnumChkT> extends Serializable {

	/**
	 * Checks whether the source supports the given boundedness.
	 *
	 * <p>Some sources might only support either continuous unbounded streams, or
	 * bounded streams.
	 *
	 * @param boundedness The boundedness to check.
	 * @return <code>true</code> if the given boundedness is supported, <code>false</code> otherwise.
	 */
	boolean supportsBoundedness(Boundedness boundedness);

	/**
	 * Creates a new reader to read data from the splits it gets assigned.
	 * The reader starts fresh and does not have any state to resume.
	 *
	 * @param config A flat config for this source operator.
	 * @param readerContext The {@link SourceReaderContext context} for the source reader.
	 * @return A new SourceReader.
	 */
	SourceReader<T, SplitT> createReader(
			Configuration config,
			SourceReaderContext readerContext);

	/**
	 * Creates a new SplitEnumerator for this source, starting a new input.
	 *
	 * @param config The configuration for this operator.
	 * @param enumContext The {@link SplitEnumeratorContext context} for the split enumerator.
	 * @return A new SplitEnumerator.
	 */
	SplitEnumerator<SplitT, EnumChkT> createEnumerator(
			Configuration config,
			SplitEnumeratorContext<SplitT> enumContext);

	/**
	 * Restores an enumerator from a checkpoint.
	 *
	 * @param config The configuration of this operator.
	 * @param enumContext The {@link SplitEnumeratorContext context} for the restored split enumerator.
	 * @param checkpoint The checkpoint to restore the SplitEnumerator from.
	 * @return A SplitEnumerator restored from the given checkpoint.
	 */
	SplitEnumerator<SplitT, EnumChkT> restoreEnumerator(
			Configuration config,
			SplitEnumeratorContext<SplitT> enumContext,
			EnumChkT checkpoint) throws IOException;

	// ------------------------------------------------------------------------
	//  serializers for the metadata
	// ------------------------------------------------------------------------

	/**
	 * Creates a serializer for the source splits. Splits are serialized when sending them
	 * from enumerator to reader, and when checkpointing the reader's current state.
	 *
	 * @return The serializer for the split type.
	 */
	SimpleVersionedSerializer<SplitT> getSplitSerializer();

	/**
	 * Creates the serializer for the {@link SplitEnumerator} checkpoint.
	 * The serializer is used for the result of the {@link SplitEnumerator#snapshotState()}
	 * method.
	 *
	 * @return The serializer for the SplitEnumerator checkpoint.
	 */
	SimpleVersionedSerializer<EnumChkT> getEnumeratorCheckpointSerializer();
}


/**
 * The boundedness of the source: "bounded" for the currently available data (batch style),
 * "continuous unbounded" for a continuous streaming style source.
 */
public enum Boundedness {

	/**
	 * A bounded source processes the data that is currently available and will end after that.
	 *
	 * <p>When a source produces a bounded stream, the runtime may activate additional optimizations
	 * that are suitable only for bounded input. Incorrectly producing unbounded data when the source
	 * is set to produce a bounded stream will often result in programs that do not output any results
	 * and may eventually fail due to runtime errors (out of memory or storage).
	 */
	BOUNDED,

	/**
	 * A continuous unbounded source continuously processes all data as it comes.
	 *
	 * <p>The source may run forever (until the program is terminated) or might actually end at some point,
	 * based on some source-specific conditions. Because that is not transparent to the runtime,
	 * the runtime will use an execution mode for continuous unbounded streams whenever this mode
	 * is chosen.
	 */
	CONTINUOUS_UNBOUNDED
}

SourceSplit

Code Block
languagejava
titleSourceSplit
linenumberstrue
/**
 * An interface for all the Split types to implement.
 */
public interface SourceSplit {

	/**
	 * Get the split id of this source split.
	 * @return id of this source split.
	 */
	String splitId();
}
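A split only has to expose an id; everything else is specific to the source. As an illustration, a hypothetical split covering a byte range of one file could look as follows (FileRangeSplit is not part of this FLIP). Deriving the id deterministically from the path and range is one reasonable choice, so that the same range always maps to the same id after it is added back or restored.

```java
import java.util.Objects;

// Mirrors the SourceSplit interface above.
interface SourceSplit {
    String splitId();
}

// Hypothetical split covering the byte range [start, start + length) of one file.
final class FileRangeSplit implements SourceSplit {
    private final String path;
    private final long start;
    private final long length;

    FileRangeSplit(String path, long start, long length) {
        if (start < 0 || length < 0) {
            throw new IllegalArgumentException("start and length must be non-negative");
        }
        this.path = Objects.requireNonNull(path);
        this.start = start;
        this.length = length;
    }

    @Override
    public String splitId() {
        // Deterministic id: the same range always yields the same id.
        return path + ":" + start + "+" + length;
    }

    String path() { return path; }

    long start() { return start; }

    long length() { return length; }
}
```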

SourceReader

Code Block
languagejava
titleSourceReader
linenumberstrue
/**
 * The interface for a source reader which is responsible for reading the records from
 * the source splits assigned by {@link SplitEnumerator}.
 *
 * @param <T> The type of the record emitted by this source reader.
 * @param <SplitT> The type of the source splits.
 */
public interface SourceReader<T, SplitT extends SourceSplit> extends Serializable, AutoCloseable {

	/**
	 * Starts the reader.
	 */
	void start();

	/**
	 * Poll the next available record into the {@link SourceOutput}.
	 *
	 * <p>The implementation must make sure this method is non-blocking.
	 *
	 * <p>Although the implementation can emit multiple records into the given SourceOutput,
	 * it is recommended not to do so. Instead, emit one record into the SourceOutput
	 * and return {@link Status#AVAILABLE_NOW} to let the caller thread
	 * know there are more records available.
	 *
	 * @return The {@link Status} of the SourceReader after the method invocation.
	 */
	Status pollNext(SourceOutput<T> sourceOutput) throws Exception;

	/**
	 * Checkpoints the state of the source.
	 *
	 * @return the state of the source.
	 */
	List<SplitT> snapshotState();

	/**
	 * @return a future that will be completed once there is a record available to poll.
	 */
	CompletableFuture<Void> isAvailable();

	/**
	 * Adds a list of splits for this reader to read.
	 *
	 * @param splits The splits assigned by the split enumerator.
	 */
	void addSplits(List<SplitT> splits);

	/**
	 * Handles a source event sent by the {@link SplitEnumerator}.
	 *
	 * @param sourceEvent the event sent by the {@link SplitEnumerator}.
	 */
	default void handleSourceEvents(SourceEvent sourceEvent) {
		// Do nothing.
	}

	/**
	 * The status of this reader.
	 */
	enum Status {
		/** The next record is available right now. */
		AVAILABLE_NOW,
		/** The next record will be available later. */
		AVAILABLE_LATER,
		/** The source reader has completed all the reading work. */
		FINISHED
	}
}
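The pollNext contract (emit one record per call, report AVAILABLE_NOW while more records are queued, AVAILABLE_LATER while waiting for splits, FINISHED at the end) can be illustrated with a hypothetical in-memory reader. This is a simplified sketch, not a reference implementation: splits are lists of strings with a read offset, a Consumer stands in for SourceOutput, and the noMoreSplits() method is an assumption, since the interface above does not specify how a reader learns that no further splits will be assigned.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

enum Status { AVAILABLE_NOW, AVAILABLE_LATER, FINISHED }

// Hypothetical split: a fixed list of records plus the index of the next record to read.
final class ListSplit {
    final String id;
    final List<String> records;
    final int offset;

    ListSplit(String id, List<String> records, int offset) {
        this.id = id;
        this.records = List.copyOf(records);
        this.offset = offset;
    }
}

// Single-threaded sketch: all methods are assumed to be called from the task thread.
final class InMemoryReader {
    private final ArrayDeque<ListSplit> pending = new ArrayDeque<>();
    private ListSplit current;
    private int position;
    private boolean noMoreSplits;
    private CompletableFuture<Void> available = new CompletableFuture<>();

    void addSplits(List<ListSplit> splits) {
        pending.addAll(splits);
        available.complete(null); // wake up a task waiting on isAvailable()
    }

    // Hypothetical signal that the enumerator will not assign any further splits.
    void noMoreSplits() {
        noMoreSplits = true;
        available.complete(null);
    }

    CompletableFuture<Void> isAvailable() {
        return available;
    }

    Status pollNext(Consumer<String> output) {
        // Advance to the next split that still has unread records.
        while (current == null || position >= current.records.size()) {
            current = pending.poll();
            if (current == null) {
                return idleStatus();
            }
            position = current.offset;
        }
        output.accept(current.records.get(position++)); // emit exactly one record
        boolean more = position < current.records.size() || !pending.isEmpty();
        return more ? Status.AVAILABLE_NOW : idleStatus();
    }

    // Splits carry their own progress, so the reader state is just the unfinished splits.
    List<ListSplit> snapshotState() {
        List<ListSplit> state = new ArrayList<>();
        if (current != null && position < current.records.size()) {
            state.add(new ListSplit(current.id, current.records, position));
        }
        state.addAll(pending);
        return state;
    }

    private Status idleStatus() {
        if (noMoreSplits) {
            return Status.FINISHED;
        }
        available = new CompletableFuture<>(); // completed again by addSplits() or noMoreSplits()
        return Status.AVAILABLE_LATER;
    }
}
```

Note how snapshotState() returns splits with their current offset: since the reader state is a List<SplitT>, progress has to be carried in the splits themselves.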

SourceReaderContext

Code Block
languagejava
titleSourceReaderContext
linenumberstrue
/**
 * A context for the source reader. It allows the source reader to get context information and
 * to send source events to its split enumerator.
 */
public interface SourceReaderContext {

	/**
	 * Returns the metric group for this parallel subtask.
	 *
	 * @return metric group for this parallel subtask.
	 */
	MetricGroup getMetricGroup();

	/**
	 * Send a source event to the corresponding SplitEnumerator.
	 *
	 * @param event The source event to send.
	 */
	void sendEventToEnumerator(SourceEvent event);
}
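One plausible use of sendEventToEnumerator(...) is pull-based split assignment: a reader that runs out of work asks the enumerator for more instead of waiting for splits to be pushed. The sketch below assumes a hypothetical RequestSplitEvent and a simplified ReaderContext (getMetricGroup() omitted); only the request bookkeeping is shown.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Mirrors SourceEvent above.
interface SourceEvent extends Serializable {}

// Simplified stand-in for SourceReaderContext; getMetricGroup() is omitted.
interface ReaderContext {
    void sendEventToEnumerator(SourceEvent event);
}

// Hypothetical event: "this subtask has nothing left to read, please assign a split".
final class RequestSplitEvent implements SourceEvent {
    private static final long serialVersionUID = 1L;
    final int subtaskId;

    RequestSplitEvent(int subtaskId) {
        this.subtaskId = subtaskId;
    }
}

// Hypothetical reader fragment showing only the request/response bookkeeping.
final class PullingReader {
    private final ReaderContext context;
    private final int subtaskId;
    private final List<String> splits = new ArrayList<>();
    private boolean requestOutstanding;

    PullingReader(ReaderContext context, int subtaskId) {
        this.context = context;
        this.subtaskId = subtaskId;
    }

    // Called by the task thread when the reader has drained all of its splits.
    void onIdle() {
        if (!requestOutstanding) {
            requestOutstanding = true; // at most one request in flight
            context.sendEventToEnumerator(new RequestSplitEvent(subtaskId));
        }
    }

    // Called when the enumerator answers the request by assigning splits.
    void addSplits(List<String> newSplits) {
        splits.addAll(newSplits);
        requestOutstanding = false;
    }

    int splitCount() {
        return splits.size();
    }
}
```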

SplitEnumerator

Code Block
languagejava
titleSplitEnumerator
linenumberstrue
/**
 * An interface of a split enumerator responsible for the following:
 * 1. discover the splits for the {@link SourceReader} to read.
 * 2. assign the splits to the source reader.
 */
public interface SplitEnumerator<SplitT extends SourceSplit, CheckpointT> extends AutoCloseable {

	/**
	 * Start the split enumerator.
	 *
	 * <p>The default behavior does nothing.
	 */
	default void start() {
		// By default do nothing.
	}

	/**
	 * Handles the source event from the source reader.
	 *
	 * @param subtaskId the subtask id of the source reader that sent the source event.
	 * @param sourceEvent the source event from the source reader.
	 */
	void handleSourceEvent(int subtaskId, SourceEvent sourceEvent);

	/**
	 * Adds splits back to the split enumerator. This only happens when a {@link SourceReader} fails
	 * and there are splits that were assigned to it after the last successful checkpoint.
	 *
	 * @param splits The splits to add back to the enumerator for reassignment.
	 * @param subtaskId The id of the subtask to which the returned splits belong.
	 */
	void addSplitsBack(List<SplitT> splits, int subtaskId);

	/**
	 * Add a new source reader with the given subtask ID.
	 *
	 * @param subtaskId the subtask ID of the new source reader.
	 */
	void addReader(int subtaskId);

	/**
	 * Checkpoints the state of this split enumerator.
	 */
	CheckpointT snapshotState();

	/**
	 * Called to close the enumerator, in case it holds on to any resources, like threads or
	 * network connections.
	 */
	@Override
	void close() throws IOException;
}
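The following sketch shows how addReader, addSplitsBack and snapshotState can fit together in a hypothetical enumerator over a fixed list of splits. Assumptions: splits are plain strings, a BiConsumer stands in for SplitEnumeratorContext#assignSplits, start() and handleSourceEvent(...) are left out, and splits are handed out round-robin once all parallel readers have registered.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;
import java.util.function.BiConsumer;

// Hypothetical enumerator over a fixed list of splits; splits are plain strings here.
final class StaticSplitEnumerator implements AutoCloseable {
    private final int parallelism;
    private final ArrayDeque<String> unassigned;
    private final TreeSet<Integer> readers = new TreeSet<>();
    // Stand-in for SplitEnumeratorContext#assignSplits: receives (subtaskId, split).
    private final BiConsumer<Integer, String> assigner;
    private int assignedCount; // keeps the round-robin position across calls

    StaticSplitEnumerator(int parallelism, List<String> splits, BiConsumer<Integer, String> assigner) {
        this.parallelism = parallelism;
        this.unassigned = new ArrayDeque<>(splits);
        this.assigner = assigner;
    }

    void addReader(int subtaskId) {
        readers.add(subtaskId);
        assignPending();
    }

    void addSplitsBack(List<String> splits, int subtaskId) {
        // The failed reader is gone until it registers again through addReader().
        readers.remove(subtaskId);
        unassigned.addAll(splits);
        assignPending();
    }

    // The checkpoint only needs the splits that no reader has received yet.
    List<String> snapshotState() {
        return new ArrayList<>(unassigned);
    }

    private void assignPending() {
        // Wait until all parallel readers are registered so splits are spread across them.
        if (readers.size() < parallelism) {
            return;
        }
        List<Integer> targets = new ArrayList<>(readers);
        while (!unassigned.isEmpty()) {
            int subtask = targets.get(assignedCount++ % targets.size());
            assigner.accept(subtask, unassigned.poll());
        }
    }

    @Override
    public void close() {
        // Nothing to release: this sketch holds no threads or connections.
    }
}
```

The checkpoint only holds unassigned splits. Splits assigned before a checkpoint are part of the readers' state, and splits assigned after it come back through addSplitsBack(...) if the reader fails, which matches the contract of addSplitsBack described above.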

SplitEnumeratorContext

Code Block
languagejava
titleSplitEnumeratorContext
linenumberstrue
/**
 * A context class for the {@link SplitEnumerator}. This class serves the following purposes:
 * 1. Host information necessary for the SplitEnumerator to make split assignment decisions.
 * 2. Accept and track the split assignment from the enumerator.
 * 3. Provide a managed threading model so the split enumerators do not need to create their
 *    own internal threads.
 *
 * @param <SplitT> the type of the splits.
 */
public interface SplitEnumeratorContext<SplitT extends SourceSplit> {

	MetricGroup metricGroup();

	/**
	 * Sends a source event to a source reader. The source reader is identified by its subtask id.
	 *
	 * @param subtaskId the subtask id of the source reader to send this event to.
	 * @param event the source event to send.
	 */
	void sendEventToSourceReader(int subtaskId, SourceEvent event);

	/**
	 * Gets the number of subtasks.
	 *
	 * @return the number of subtasks.
	 */
	int numSubtasks();

	/**
	 * Gets the currently registered readers. The mapping is from subtask id to the reader info.
	 *
	 * @return the currently registered readers.
	 */
	Map<Integer, ReaderInfo> registeredReaders();

	/**
	 * Assigns the splits.
	 *
	 * @param newSplitAssignments the new split assignments to add.
	 */
	void assignSplits(SplitsAssignment<SplitT> newSplitAssignments);

	/**
	 * Invokes the callable and checks its return value. If the return value is true, then
	 * notifies the source coordinator that a new split assignment is available.
	 *
	 * <p>It is important to make sure that the callable and handler do not modify
	 * any shared state. Otherwise there might be unexpected behavior.
	 *
	 * @param callable a callable to call.
	 * @param handler a handler that handles the return value of, or the exception thrown from, the callable.
	 */
	<T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler);

	/**
	 * Invokes the callable periodically and checks its return value. If the return value is
	 * true, then notifies the source coordinator that a new split assignment is available.
	 *
	 * <p>It is important to make sure that the callable and handler do not modify
	 * any shared state. Otherwise there might be unexpected behavior.
	 *
	 * @param callable the callable to call.
	 * @param handler a handler that handles the return value of, or the exception thrown from, the callable.
	 * @param initialDelay the initial delay of calling the callable.
	 * @param period the period between two invocations of the callable.
	 */
	<T> void callAsync(Callable<T> callable,
					   BiConsumer<T, Throwable> handler,
					   long initialDelay,
					   long period);
}
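A rough sketch of the threading behind the two callAsync(...) variants, using a hypothetical AsyncHelper: the callable runs on a worker thread so that blocking work (for example listing a directory) does not stall the coordinator, and the handler is then executed on a single coordinator thread. Running handlers on that one thread, treating initialDelay and period as milliseconds, and leaving the check of the callable's return value to the handler are all assumptions of this sketch, not the runtime's actual implementation.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

// Hypothetical helper: callables run on a worker thread, handlers on one coordinator thread.
final class AsyncHelper implements AutoCloseable {
    private final ScheduledExecutorService workers =
            Executors.newScheduledThreadPool(1, AsyncHelper::daemonThread);
    private final ExecutorService coordinator =
            Executors.newSingleThreadExecutor(AsyncHelper::daemonThread);

    <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler) {
        workers.execute(() -> runAndHandOff(callable, handler));
    }

    // Assumption: initialDelay and period are in milliseconds.
    <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler,
                       long initialDelay, long period) {
        workers.scheduleAtFixedRate(() -> runAndHandOff(callable, handler),
                initialDelay, period, TimeUnit.MILLISECONDS);
    }

    private <T> void runAndHandOff(Callable<T> callable, BiConsumer<T, Throwable> handler) {
        T result = null;
        Throwable failure = null;
        try {
            result = callable.call(); // potentially blocking work stays off the coordinator thread
        } catch (Exception e) {
            failure = e;
        }
        final T value = result;
        final Throwable error = failure;
        coordinator.execute(() -> handler.accept(value, error)); // handlers run one at a time, in order
    }

    @Override
    public void close() throws InterruptedException {
        workers.shutdownNow();
        workers.awaitTermination(1, TimeUnit.SECONDS);
        coordinator.shutdown();
        coordinator.awaitTermination(1, TimeUnit.SECONDS);
    }

    private static Thread daemonThread(Runnable task) {
        Thread thread = new Thread(task);
        thread.setDaemon(true); // do not keep the JVM alive if close() is never called
        return thread;
    }
}
```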

SplitsAssignment

Code Block
languagejava
titleSplitsAssignment
linenumberstrue
/**
 * A class containing the splits assignment to the source readers.
 *
 * <p>The assignment is always incremental. In other words, splits in the assignment are simply
 * added to the existing assignment.
 */
public class SplitsAssignment<SplitT extends SourceSplit> {
	private final Map<Integer, List<SplitT>> assignment;

	public SplitsAssignment(Map<Integer, List<SplitT>> assignment) {
		this.assignment = assignment;
	}

	/**
	 * @return A mapping from subtask ID to their split assignment.
	 */
	public Map<Integer, List<SplitT>> assignment() {
		return assignment;
	}

	@Override
	public String toString() {
		return assignment.toString();
	}
}
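Because each assignment is incremental and keyed by subtask ID, an enumerator typically builds a fresh map for every assignment round and wraps it in a SplitsAssignment. A sketch of a hypothetical round-robin helper that builds such a map; the helper and its policy are illustrative, not part of this FLIP.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper; the round-robin policy is an illustrative choice.
final class RoundRobin {
    /**
     * Spreads the splits over the given subtasks in order. Subtasks that receive no split are
     * left out of the map, which is fine because an assignment only ever adds splits.
     */
    static <SplitT> Map<Integer, List<SplitT>> assign(List<SplitT> splits, List<Integer> subtasks) {
        if (subtasks.isEmpty()) {
            throw new IllegalArgumentException("no subtasks to assign splits to");
        }
        Map<Integer, List<SplitT>> assignment = new TreeMap<>();
        for (int i = 0; i < splits.size(); i++) {
            int subtask = subtasks.get(i % subtasks.size());
            assignment.computeIfAbsent(subtask, k -> new ArrayList<>()).add(splits.get(i));
        }
        return assignment;
    }
}
```

With splits of a real SourceSplit type, the result could then be passed on as new SplitsAssignment<>(RoundRobin.assign(splits, subtasks)).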

SourceEvent

Code Block
languagejava
titleSourceEvent
linenumberstrue
/**
 * An event for the SourceReaders and Enumerators.
 */
public interface SourceEvent extends Serializable {
}
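Since SourceEvent only extends Serializable, a custom event is a small serializable value class that the enumerator and the readers agree on. The sketch below defines a hypothetical NoMoreSplitsEvent and round-trips it through Java serialization to show that it survives transport; whether the runtime actually uses Java serialization for source events is an assumption here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Mirrors the SourceEvent interface above.
interface SourceEvent extends Serializable {}

// Hypothetical event: the enumerator will not assign any further splits.
final class NoMoreSplitsEvent implements SourceEvent {
    private static final long serialVersionUID = 1L;
    private final int assignedSplitCount;

    NoMoreSplitsEvent(int assignedSplitCount) {
        this.assignedSplitCount = assignedSplitCount;
    }

    int assignedSplitCount() {
        return assignedSplitCount;
    }
}

// Java serialization as a stand-in for however the runtime ships events.
final class EventCodec {
    static byte[] serialize(SourceEvent event) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(event);
        }
        return bytes.toByteArray();
    }

    static SourceEvent deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (SourceEvent) in.readObject();
        }
    }
}
```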

StreamExecutionEnvironment

Code Block
languagejava
titleStreamExecutionEnvironment
linenumberstrue
public class StreamExecutionEnvironment {
...
    public <T> DataStream<T> continuousSource(Source<T, ?, ?> source) {...}

    public <T> DataStream<T> continuousSource(Source<T, ?, ?> source, TypeInformation<T> type) {...}

    public <T> DataStream<T> boundedSource(Source<T, ?, ?> source) {...}

    public <T> DataStream<T> boundedSource(Source<T, ?, ?> source, TypeInformation<T> type) {...}
...
}
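One plausible way for boundedSource(...) and continuousSource(...) to use Source#supportsBoundedness(...) is to fail fast when a source cannot run in the requested mode. A sketch with simplified stand-in types; SourceChecks.requireSupported and QueueSource are hypothetical.

```java
// Mirrors the Boundedness enum above.
enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

// Simplified stand-in for Source: only the boundedness check matters here.
interface BoundednessAware {
    boolean supportsBoundedness(Boundedness boundedness);
}

final class SourceChecks {
    // Rejects a source that cannot run in the requested mode before any stream is built.
    static void requireSupported(BoundednessAware source, Boundedness requested) {
        if (!source.supportsBoundedness(requested)) {
            throw new IllegalArgumentException(
                    "Source " + source + " does not support " + requested + " execution");
        }
    }
}

// Hypothetical source that can only run as a continuous unbounded stream (e.g. a message queue).
final class QueueSource implements BoundednessAware {
    @Override
    public boolean supportsBoundedness(Boundedness boundedness) {
        return boundedness == Boundedness.CONTINUOUS_UNBOUNDED;
    }

    @Override
    public String toString() {
        return "QueueSource";
    }
}
```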

SourceOutput and Watermarking

Code Block
languagejava
titleSourceOutput and Watermarking
linenumberstrue
public interface SourceOutput<E> extends WatermarkOutput {

	void emitRecord(E record);

	void emitRecord(E record, long timestamp);
}


/**
 * An output for watermarks. The output accepts watermarks and idleness (inactivity) status.
 */
public interface WatermarkOutput {

	/**
	 * Emits the given watermark.
	 *
	 * <p>Emitting a watermark also ends previously marked idleness.
	 */
	void emitWatermark(Watermark watermark);

	/**
	 * Marks this output as idle, meaning that downstream operations do not
	 * wait for watermarks from this output.
	 *
	 * <p>An output becomes active again as soon as the next watermark is emitted.
	 */
	void markIdle();
}
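A reader usually derives watermarks from the timestamps it emits. The sketch below writes a common bounded-out-of-orderness policy against simplified copies of the SourceOutput and WatermarkOutput interfaces (Watermark reduced to a timestamp holder). The emitter class and the policy are illustrative assumptions; this FLIP does not prescribe how watermarks are generated.

```java
// Simplified copies of the interfaces above; Watermark is reduced to a timestamp holder.
final class Watermark {
    final long timestamp;

    Watermark(long timestamp) {
        this.timestamp = timestamp;
    }
}

interface WatermarkOutput {
    void emitWatermark(Watermark watermark);

    void markIdle();
}

interface SourceOutput<E> extends WatermarkOutput {
    void emitRecord(E record);

    void emitRecord(E record, long timestamp);
}

// Hypothetical helper: forwards records and emits watermarks that trail the largest
// timestamp seen so far by a fixed bound.
final class BoundedOutOfOrdernessEmitter<E> {
    private final SourceOutput<E> output;
    private final long maxOutOfOrderness;
    private long maxTimestamp = Long.MIN_VALUE;
    private long lastWatermark = Long.MIN_VALUE;

    BoundedOutOfOrdernessEmitter(SourceOutput<E> output, long maxOutOfOrderness) {
        this.output = output;
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    void emit(E record, long timestamp) {
        output.emitRecord(record, timestamp);
        maxTimestamp = Math.max(maxTimestamp, timestamp);
        long candidate = maxTimestamp - maxOutOfOrderness;
        if (candidate > lastWatermark) { // watermarks must never move backwards
            lastWatermark = candidate;
            output.emitWatermark(new Watermark(candidate));
        }
    }

    // Call when the reader currently has nothing to read, so downstream does not wait on it.
    void onNoData() {
        output.markIdle();
    }
}
```

Emitting a watermark after every record keeps the sketch short; a production source would more likely emit them periodically.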

Proposed Changes


Implementation Plan

The implementation should proceed in the following steps, some of which can proceed concurrently.

...

A naive implementation prototype that implements this in user space atop the existing Flink operations is given here: https://github.com/aljoscha/flink/commits/refactor-source-interface. This also comes with a complete Kafka source implementation that already supports checkpointing.

Proposed Changes

As an MVP, we propose to add the new interfaces and a runtime implementation that uses the existing SourceFunction to run the enumerator, along with a special operator implementation to run the split reader. As a next step, we can add a dedicated StreamTask implementation for both the enumerator and the reader to take advantage of additional optimization potential, for example more efficient handling of the checkpoint lock.

...