...

Code Block (Java): SourceReader reading methods
interface SplitReader<RecordsT> extends Closeable {

	@Nullable
	RecordsT fetchNextRecords(Duration timeout) throws IOException;

	void wakeup();
}
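
To make the timeout and wakeup contract more concrete, here is a minimal sketch of one possible implementation. It assumes that a null return value means "no records available right now" and that wakeup() should unblock a pending fetch; neither is spelled out above. The class QueueSplitReader, its enqueue method, and the sentinel mechanism are hypothetical, and the interface is copied locally (without @Nullable) only so that the example compiles on its own.

```java
import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Local copy of the interface above, so the sketch is self-contained.
interface SplitReader<RecordsT> extends Closeable {
	RecordsT fetchNextRecords(Duration timeout) throws IOException;
	void wakeup();
}

// Hypothetical reader that serves pre-fetched batches from an in-memory queue.
final class QueueSplitReader implements SplitReader<List<String>> {

	// Sentinel batch, compared by identity; putting it in the queue unblocks a waiting fetch.
	private static final List<String> WAKEUP = new ArrayList<>();

	private final BlockingQueue<List<String>> batches = new LinkedBlockingQueue<>();

	// Called by a (hypothetical) fetcher thread whenever a batch is ready.
	void enqueue(List<String> batch) {
		batches.add(batch);
	}

	@Override
	public List<String> fetchNextRecords(Duration timeout) throws IOException {
		try {
			List<String> next = batches.poll(timeout.toMillis(), TimeUnit.MILLISECONDS);
			// Assumption: null means "nothing available", either because the
			// timeout elapsed or because wakeup() was called.
			return next == WAKEUP ? null : next;
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			throw new IOException("Interrupted while waiting for records", e);
		}
	}

	@Override
	public void wakeup() {
		// Batches queued before the wakeup are still returned first.
		batches.add(WAKEUP);
	}

	@Override
	public void close() {
		batches.clear();
	}
}
```

In this sketch, a wakeup that arrives while no fetch is pending makes the next fetch return null immediately. Whether that is the desired behavior is a design decision the interface leaves open.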

...

  1. Pipelined persistent channels: The contents of a channel are persisted between checkpoints. A receiving task requests the data "after checkpoint X". The data is pruned when checkpoint X+1 is completed.
    When a reader fails, the recovered reader task can reconnect to the stream after the last checkpoint and will receive the previously assigned splits again. Batch is a special case: if there are no checkpoints, the channel holds all data since the beginning.
    • Pro: The "pipelined persistent channel" also has applications beyond the enumerator-to-reader connection.
    • Con: Splits always go to the same reader and cannot be distributed across multiple readers upon recovery. Especially for batch programs, this may create bad stragglers during recovery.

  2. Reconnects and task notifications on failures: The enumerator task needs to remember the splits assigned to each result partition until the next checkpoint completes. The enumerator task would have to be notified of the failure of a downstream task and add the splits back to the enumerator. Recovered reader tasks would simply reconnect and get a new stream.
    • Pro: Re-distribution of splits across all readers upon failure/recovery (no stragglers).
    • Con: Breaks abstraction that separates task and network stack.
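
The bookkeeping that the second variant requires can be sketched as follows. This is an illustration under assumptions, not a proposed design: the class PendingAssignments and all of its method names are hypothetical, readers are identified by a plain integer index, and the sketch separates "checkpoint triggered" from "checkpoint completed" so that splits assigned after a barrier are not forgotten when that checkpoint completes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: remembers which splits went to which reader
// until a checkpoint covering those assignments has completed.
final class PendingAssignments<SplitT> {

	// Assignments made since the most recent checkpoint barrier.
	private Map<Integer, List<SplitT>> current = new HashMap<>();

	// Assignments covered by a triggered but not yet completed checkpoint, keyed by checkpoint id.
	private final TreeMap<Long, Map<Integer, List<SplitT>>> awaitingCompletion = new TreeMap<>();

	void recordAssignment(int readerIndex, SplitT split) {
		current.computeIfAbsent(readerIndex, k -> new ArrayList<>()).add(split);
	}

	// The barrier for this checkpoint has been emitted: freeze the current bucket.
	void onCheckpointTriggered(long checkpointId) {
		awaitingCompletion.put(checkpointId, current);
		current = new HashMap<>();
	}

	// Readers now hold these splits in their own checkpointed state, so they can be dropped.
	void onCheckpointComplete(long checkpointId) {
		awaitingCompletion.headMap(checkpointId, true).clear();
	}

	// Returns the splits that must be added back to the enumerator for a failed reader.
	List<SplitT> onReaderFailure(int readerIndex) {
		List<SplitT> lost = new ArrayList<>();
		for (Map<Integer, List<SplitT>> bucket : awaitingCompletion.values()) {
			List<SplitT> splits = bucket.remove(readerIndex);
			if (splits != null) {
				lost.addAll(splits);
			}
		}
		List<SplitT> recent = current.remove(readerIndex);
		if (recent != null) {
			lost.addAll(recent);
		}
		return lost;
	}
}
```

The list returned by onReaderFailure is what would be handed back to the enumerator, which can then redistribute those splits across all readers.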


Option 2: Enumerator on the JobManager

Similar to the current batch (DataSet) input split assigner, the SplitEnumerator code runs in the JobManager, as part of an ExecutionJobVertex. To support periodic split discovery, the enumerator has to be periodically called from an additional thread.

The readers request splits via an RPC message and the enumerator responds via RPC. RPC messages carry payload for information like location.

Extra care needs to be taken to align the split assignment messages with checkpoint barriers. If we start to support metadata-based watermarks (to handle event time consistently when dealing with collections of bounded splits), we need to support that as well through RPC and align it with the input split assignment.

The enumerator creates its own piece of checkpoint state when a checkpoint is triggered.

Critical parts here are the added complexity on the master (ExecutionGraph) and the checkpoints. Aligning them properly with RPC messages is possible when going through the now single-threaded execution graph dispatcher thread, but supporting asynchronous checkpoint writing requires more complexity. (TBD: explain more)
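
The alignment argument can be illustrated with a small sketch: if split requests and checkpoint triggers are both executed on one thread, a snapshot can never interleave with an assignment. The class SingleThreadedSplitCoordinator and its methods are hypothetical stand-ins for the RPC endpoint and the checkpoint trigger; a plain single-threaded executor stands in for the dispatcher thread, and splits are simplified to strings.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical coordinator: every action runs on one thread, in submission order.
final class SingleThreadedSplitCoordinator implements AutoCloseable {

	// Stand-in for the single-threaded dispatcher thread.
	private final ExecutorService mainThread = Executors.newSingleThreadExecutor();

	// Only accessed from mainThread after construction.
	private final Queue<String> unassigned;

	SingleThreadedSplitCoordinator(List<String> splits) {
		this.unassigned = new ArrayDeque<>(splits);
	}

	// Stand-in for a reader's RPC request; an empty Optional means "no split right now".
	CompletableFuture<Optional<String>> requestSplit() {
		return CompletableFuture.supplyAsync(() -> Optional.ofNullable(unassigned.poll()), mainThread);
	}

	// Stand-in for a checkpoint trigger; the snapshot is a copy of the unassigned splits.
	CompletableFuture<List<String>> triggerCheckpoint() {
		return CompletableFuture.supplyAsync(() -> new ArrayList<>(unassigned), mainThread);
	}

	@Override
	public void close() {
		mainThread.shutdown();
	}
}
```

The sketch takes the snapshot synchronously on the main thread. Writing that snapshot to storage asynchronously is the part that would need the additional mechanism mentioned above.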


Open Questions

In both cases, the enumerator is a point of failure that requires a restart of the entire dataflow.
To circumvent that, we probably need an additional mechanism, like a write-ahead log for split assignment.

...

| Criterion | Enumerate on Task | Enumerate on JobManager |
|---|---|---|
| Encapsulation of Enumerator | Encapsulation in separate Task | Additional complexity in ExecutionGraph |
| Network Stack Changes | Significant changes. Some are more clear, like reconnecting. Some seem to break abstractions, like notifying tasks of downstream failures. | No changes necessary |
| Scheduler / Failover Region | Minor changes | No changes necessary |
| Checkpoint alignment | No changes necessary (splits are data messages, naturally align with barriers) | Careful coordination between split assignment and checkpoint triggering. Might be simple if both actions are run in the single-threaded ExecutionGraph thread. |
| Watermarks | No changes necessary (splits are data messages, watermarks naturally flow) | Watermarks would go through ExecutionGraph and RPC. |
| Checkpoint State | No additional mechanism (only regular task state) | Need to add support for asynchronous non-metadata state on the JobManager / ExecutionGraph |
| Supporting graceful Enumerator recovery (avoid full restarts) | Network reconnects (like above), plus write-ahead of split assignment between checkpoints. | Tracking split assignment between checkpoints, plus write-ahead of split assignment between checkpoints. |


Personal opinion from Stephan: If we find an elegant way to abstract the network stack changes, I would lean towards running the Enumerator in a Task, not on the JobManager.

Core Public Interfaces


Source

Code Block (Java): Source
public interface Source<T, SplitT extends SourceSplit, EnumChkT> extends Serializable {

	/**
	 * Checks whether the source supports the given boundedness.
	 *
	 * <p>Some sources might only support either continuous unbounded streams, or
	 * bounded streams.
	 */
	boolean supportsBoundedness(Boundedness boundedness);

	/**
	 * Creates a new reader to read data from the splits it gets assigned.
	 * The reader starts fresh and does not have any state to resume.
	 */
	SourceReader<T, SplitT> createReader(SourceContext ctx) throws IOException;

	/**
	 * Creates a new SplitEnumerator for this source, starting a new input.
	 */
	SplitEnumerator<SplitT, EnumChkT> createEnumerator(Boundedness mode) throws IOException;

	/**
	 * Restores an enumerator from a checkpoint.
	 */
	SplitEnumerator<SplitT, EnumChkT> restoreEnumerator(Boundedness mode, EnumChkT checkpoint) throws IOException;

	// ------------------------------------------------------------------------
	//  serializers for the metadata
	// ------------------------------------------------------------------------

	/**
	 * Creates a serializer for the input splits. Splits are serialized when sending them
	 * from enumerator to reader, and when checkpointing the reader's current state.
	 */
	SimpleVersionedSerializer<SplitT> getSplitSerializer();

	/**
	 * Creates the serializer for the {@link SplitEnumerator} checkpoint.
	 * The serializer is used for the result of the {@link SplitEnumerator#snapshotState()}
	 * method.
	 */
	SimpleVersionedSerializer<EnumChkT> getEnumeratorCheckpointSerializer();
}


/**
 * The boundedness of the source: "bounded" for the currently available data (batch style),
 * "continuous unbounded" for a continuous streaming style source.
 */
public enum Boundedness {

	/**
	 * A bounded source processes the data that is currently available and will end after that.
	 *
	 * <p>When a source produces a bounded stream, the runtime may activate additional optimizations
	 * that are suitable only for bounded input. Incorrectly producing unbounded data when the source
	 * is set to produce a bounded stream will often result in programs that do not output any results
	 * and may eventually fail due to runtime errors (out of memory or storage).
	 */
	BOUNDED,

	/**
	 * A continuous unbounded source continuously processes all data as it comes.
	 *
	 * <p>The source may run forever (until the program is terminated) or might actually end at some point,
	 * based on some source-specific conditions. Because that is not transparent to the runtime,
	 * the runtime will use an execution mode for continuous unbounded streams whenever this mode
	 * is chosen.
	 */
	CONTINUOUS_UNBOUNDED
}
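
As an illustration of what getSplitSerializer() might return, the following sketch shows a versioned serializer for a simple file split. The interface VersionedSerializer is a local stand-in modeled on Flink's SimpleVersionedSerializer (getVersion, serialize, deserialize with a version argument) so that the example compiles without Flink on the classpath. The FileSplit class, its fields, and the older "version 1" format are purely hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Local stand-in modeled on SimpleVersionedSerializer.
interface VersionedSerializer<E> {
	int getVersion();
	byte[] serialize(E obj) throws IOException;
	E deserialize(int version, byte[] serialized) throws IOException;
}

// Hypothetical split type: a byte range of a file.
final class FileSplit {
	final String path;
	final long offset;
	final long length; // -1 means "until the end of the file"

	FileSplit(String path, long offset, long length) {
		this.path = path;
		this.offset = offset;
		this.length = length;
	}
}

final class FileSplitSerializer implements VersionedSerializer<FileSplit> {

	private static final int CURRENT_VERSION = 2;

	@Override
	public int getVersion() {
		return CURRENT_VERSION;
	}

	@Override
	public byte[] serialize(FileSplit split) throws IOException {
		ByteArrayOutputStream bytes = new ByteArrayOutputStream();
		try (DataOutputStream out = new DataOutputStream(bytes)) {
			out.writeUTF(split.path);
			out.writeLong(split.offset);
			out.writeLong(split.length);
		}
		return bytes.toByteArray();
	}

	@Override
	public FileSplit deserialize(int version, byte[] serialized) throws IOException {
		if (version < 1 || version > CURRENT_VERSION) {
			throw new IOException("Unknown split format version: " + version);
		}
		try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
			String path = in.readUTF();
			long offset = in.readLong();
			// Hypothetical older format (version 1) carried no length field.
			long length = version >= 2 ? in.readLong() : -1L;
			return new FileSplit(path, offset, length);
		}
	}
}
```

Passing the version to deserialize is what allows splits written by an older format to be read after an upgrade, which matters because splits end up in checkpoints.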

...

Code Block (Java): SplitEnumerator
public interface SplitEnumerator<SplitT, CheckpointT> extends Closeable {

	/**
	 * Returns true when the input is bounded and no more splits are available.
	 * True means that the definite end of input has been reached, and is only possible
	 * in bounded sources.
	 */
	boolean isEndOfInput();

	/**
	 * Returns the next split, if it is available. If nothing is currently available, this returns
	 * an empty Optional.
	 * More may be available later, if the {@link #isEndOfInput()} is false.
	 */
	Optional<SplitT> nextSplit(ReaderLocation reader);

	/**
	 * Adds splits back to the enumerator. This happens when a reader failed and restarted,
	 * and the splits assigned to that reader since the last checkpoint need to be made
	 * available again.
	 */
	void addSplitsBack(List<SplitT> splits);

	/**
	 * Checkpoints the state of this split enumerator.
	 */
	CheckpointT snapshotState();

	/**
	 * Called to close the enumerator, in case it holds on to any resources, like threads or
	 * network connections.
	 */
	@Override
	void close() throws IOException;
}


public interface PeriodicSplitEnumerator<SplitT, CheckpointT> extends SplitEnumerator<SplitT, CheckpointT> {

	/**
	 * Called periodically to discover further splits.
	 *
	 * @return Returns true if further splits were discovered, false if not.
	 */
	boolean discoverMoreSplits() throws IOException;

	/**
	 * Continuous enumeration is only applicable to unbounded sources.
	 */
	@Override
	default boolean isEndOfInput() {
		return false;
	}
}
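
A minimal bounded enumerator could look roughly like the sketch below. It is only an illustration: the interface is a trimmed local copy (without Closeable) so that the example compiles on its own, ReaderLocation is a placeholder because its real definition is not shown here, and StaticListEnumerator with String splits is hypothetical. The reader location is ignored; a locality-aware enumerator would use it to pick a suitable split.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Optional;

// Placeholder; the real ReaderLocation type is not shown in this section.
final class ReaderLocation {
	final String host;

	ReaderLocation(String host) {
		this.host = host;
	}
}

// Trimmed local copy of the interface above, so the sketch is self-contained.
interface SplitEnumerator<SplitT, CheckpointT> {
	boolean isEndOfInput();
	Optional<SplitT> nextSplit(ReaderLocation reader);
	void addSplitsBack(List<SplitT> splits);
	CheckpointT snapshotState();
}

// Hypothetical bounded enumerator over a fixed list of splits.
final class StaticListEnumerator implements SplitEnumerator<String, List<String>> {

	private final Deque<String> remaining;

	// Used both for a fresh start and for restoring from a checkpoint.
	StaticListEnumerator(List<String> splits) {
		this.remaining = new ArrayDeque<>(splits);
	}

	@Override
	public boolean isEndOfInput() {
		// Can turn false again if splits are added back after a reader failure.
		return remaining.isEmpty();
	}

	@Override
	public Optional<String> nextSplit(ReaderLocation reader) {
		return Optional.ofNullable(remaining.poll());
	}

	@Override
	public void addSplitsBack(List<String> splits) {
		remaining.addAll(splits);
	}

	@Override
	public List<String> snapshotState() {
		// Defensive copy, so later assignments do not change the checkpoint.
		return new ArrayList<>(remaining);
	}
}
```

Restoring from a checkpoint then amounts to constructing a new StaticListEnumerator from the list that snapshotState() returned.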

...

Code Block (Java): StreamExecutionEnvironment
public class StreamExecutionEnvironment {
...
    public <T> DataStream<T> continuousSource(Source<T, ?, ?> source) {...}

    public <T> DataStream<T> continuousSource(Source<T, ?, ?> source, TypeInformation<T> type) {...}

    public <T> DataStream<T> boundedSource(Source<T, ?, ?> source) {...}

    public <T> DataStream<T> boundedSource(Source<T, ?, ?> source, TypeInformation<T> type) {...}
...
}

...