...

Each source should be able to work as a bounded (batch) and as an unbounded (continuous streaming) source.

Boundedness is an intrinsic property of the source instance itself, and it is passed to the SplitEnumerator upon creation. In most cases, only the SplitEnumerators should know the Boundedness, while the SplitReaders are agnostic to this distinction: they simply read the splits assigned to them.

That way, we can also make the API type safe in the future, when we can explicitly model bounded streams.

Code Block (Java): DataStream API
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

FileSource<MyType> theSource = new ParquetFileSource("fs:///path/to/dir", AvroParquet.forSpecific(MyType.class));


// The returned stream will be a DataStream if theSource is unbounded.
// Once BoundedDataStream (which extends DataStream) is added, the returned stream will be a BoundedDataStream for a bounded source.
// This allows users to write programs that work in both batch and streaming execution mode.
DataStream<MyType> stream = env.source(theSource);

// Once we add bounded streams to the DataStream API, we will also add the following API.
// The parameter has to be bounded; otherwise an exception will be thrown.
BoundedDataStream<MyType> batch = env.boundedSource(theSource);
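
The contract described in the comments above can be sketched with a few stand-in types. This is only an illustration under stated assumptions: `SketchSource`, `SketchStream`, `SketchBoundedStream`, and `SketchEnvironment` are hypothetical names invented for this example and are not part of the proposed API; only the `Boundedness` values and the `source` / `boundedSource` method names are taken from the text.

```java
/** Stand-in for the Boundedness enum described in this proposal. */
enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

/** Hypothetical, stripped-down source: it only reports its own boundedness. */
interface SketchSource<T> {
    Boundedness getBoundedness();
}

/** Hypothetical stand-in for DataStream. */
class SketchStream<T> {
    final SketchSource<T> source;

    SketchStream(SketchSource<T> source) {
        this.source = source;
    }
}

/** Hypothetical stand-in for BoundedDataStream, which extends the general stream type. */
class SketchBoundedStream<T> extends SketchStream<T> {
    SketchBoundedStream(SketchSource<T> source) {
        super(source);
    }
}

/** Hypothetical environment exposing the two entry points discussed above. */
class SketchEnvironment {

    /** Accepts any source; returns the bounded subtype when the source is bounded. */
    <T> SketchStream<T> source(SketchSource<T> source) {
        if (source.getBoundedness() == Boundedness.BOUNDED) {
            return new SketchBoundedStream<>(source);
        }
        return new SketchStream<>(source);
    }

    /** Accepts bounded sources only and rejects anything else up front. */
    <T> SketchBoundedStream<T> boundedSource(SketchSource<T> source) {
        if (source.getBoundedness() != Boundedness.BOUNDED) {
            throw new IllegalArgumentException("boundedSource() requires a bounded source");
        }
        return new SketchBoundedStream<>(source);
    }
}
```

In this sketch, a program written against `SketchStream` works with either kind of source, while code that needs batch-only guarantees can ask for `SketchBoundedStream` and fail fast when given an unbounded source.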

...

Code Block (Java): Source
/**
 * The interface for Source. It acts like a factory class that helps construct
 * the {@link SplitEnumerator} and {@link SourceReader} and corresponding
 * serializers.
 *
 * @param <T>        The type of records produced by the source.
 * @param <SplitT>   The type of splits handled by the source.
 * @param <EnumChkT> The type of the enumerator checkpoints.
 */
public interface Source<T, SplitT extends SourceSplit, EnumChkT> extends Serializable {

	/**
	 * Get the boundedness of this source.
	 *
	 * @return the boundedness of this source.
	 */
	Boundedness getBoundedness();

	/**
	 * Creates a new reader to read data from the splits it gets assigned.
	 * The reader starts fresh and does not have any state to resume.
	 *
	 * @param readerContext The {@link SourceReaderContext context} for the source reader.
	 * @return A new SourceReader.
	 */
	SourceReader<T, SplitT> createReader(SourceReaderContext readerContext);

	/**
	 * Creates a new SplitEnumerator for this source, starting a new input.
	 *
	 * @param boundedness The boundedness of this source.
	 * @param enumContext The {@link SplitEnumeratorContext context} for the split enumerator.
	 * @return A new SplitEnumerator.
	 */
	SplitEnumerator<SplitT, EnumChkT> createEnumerator(Boundedness boundedness, SplitEnumeratorContext<SplitT> enumContext);

	/**
	 * Restores an enumerator from a checkpoint.
	 *
	 * @param enumContext The {@link SplitEnumeratorContext context} for the restored split enumerator.
	 * @param checkpoint The checkpoint to restore the SplitEnumerator from.
	 * @return A SplitEnumerator restored from the given checkpoint.
	 */
	SplitEnumerator<SplitT, EnumChkT> restoreEnumerator(
			SplitEnumeratorContext<SplitT> enumContext,
			EnumChkT checkpoint) throws IOException;

	// ------------------------------------------------------------------------
	//  serializers for the metadata
	// ------------------------------------------------------------------------

	/**
	 * Creates a serializer for the source splits. Splits are serialized when sending them
	 * from enumerator to reader, and when checkpointing the reader's current state.
	 *
	 * @return The serializer for the split type.
	 */
	SimpleVersionedSerializer<SplitT> getSplitSerializer();

	/**
	 * Creates the serializer for the {@link SplitEnumerator} checkpoint.
	 * The serializer is used for the result of the {@link SplitEnumerator#snapshotState()}
	 * method.
	 *
	 * @return The serializer for the SplitEnumerator checkpoint.
	 */
	SimpleVersionedSerializer<EnumChkT> getEnumeratorCheckpointSerializer();
}


/**
 * The boundedness of the source: "bounded" for the currently available data (batch style),
 * "continuous unbounded" for a continuous streaming style source.
 */
public enum Boundedness {

	/**
	 * A bounded source processes the data that is currently available and will end after that.
	 *
	 * <p>When a source produces a bounded stream, the runtime may activate additional optimizations
	 * that are suitable only for bounded input. Incorrectly producing unbounded data when the source
	 * is set to produce a bounded stream will often result in programs that do not output any results
	 * and may eventually fail due to runtime errors (out of memory or storage).
	 */
	BOUNDED,

	/**
	 * A continuous unbounded source continuously processes all data as it comes.
	 *
	 * <p>The source may run forever (until the program is terminated) or might actually end at some point,
	 * based on some source-specific conditions. Because that is not transparent to the runtime,
	 * the runtime will use an execution mode for continuous unbounded streams whenever this mode
	 * is chosen.
	 */
	CONTINUOUS_UNBOUNDED
}
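
To illustrate the division of responsibility described earlier (the enumerator knows the boundedness, the reader does not), here is a minimal self-contained sketch. It does not use the interfaces above: `SketchEnumerator`, `SketchReader`, `BoundednessSketch`, and all of their methods are hypothetical names made up for this example, and splits are modeled as plain strings. The assumption that a bounded enumerator ignores splits discovered after creation is one possible reading of "processes the data that is currently available", not something the proposal prescribes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;

/** Stand-in for the Boundedness enum above. */
enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

/** Hypothetical enumerator: the only component that looks at the boundedness. */
class SketchEnumerator {
    private final Boundedness boundedness;
    private final Queue<String> pending = new ArrayDeque<>();

    SketchEnumerator(Boundedness boundedness, List<String> initialSplits) {
        this.boundedness = boundedness;
        this.pending.addAll(initialSplits);
    }

    /** Models later split discovery; in this sketch a bounded enumerator ignores late arrivals. */
    void discover(List<String> newSplits) {
        if (boundedness == Boundedness.CONTINUOUS_UNBOUNDED) {
            pending.addAll(newSplits);
        }
    }

    /** Hands out the next pending split, if there is one. */
    Optional<String> nextSplit() {
        return Optional.ofNullable(pending.poll());
    }

    /** True once a bounded enumerator has handed out everything; never true when unbounded. */
    boolean noMoreSplits() {
        return boundedness == Boundedness.BOUNDED && pending.isEmpty();
    }
}

/** Hypothetical reader: it never sees a Boundedness and just reads what it is assigned. */
class SketchReader {
    private final List<String> consumed = new ArrayList<>();

    void read(String split) {
        consumed.add(split);
    }

    List<String> consumed() {
        return consumed;
    }
}

class BoundednessSketch {
    /** Assigns all pending splits to the reader and reports whether the input is finished. */
    static boolean drain(SketchEnumerator enumerator, SketchReader reader) {
        Optional<String> split;
        while ((split = enumerator.nextSplit()).isPresent()) {
            reader.read(split.get());
        }
        return enumerator.noMoreSplits();
    }
}
```

The reader code is identical in both modes; only the enumerator decides whether new splits are still accepted and whether the end of input can ever be signaled.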

...