...

  • Splits are both the type of work assignment and the type of state held by the source. Assigning a split or restoring a split from a checkpoint is the same to the reader.
  • Advancing the reader is a non-blocking call that returns a future.
  • We build higher-level primitives on top of the main interface (see "Base implementation and high-level readers" below).
  • We hide event time and watermarks in the SourceOutput and pass different source contexts for batch (no watermarks) and streaming (with watermarks).
    The SourceOutput also abstracts the per-partition watermark tracking.
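To make the last point concrete, here is a minimal sketch of the kind of per-split watermark tracking that the SourceOutput could hide from connector code. It assumes, as is common for partitioned sources, that the combined watermark is the minimum over the highest event timestamps seen per split. Out-of-orderness bounds and idle-split handling are left out. `PerSplitWatermarkTracker` and its methods are hypothetical names, not part of the proposed interfaces.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper: tracks the highest event timestamp seen per split and
 * exposes the minimum across splits as the combined watermark (an assumption
 * for this sketch, not the proposed SourceOutput API).
 */
class PerSplitWatermarkTracker {

    private final Map<String, Long> maxTimestampPerSplit = new HashMap<>();

    /** Registers a split before any of its records have been seen. */
    void addSplit(String splitId) {
        maxTimestampPerSplit.putIfAbsent(splitId, Long.MIN_VALUE);
    }

    /** Records the event timestamp of a record emitted from the given split. */
    void onRecord(String splitId, long eventTimestamp) {
        maxTimestampPerSplit.merge(splitId, eventTimestamp, Long::max);
    }

    /** Removes a finished split so that it no longer holds back the watermark. */
    void removeSplit(String splitId) {
        maxTimestampPerSplit.remove(splitId);
    }

    /**
     * The minimum over the per-split maxima, so the watermark never advances past
     * the progress of the slowest split. Long.MIN_VALUE if no split is tracked.
     */
    long currentWatermark() {
        if (maxTimestampPerSplit.isEmpty()) {
            return Long.MIN_VALUE;
        }
        long min = Long.MAX_VALUE;
        for (long ts : maxTimestampPerSplit.values()) {
            min = Math.min(min, ts);
        }
        return min;
    }
}
```

A split that has not produced any record yet keeps the watermark at `Long.MIN_VALUE`, which is the conservative choice; real implementations usually add an idleness mechanism for such splits.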

The following reader interface is copied from the SourceReader in the public interface section.

Code Block: SourceReader
public interface SourceReader<T, SplitT extends SourceSplit> extends Serializable, AutoCloseable {

	void start();

	Status pollNext(SourceOutput<T> sourceOutput) throws Exception;

	List<SplitT> snapshotState();

	CompletableFuture<Void> isAvailable();

	void addSplits(List<SplitT> splits);

	default void handleSourceEvents(SourceEvent sourceEvent) {
		// Do nothing.
	}

	enum Status {
		/** The next record is available right now. */
		AVAILABLE_NOW,
		/** The next record will be available later. */
		AVAILABLE_LATER,
		/** The source reader has completed all the reading work. */
		FINISHED
	}
}
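As an illustration of how the methods of this interface fit together, the following sketch implements a toy reader over in-memory splits. It shows that the splits themselves, with their current offsets, form the state returned by `snapshotState()`, so restoring from a checkpoint is just another `addSplits()` call. The stand-in types (`SourceSplit`, `SourceOutput`, `SourceEvent`), `ListSplit`, `ListSourceReader` and `signalNoMoreSplits()` are simplified, hypothetical names for this example only, and the sketch assumes all calls happen on the single task thread.

```java
import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Simplified stand-ins for the types used by the interface (assumed shapes).
interface SourceSplit { String splitId(); }
interface SourceEvent extends Serializable {}
interface SourceOutput<T> { void collect(T record); }

interface SourceReader<T, SplitT extends SourceSplit> extends Serializable, AutoCloseable {
    void start();
    Status pollNext(SourceOutput<T> sourceOutput) throws Exception;
    List<SplitT> snapshotState();
    CompletableFuture<Void> isAvailable();
    void addSplits(List<SplitT> splits);
    default void handleSourceEvents(SourceEvent sourceEvent) {}
    enum Status { AVAILABLE_NOW, AVAILABLE_LATER, FINISHED }
}

/** Hypothetical split: an in-memory list of records plus the offset of the next record. */
final class ListSplit implements SourceSplit {
    final String id;
    final List<String> records;
    final int offset;
    ListSplit(String id, List<String> records, int offset) {
        this.id = id; this.records = records; this.offset = offset;
    }
    public String splitId() { return id; }
}

/** Toy reader: reads its splits one after another, one record per pollNext() call. */
class ListSourceReader implements SourceReader<String, ListSplit> {

    private final Deque<ListSplit> pending = new ArrayDeque<>();
    private ListSplit current;     // split being read, or null
    private int offset;            // index of the next record in current
    private boolean noMoreSplits;  // hypothetical "no further splits will arrive" signal
    private CompletableFuture<Void> available = new CompletableFuture<>();

    public void start() {}

    public Status pollNext(SourceOutput<String> output) {
        // Advance to the next split once the current one is exhausted.
        while (current == null || offset >= current.records.size()) {
            current = pending.poll();
            if (current == null) {
                return outOfData();
            }
            offset = current.offset;
        }
        // Emit a single record, as recommended by the pollNext() contract.
        output.collect(current.records.get(offset++));
        boolean more = offset < current.records.size() || !pending.isEmpty();
        return more ? Status.AVAILABLE_NOW : outOfData();
    }

    private Status outOfData() {
        if (noMoreSplits) {
            return Status.FINISHED;
        }
        if (available.isDone()) {
            available = new CompletableFuture<>(); // re-arm for the next wait
        }
        return Status.AVAILABLE_LATER;
    }

    /** The splits, with their current offsets, are the reader state. */
    public List<ListSplit> snapshotState() {
        List<ListSplit> state = new ArrayList<>();
        if (current != null && offset < current.records.size()) {
            state.add(new ListSplit(current.id, current.records, offset));
        }
        state.addAll(pending);
        return state;
    }

    public CompletableFuture<Void> isAvailable() { return available; }

    public void addSplits(List<ListSplit> splits) {
        pending.addAll(splits);
        available.complete(null); // wakes up a caller waiting on isAvailable()
    }

    /** Hypothetical: called once no further splits will be assigned. */
    void signalNoMoreSplits() {
        noMoreSplits = true;
        available.complete(null);
    }

    public void close() {}
}
```

Returning `AVAILABLE_LATER` together with a not-yet-completed `isAvailable()` future is what allows the single task thread to park the reader instead of spinning on it.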

The implementation assumes that there is a single thread that drives the source. This single thread calls pollNext(...) when data is available and cedes execution when nothing is available. It also handles checkpoint triggering, timer callbacks, etc., which makes the task lock-free.

The thread is expected to work on some form of mailbox, in which the data-emitting loop is one possible task (next to timers, checkpoints, ...). Below is very simple pseudo code for the driver loop.

This type of hot loop makes sure that we use the future only when the reader is temporarily out of data, and that we opportunistically bypass the mailbox for performance (the mailbox needs some amount of synchronization).

Code Block: reader loop pseudo code
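import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class SourceTaskDriver<T, SplitT extends SourceSplit> {

    private final BlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();

    private final SourceReader<T, SplitT> reader = ...;
    private final SourceOutput<T> output = ...;

    private volatile boolean taskAlive = true;

    // Emitting records is just one kind of mail, next to checkpoints, timers, etc.
    private final Runnable readerLoop = this::emitRecords;

    private void emitRecords() {
        while (true) {
            final SourceReader.Status status;
            try {
                status = reader.pollNext(output);
            } catch (Exception e) {
                throw new RuntimeException(e); // fails the task
            }

            if (status == SourceReader.Status.AVAILABLE_NOW) {
                if (mailbox.isEmpty()) {
                    // Hot path: keep emitting without going through the mailbox.
                    continue;
                }
                // Other mails (e.g. a checkpoint) are pending: yield to them, resume later.
                addReaderLoopToMailbox();
                break;
            }
            else if (status == SourceReader.Status.AVAILABLE_LATER) {
                // Temporarily out of data: re-enqueue the loop once the reader signals
                // availability. The future may complete on another thread; the queue is thread-safe.
                reader.isAvailable().thenRun(this::addReaderLoopToMailbox);
                break;
            }
            else if (status == SourceReader.Status.FINISHED) {
                // The reader has completed all its reading work.
                break;
            }
        }
    }

    void addReaderLoopToMailbox() {
        mailbox.add(readerLoop);
    }

    void triggerCheckpoint(CheckpointOptions options) {
        mailbox.add(new CheckpointAction(options));
    }

    void taskMainLoop() throws InterruptedException {
        reader.start();
        addReaderLoopToMailbox(); // schedule the first emission round
        while (taskAlive) {
            mailbox.take().run();
        }
    }
}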


The SourceReader will run as a PushingAsyncDataInput, which works well with the new mailbox threading model in the tasks, similar to the network inputs.

Base implementation and high-level readers

The core source interface (the lowest-level interface) is very generic. That makes it flexible, but hard to implement for contributors, especially for sufficiently complex reader patterns such as those of Kafka or Kinesis.
In general, most I/O libraries used for connectors are not asynchronous, and would need to spawn an I/O thread to make them non-blocking for the main thread.

We propose to solve this by building higher-level source abstractions that offer simpler interfaces that allow for blocking calls.
These higher-level abstractions would also solve the issue of sources that handle multiple splits concurrently, and the per-split event-time logic.
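The core of such an abstraction can be sketched as a hand-off queue: a dedicated I/O thread makes the blocking calls and puts the results into a bounded queue, while the task thread only polls without blocking and waits on a future when the queue is empty. This is a minimal sketch under those assumptions; `BlockingFetchHandoff`, `poll()` and `available()` are hypothetical names, and error propagation from the I/O thread is omitted.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/**
 * Hypothetical hand-off between a blocking I/O thread and the non-blocking task thread.
 * The blocking fetch function must not return null.
 */
class BlockingFetchHandoff<E> implements AutoCloseable {

    private final BlockingQueue<E> queue;
    private final Thread ioThread;
    private volatile boolean running = true;
    // Completed by the I/O thread after enqueuing; re-armed by the task thread when it runs dry.
    private volatile CompletableFuture<Void> available = new CompletableFuture<>();

    BlockingFetchHandoff(Supplier<E> blockingFetch, int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.ioThread = new Thread(() -> {
            try {
                while (running) {
                    E element = blockingFetch.get(); // may block on I/O
                    queue.put(element);              // blocks while the queue is full (backpressure)
                    available.complete(null);        // wakes up the task thread
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();  // interrupted by close()
            }
        }, "io-fetcher");
        ioThread.setDaemon(true);
        ioThread.start();
    }

    /** Non-blocking: returns the next element, or null if none is buffered right now. */
    E poll() {
        E element = queue.poll();
        if (element == null) {
            // Re-arm before re-checking, so an element enqueued in between is not missed.
            available = new CompletableFuture<>();
            element = queue.poll();
        }
        return element;
    }

    /** Completes once an element has been enqueued after the last empty poll(). */
    CompletableFuture<Void> available() {
        return available;
    }

    @Override
    public void close() {
        running = false;
        ioThread.interrupt();
        try {
            ioThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Re-arming the future before re-checking the queue avoids a lost wake-up: an element enqueued between the two `poll()` calls is either picked up by the second call or completes the new future.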

Most readers fall into one of the following categories:

  1. One reader, single split (some dead-simple blocking readers)
  2. One reader multiple splits.
    1. Sequential Single Split (File, database query, most bounded splits)
    2. Multi-split multiplexed (Kafka, Pulsar, Pravega, ...)
    3. Multi-split multi-threaded (Kinesis, ...)


[Figure: Sequential Single Split]

[Figure: Multi-split Multiplexed]

[Figure: Multi-split Multi-threaded]


Most of the readers implemented against these higher-level building blocks would only need to implement an interface similar to this. The contract would also be that all methods except wakeUp() are called by the same thread, obviating the need for any concurrency handling in the connector.

Code Block: SplitReader reading methods
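public interface SplitReader<E, SplitT extends SourceSplit> {

	/** Fetches the next batch of records, grouped by split. May block. */
	RecordsWithSplitIds<E> fetch() throws InterruptedException;

	/** Handles changes to the set of splits assigned to this reader, e.g. newly added splits. */
	void handleSplitsChanges(Queue<SplitsChange<SplitT>> splitsChanges);

	/** Wakes up a blocking fetch(). The only method that may be called from another thread. */
	void wakeUp();
}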
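The single-thread contract can be illustrated with a toy multiplexing SplitReader. `fetch()` and `handleSplitsChanges()` run on one fetcher thread and share the split state without synchronization, while `wakeUp()` may come from another thread and therefore only touches a lock-guarded flag that unblocks a waiting `fetch()`. The shape of `SplitsChange` (additions only), `ListSplit`, `ListSplitReader` and the batching policy are assumptions made for this sketch.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Simplified stand-ins; the shape of SplitsChange (additions only) is an assumption.
interface SourceSplit { String splitId(); }
interface RecordsWithSplitIds<E> {
    Collection<String> getSplitIds();
    Map<String, Collection<E>> getRecordsBySplits();
    Set<String> getFinishedSplits();
}
final class SplitsChange<SplitT> {
    final List<SplitT> addedSplits;
    SplitsChange(List<SplitT> addedSplits) { this.addedSplits = addedSplits; }
}
interface SplitReader<E, SplitT extends SourceSplit> {
    RecordsWithSplitIds<E> fetch() throws InterruptedException;
    void handleSplitsChanges(Queue<SplitsChange<SplitT>> splitsChanges);
    void wakeUp();
}
final class ListSplit implements SourceSplit {
    final String id;
    final List<String> records;
    ListSplit(String id, List<String> records) { this.id = id; this.records = records; }
    public String splitId() { return id; }
}

/** Toy multiplexing reader: each fetch() takes up to batchSize records from every split. */
class ListSplitReader implements SplitReader<String, ListSplit> {

    private final int batchSize;
    // Only used by the fetcher thread, which calls fetch() and handleSplitsChanges().
    private final Map<String, Iterator<String>> splits = new LinkedHashMap<>();
    // wakeUp() may come from another thread, so its flag is guarded by this lock.
    private final Object lock = new Object();
    private boolean wakeUpRequested;

    ListSplitReader(int batchSize) { this.batchSize = batchSize; }

    public RecordsWithSplitIds<String> fetch() throws InterruptedException {
        synchronized (lock) {
            while (splits.isEmpty() && !wakeUpRequested) { // nothing to read: block
                lock.wait();
            }
            wakeUpRequested = false;
        }
        Map<String, Collection<String>> records = new LinkedHashMap<>();
        Set<String> finished = new HashSet<>();
        Iterator<Map.Entry<String, Iterator<String>>> it = splits.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Iterator<String>> split = it.next();
            List<String> batch = new ArrayList<>();
            while (batch.size() < batchSize && split.getValue().hasNext()) {
                batch.add(split.getValue().next());
            }
            if (!batch.isEmpty()) {
                records.put(split.getKey(), batch);
            }
            if (!split.getValue().hasNext()) { // split exhausted: report it and forget it
                finished.add(split.getKey());
                it.remove();
            }
        }
        return new RecordsWithSplitIds<String>() {
            public Collection<String> getSplitIds() { return records.keySet(); }
            public Map<String, Collection<String>> getRecordsBySplits() { return records; }
            public Set<String> getFinishedSplits() { return finished; }
        };
    }

    public void handleSplitsChanges(Queue<SplitsChange<ListSplit>> changes) {
        SplitsChange<ListSplit> change;
        while ((change = changes.poll()) != null) {
            for (ListSplit split : change.addedSplits) {
                splits.put(split.splitId(), split.records.iterator());
            }
        }
    }

    public void wakeUp() {
        synchronized (lock) {
            wakeUpRequested = true;
            lock.notifyAll();
        }
    }
}
```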

Failover

The state of the SplitEnumerator includes the following:

  • The unassigned splits
  • The splits that have been assigned but not successfully checkpointed yet.
    • Each assigned-but-uncheckpointed split is associated with the ID of the checkpoint it belongs to.

The state of the SourceReader includes:

  • The assigned splits
  • The state of the splits (e.g. Kafka offsets, HDFS file offsets)

When the SplitEnumerator fails, a full failover will be performed. While it is possible to have a finer-grained failover that only restores the state of the SplitEnumerator, we would like to address this in a separate FLIP.

When a SourceReader fails, the failed SourceReader will be restored to its last successful checkpoint. The SplitEnumerator will partially reset its state by adding the assigned-but-uncheckpointed splits back to the SplitEnumerator. In this case, only the failed subtask and its connected nodes have to reset their state.
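The partial reset requires knowing which splits were assigned to a reader after its last successful checkpoint. Whichever component keeps that bookkeeping (the enumerator itself or the runtime that calls `addSplitsBack(...)`), it could look roughly like the sketch below: assignments since the last snapshot are tagged with the next checkpoint ID, dropped once that checkpoint completes, and handed back when the reader fails. `UncheckpointedAssignments` is a hypothetical helper, and the checkpoint-completion callback it relies on is an assumption, as it is not part of the interfaces shown in this document.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical bookkeeping for assigned-but-uncheckpointed splits. */
class UncheckpointedAssignments {

    // Splits assigned since the last snapshot, per subtask.
    private Map<Integer, List<String>> sinceLastSnapshot = new HashMap<>();
    // Checkpoint ID -> (subtask -> splits assigned before that checkpoint was taken).
    private final TreeMap<Long, Map<Integer, List<String>>> byCheckpoint = new TreeMap<>();

    void recordAssignment(int subtaskId, String splitId) {
        sinceLastSnapshot.computeIfAbsent(subtaskId, k -> new ArrayList<>()).add(splitId);
    }

    /** Called when the state is snapshotted for the given checkpoint. */
    void onSnapshot(long checkpointId) {
        byCheckpoint.put(checkpointId, sinceLastSnapshot);
        sinceLastSnapshot = new HashMap<>();
    }

    /** Assumed callback: once a checkpoint completes, its splits are part of the reader state. */
    void onCheckpointComplete(long checkpointId) {
        byCheckpoint.headMap(checkpointId, true).clear();
    }

    /** Returns (and forgets) the splits to add back when the given reader fails. */
    List<String> onReaderFailure(int subtaskId) {
        List<String> toAddBack = new ArrayList<>();
        for (Map<Integer, List<String>> perSubtask : byCheckpoint.values()) {
            List<String> splits = perSubtask.remove(subtaskId);
            if (splits != null) {
                toAddBack.addAll(splits);
            }
        }
        List<String> recent = sinceLastSnapshot.remove(subtaskId);
        if (recent != null) {
            toAddBack.addAll(recent);
        }
        return toAddBack;
    }
}
```

Only the failed subtask's splits are returned; assignments of other readers stay tracked until their checkpoints complete, which matches the "only the failed subtask and its connected nodes" scope of the reset.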

Where to run the enumerator

...

  • The top level public interfaces.
  • The interfaces introduced as a part of the base implementation of the top level public interfaces.
    • The base implementation provides common functionality required by most Source implementations. See the base implementation section for details.
  • The RPC gateway interface change for the generic message passing mechanism.

It is worth noting that while we will try our best to maintain stable interfaces, the interfaces introduced as part of the base implementation (e.g. SplitReader) are more likely to change than the top-level public interfaces such as SplitEnumerator / SourceReader. This is primarily because we expect to add more functionality to the base implementation over time.

Top level public interfaces

...

Code Block: Source
/**
 * The interface for Source. It acts like a factory class that helps construct
 * the {@link SplitEnumerator} and {@link SourceReader} and corresponding
 * serializers.
 *
 * @param <T>        The type of records produced by the source.
 * @param <SplitT>   The type of splits handled by the source.
 * @param <EnumChkT> The type of the enumerator checkpoints.
 */
public interface Source<T, SplitT extends SourceSplit, EnumChkT> extends Serializable {

	/**
	 * Checks whether the source supports the given boundedness.
	 *
	 * <p>Some sources might only support either continuous unbounded streams, or
	 * bounded streams.
	 *
	 * @param boundedness The boundedness to check.
	 * @return <code>true</code> if the given boundedness is supported, <code>false</code> otherwise.
	 */
	boolean supportsBoundedness(Boundedness boundedness);

	/**
	 * Creates a new reader to read data from the splits it gets assigned.
	 * The reader starts fresh and does not have any state to resume.
	 *
	 * @param config A flat config for this source operator.
	 * @param readerContext The {@link SourceReaderContext context} for the source reader.
	 * @return A new SourceReader.
	 */
	SourceReader<T, SplitT> createReader(
			Configuration config,
			SourceReaderContext readerContext);

	/**
	 * Creates a new SplitEnumerator for this source, starting a new input.
	 *
	 * @param config The configuration for this operator.
	 * @param enumContext The {@link SplitEnumeratorContext context} for the split enumerator.
	 * @return A new SplitEnumerator.
	 */
	SplitEnumerator<SplitT, EnumChkT> createEnumerator(
			Configuration config,
			SplitEnumeratorContext<SplitT> enumContext);

	/**
	 * Restores an enumerator from a checkpoint.
	 *
	 * @param config The configuration of this operator.
	 * @param enumContext The {@link SplitEnumeratorContext context} for the restored split enumerator.
	 * @param checkpoint The checkpoint to restore the SplitEnumerator from.
	 * @return A SplitEnumerator restored from the given checkpoint.
	 */
	SplitEnumerator<SplitT, EnumChkT> restoreEnumerator(
			Configuration config,
			SplitEnumeratorContext<SplitT> enumContext,
			EnumChkT checkpoint) throws IOException;

	// ------------------------------------------------------------------------
	//  serializers for the metadata
	// ------------------------------------------------------------------------

	/**
	 * Creates a serializer for the source splits. Splits are serialized when sending them
	 * from enumerator to reader, and when checkpointing the reader's current state.
	 *
	 * @return The serializer for the split type.
	 */
	SimpleVersionedSerializer<SplitT> getSplitSerializer();

	/**
	 * Creates the serializer for the {@link SplitEnumerator} checkpoint.
	 * The serializer is used for the result of the {@link SplitEnumerator#snapshotState()}
	 * method.
	 *
	 * @return The serializer for the SplitEnumerator checkpoint.
	 */
	SimpleVersionedSerializer<EnumChkT> getEnumeratorCheckpointSerializer();
}


/**
 * The boundedness of the source: "bounded" for the currently available data (batch style),
 * "continuous unbounded" for a continuous streaming style source.
 */
public enum Boundedness {

	/**
	 * A bounded source processes the data that is currently available and will end after that.
	 *
	 * <p>When a source produces a bounded stream, the runtime may activate additional optimizations
	 * that are suitable only for bounded input. Incorrectly producing unbounded data when the source
	 * is set to produce a bounded stream will often result in programs that do not output any results
	 * and may eventually fail due to runtime errors (out of memory or storage).
	 */
	BOUNDED,

	/**
	 * A continuous unbounded source continuously processes all data as it comes.
	 *
	 * <p>The source may run forever (until the program is terminated) or might actually end at some point,
	 * based on some source-specific conditions. Because that is not transparent to the runtime,
	 * the runtime will use an execution mode for continuous unbounded streams whenever this mode
	 * is chosen.
	 */
	CONTINUOUS_UNBOUNDED
}

...

Code Block: SourceReader
/**
 * The interface for a source reader which is responsible for reading the records from
 * the source splits assigned by {@link SplitEnumerator}.
 *
 * @param <T> The type of the record emitted by this source reader.
 * @param <SplitT> The type of the source splits.
 */
public interface SourceReader<T, SplitT extends SourceSplit> extends Serializable, AutoCloseable {

	/**
	 * Starts the reader.
	 */
	void start();

	/**
	 * Poll the next available record into the {@link SourceOutput}.
	 *
	 * <p>The implementation must make sure this method is non-blocking.
	 *
	 * <p>Although the implementation can emit multiple records into the given SourceOutput,
	 * it is recommended not to do so. Instead, emit one record into the SourceOutput
	 * and return a {@link Status#AVAILABLE_NOW} to let the caller thread
	 * know there are more records available.
	 *
	 * @return The {@link Status} of the SourceReader after the method invocation.
	 */
	Status pollNext(SourceOutput<T> sourceOutput) throws Exception;

	/**
	 * Checkpoints the state of the source.
	 *
	 * @return the state of the source.
	 */
	List<SplitT> snapshotState();

	/**
	 * @return a future that will be completed once there is a record available to poll.
	 */
	CompletableFuture<Void> isAvailable();

	/**
	 * Adds a list of splits for this reader to read.
	 *
	 * @param splits The splits assigned by the split enumerator.
	 */
	void addSplits(List<SplitT> splits);

	/**
	 * Handle a source event sent by the {@link SplitEnumerator}
	 *
	 * @param sourceEvent the event sent by the {@link SplitEnumerator}.
	 */
	default void handleSourceEvents(SourceEvent sourceEvent) {
		// Do nothing.
	}

	/**
	 * The status of this reader.
	 */
	enum Status {
		/** The next record is available right now. */
		AVAILABLE_NOW,
		/** The next record will be available later. */
		AVAILABLE_LATER,
		/** The source reader has completed all the reading work. */
		FINISHED
	}
}

...

Code Block: SplitEnumerator
/**
 * An interface of a split enumerator responsible for the following:
 * 1. discover the splits for the {@link SourceReader} to read.
 * 2. assign the splits to the source reader.
 */
public interface SplitEnumerator<SplitT extends SourceSplit, CheckpointT> extends AutoCloseable {

	/**
	 * Start the split enumerator.
	 *
	 * <p>The default behavior does nothing.
	 */
	default void start() {
		// By default do nothing.
	}

	/**
	 * Handles the source event from the source reader.
	 *
	 * @param subtaskId the subtask id of the source reader who sent the source event.
	 * @param sourceEvent the source event from the source reader.
	 */
	void handleSourceEvent(int subtaskId, SourceEvent sourceEvent);

	/**
	 * Adds splits back to the split enumerator. This only happens when a {@link SourceReader} fails
	 * and splits were assigned to it after the last successful checkpoint.
	 *
	 * @param splits The splits to add back to the enumerator for reassignment.
	 * @param subtaskId The id of the subtask to which the returned splits belong.
	 */
	void addSplitsBack(List<SplitT> splits, int subtaskId);

	/**
	 * Add a new source reader with the given subtask ID.
	 *
	 * @param subtaskId the subtask ID of the new source reader.
	 */
	void addReader(int subtaskId);

	/**
	 * Checkpoints the state of this split enumerator.
	 */
	CheckpointT snapshotState();

	/**
	 * Called to close the enumerator, in case it holds on to any resources, like threads or
	 * network connections.
	 */
	@Override
	void close() throws IOException;
}
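A minimal pull-based enumerator sketch: readers ask for work by sending a custom SourceEvent, and the enumerator hands out one unassigned split per request, queuing splits returned through `addSplitsBack(...)` first. `SplitRequest`, `NamedSplit`, `QueueSplitEnumerator` and the `assigner` callback (a stand-in for the assignment facility of the enumerator context, which is not shown in this section) are hypothetical. For brevity, the checkpoint only contains the unassigned splits and omits the assigned-but-uncheckpointed bookkeeping described under Failover.

```java
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.function.BiConsumer;

// Simplified stand-ins for the types used by the interface (assumed shapes).
interface SourceSplit { String splitId(); }
interface SourceEvent extends Serializable {}

interface SplitEnumerator<SplitT extends SourceSplit, CheckpointT> extends AutoCloseable {
    default void start() {}
    void handleSourceEvent(int subtaskId, SourceEvent sourceEvent);
    void addSplitsBack(List<SplitT> splits, int subtaskId);
    void addReader(int subtaskId);
    CheckpointT snapshotState();
    @Override
    void close() throws IOException;
}

/** Hypothetical event a reader sends when it wants another split. */
final class SplitRequest implements SourceEvent {}

final class NamedSplit implements SourceSplit {
    private final String id;
    NamedSplit(String id) { this.id = id; }
    public String splitId() { return id; }
}

/** Pull-based toy enumerator: hands out one unassigned split per SplitRequest. */
class QueueSplitEnumerator implements SplitEnumerator<NamedSplit, List<NamedSplit>> {

    private final Deque<NamedSplit> unassigned;
    // Hypothetical stand-in for the assignment call of the enumerator context.
    private final BiConsumer<Integer, List<NamedSplit>> assigner;

    QueueSplitEnumerator(List<NamedSplit> discovered, BiConsumer<Integer, List<NamedSplit>> assigner) {
        this.unassigned = new ArrayDeque<>(discovered);
        this.assigner = assigner;
    }

    @Override
    public void handleSourceEvent(int subtaskId, SourceEvent event) {
        if (event instanceof SplitRequest && !unassigned.isEmpty()) {
            assigner.accept(subtaskId, Collections.singletonList(unassigned.poll()));
        }
    }

    @Override
    public void addSplitsBack(List<NamedSplit> splits, int subtaskId) {
        // Splits of a failed reader go to the front so they are reassigned first.
        for (int i = splits.size() - 1; i >= 0; i--) {
            unassigned.addFirst(splits.get(i));
        }
    }

    @Override
    public void addReader(int subtaskId) {
        // Nothing to do: readers pull splits via SplitRequest events.
    }

    @Override
    public List<NamedSplit> snapshotState() {
        return new ArrayList<>(unassigned);
    }

    @Override
    public void close() {}
}
```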

...

Code Block: SourceEvent
/**
 * An interface for the events passed between the SourceReaders and the SplitEnumerator.
 */
public interface SourceEvent extends Serializable {}

...

Code Block: RecordsWithSplitIds
/**
 * An interface for the elements passed from the SplitReader to the source reader.
 */
public interface RecordsWithSplitIds<E> {

	/**
	 * Get all the split ids.
	 *
	 * @return a collection of split ids.
	 */
	Collection<String> getSplitIds();

	/**
	 * Get all the records, grouped by split.
	 *
	 * @return a mapping from split ids to the records.
	 */
	Map<String, Collection<E>> getRecordsBySplits();

	/**
	 * Get the finished splits.
	 *
	 * @return the finished splits after this RecordsWithSplitIds is returned.
	 */
	Set<String> getFinishedSplits();
}
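A simple map-backed implementation of RecordsWithSplitIds, plus a helper showing how a consumer such as the source reader might drain one batch: emit the records split by split, then stop tracking the splits reported as finished. `MapRecordsWithSplitIds` and `RecordsDrainer` are hypothetical names, and having `getSplitIds()` return both the splits with records and the finished splits is an assumption about its intended meaning.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;

// Stand-in copy of the interface above.
interface RecordsWithSplitIds<E> {
    Collection<String> getSplitIds();
    Map<String, Collection<E>> getRecordsBySplits();
    Set<String> getFinishedSplits();
}

/** Hypothetical map-backed implementation, e.g. filled by a SplitReader during fetch(). */
final class MapRecordsWithSplitIds<E> implements RecordsWithSplitIds<E> {

    private final Map<String, Collection<E>> recordsBySplit = new LinkedHashMap<>();
    private final Set<String> finishedSplits = new LinkedHashSet<>();

    void add(String splitId, E record) {
        recordsBySplit.computeIfAbsent(splitId, id -> new ArrayList<>()).add(record);
    }

    void markFinished(String splitId) {
        finishedSplits.add(splitId);
    }

    /** Assumed meaning: every split mentioned in this batch, with records or finished. */
    public Collection<String> getSplitIds() {
        Set<String> ids = new LinkedHashSet<>(recordsBySplit.keySet());
        ids.addAll(finishedSplits);
        return ids;
    }

    public Map<String, Collection<E>> getRecordsBySplits() { return recordsBySplit; }

    public Set<String> getFinishedSplits() { return finishedSplits; }
}

/** Hypothetical consumer side: emit a batch split by split, then drop finished splits. */
final class RecordsDrainer {

    static <E> void drain(RecordsWithSplitIds<E> batch, BiConsumer<String, E> emit, Set<String> activeSplits) {
        for (Map.Entry<String, Collection<E>> entry : batch.getRecordsBySplits().entrySet()) {
            for (E record : entry.getValue()) {
                emit.accept(entry.getKey(), record); // per-split offsets/watermarks could be updated here
            }
        }
        // Finished splits no longer need to be tracked or included in snapshots.
        activeSplits.removeAll(batch.getFinishedSplits());
    }
}
```

Grouping records by split lets the consumer apply per-split logic (offsets, event time) once per split rather than looking up the split for every record.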

...