
...

Page properties

Current state: Accepted


Discussion thread

...

...

...

JIRA: FLINK-10740

...

Release: 1.12


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

The SplitEnumerator takes over the functionality of the old batch source interface for creating and assigning splits. It runs as a single instance, not in parallel (though it could be parallelized in the future, if necessary).
It might run on the JobManager or in a single task on a TaskManager (see below "Where to run the Enumerator").

Example:

    • In the File Source, the SplitEnumerator lists all files (possibly sub-dividing them into blocks/ranges).
    • For the Kafka Source, the SplitEnumerator finds all Kafka Partitions that the source should read from.
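The File Source case above can be illustrated with a minimal sketch. It assumes hypothetical classes FileBlockSplit and SimpleFileSplitEnumerator, which are not the FLIP's interfaces: the enumerator receives a map of file paths to sizes (standing in for a directory listing), sub-divides each file into fixed-size byte ranges, and hands out one split per reader request.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical split type: the byte range [offset, offset + length) of one file. */
final class FileBlockSplit {
    final String path;
    final long offset;
    final long length;

    FileBlockSplit(String path, long offset, long length) {
        this.path = path;
        this.offset = offset;
        this.length = length;
    }

    @Override
    public String toString() {
        return path + "[" + offset + ", " + (offset + length) + ")";
    }
}

/** Hypothetical single-instance enumerator: discovers all splits up front, assigns them on request. */
final class SimpleFileSplitEnumerator {
    private final Deque<FileBlockSplit> pending = new ArrayDeque<>();

    /** @param fileSizes file path -> size in bytes (a stand-in for listing a directory) */
    SimpleFileSplitEnumerator(Map<String, Long> fileSizes, long blockSize) {
        for (Map.Entry<String, Long> file : fileSizes.entrySet()) {
            long size = file.getValue();
            // Sub-divide each file into ranges of at most blockSize bytes.
            for (long offset = 0; offset < size; offset += blockSize) {
                pending.add(new FileBlockSplit(file.getKey(), offset, Math.min(blockSize, size - offset)));
            }
        }
    }

    /** Called when a reader asks for work; readerId is unused by this simple FIFO policy. */
    Optional<FileBlockSplit> handleSplitRequest(int readerId) {
        return Optional.ofNullable(pending.poll());
    }

    /** Illustrative: unfinished splits of a failed reader go back into the pending queue. */
    void addSplitsBack(List<FileBlockSplit> splits) {
        pending.addAll(splits);
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Because split discovery and assignment happen in one place, the assignment policy (here plain FIFO) can be changed without touching the readers.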

...

  • Splits are both the type of work assignment and the type of state held by the source. Assigning a split and restoring a split from a checkpoint look the same to the reader.
  • Advancing the reader is a non-blocking call that returns a future.
  • We build higher-level primitives on top of the main interface (see below "High-level Readers")
  • We hide event-time / watermarks in the SourceOutput and pass different source contexts for batch (no watermarks) and streaming (with watermarks).
    The SourceOutput also abstracts the per-partition watermark tracking.
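The per-partition watermark tracking hidden behind the SourceOutput can be illustrated with a small sketch. The class PerSplitWatermarkTracker is a hypothetical helper, not part of the FLIP: each split reports its own watermark, and the combined watermark is the minimum over all tracked splits, emitted only when it advances.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: combines per-split watermarks into one output watermark. */
final class PerSplitWatermarkTracker {
    private final Map<String, Long> splitWatermarks = new HashMap<>();
    // Last combined watermark emitted; a stream's time starts at Long.MIN_VALUE.
    private long emitted = Long.MIN_VALUE;

    /** A newly assigned split holds back the combined watermark until it reports progress. */
    void addSplit(String splitId) {
        splitWatermarks.putIfAbsent(splitId, Long.MIN_VALUE);
    }

    /** A finished split no longer holds back the combined watermark. */
    void removeSplit(String splitId) {
        splitWatermarks.remove(splitId);
    }

    /** Records a watermark for one split; a lower value than before is ignored. */
    void updateSplit(String splitId, long watermark) {
        splitWatermarks.merge(splitId, watermark, Long::max);
    }

    /** Returns the new combined (minimum) watermark if it advanced since the last emission, else null. */
    Long pollAdvancedWatermark() {
        if (splitWatermarks.isEmpty()) {
            return null;
        }
        long min = Long.MAX_VALUE;
        for (long wm : splitWatermarks.values()) {
            min = Math.min(min, wm);
        }
        if (min > emitted) {
            emitted = min;
            return min;
        }
        return null;
    }
}
```

Taking the minimum means a slow partition holds back event time for the whole reader, which is why finished splits must be removed explicitly.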

The SourceReader will run as a PushingAsyncDataInput, which works well with the new mailbox threading model in the tasks, similar to the network inputs.
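The non-blocking contract can be sketched as follows, assuming a hypothetical AvailabilityReader that is not the FLIP's SourceReader signature. A fetcher thread hands records over; pollNext() never blocks, and when nothing is buffered the caller waits on the CompletableFuture from isAvailable() instead. The drain() loop is a simplified stand-in for the task's mailbox loop.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical reader: the task thread never blocks inside pollNext(). */
final class AvailabilityReader {
    private final Queue<String> buffer = new ArrayDeque<>();
    // Invariant: the future is completed exactly when the buffer is non-empty.
    private CompletableFuture<Void> available = new CompletableFuture<>();

    /** Called by a fetcher thread when a new record arrives. */
    synchronized void addRecord(String record) {
        buffer.add(record);
        available.complete(null); // wake up whoever waits for data
    }

    /** Non-blocking: returns the next record, or null if nothing is buffered right now. */
    synchronized String pollNext() {
        String next = buffer.poll();
        if (buffer.isEmpty() && available.isDone()) {
            available = new CompletableFuture<>(); // the next wait needs new data
        }
        return next;
    }

    /** Completes once data is available. */
    synchronized CompletableFuture<Void> isAvailable() {
        return available;
    }

    /** Simplified stand-in for the mailbox loop: poll, and wait on the future only when empty. */
    static List<String> drain(AvailabilityReader reader, int expected) {
        List<String> out = new ArrayList<>();
        while (out.size() < expected) {
            String record = reader.pollNext();
            if (record != null) {
                out.add(record);
            } else {
                // A real task would register a callback on the future; join() keeps the sketch short.
                reader.isAvailable().join();
            }
        }
        return out;
    }
}
```

Because the future is only replaced inside pollNext(), a record that arrives between an empty poll and the call to isAvailable() completes the same future the caller then waits on, so no wake-up is lost.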

...

The records in the RecordsWithSplitIds returned by the SplitReader are passed to a RecordEmitter one by one. The RecordEmitter is responsible for the following:

...

The interfaces used by the base implementation are covered in the section on the interfaces for the base implementation.

Failover

The state of the SplitEnumerator includes the following:

...

There was a long discussion about where to run the enumerator, which we documented in the appendix. The final approach is very similar to option 3, with a few differences. The approach is as follows.

...

Event time alignment becomes easier to implement with the generic communication mechanism introduced between the SplitEnumerator and the SourceReader. To reduce complexity, this FLIP does not include it in the base implementation.
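Since alignment is not part of this FLIP, the following is only a hypothetical sketch of how the generic communication mechanism could support it: readers report their local watermarks to the enumerator as events, and the enumerator replies with a decision to pause any reader that is more than a configured drift ahead of the slowest reader. All names (AlignmentCoordinator, onWatermarkReport, maxDriftMillis) are illustrative assumptions.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical enumerator-side logic for event time alignment (not part of this FLIP). */
final class AlignmentCoordinator {
    private final long maxDriftMillis;
    private final Map<Integer, Long> readerWatermarks = new HashMap<>();

    AlignmentCoordinator(long maxDriftMillis) {
        this.maxDriftMillis = maxDriftMillis;
    }

    /** Handles a reader's watermark report; the returned value is the reply (true = pause reading). */
    boolean onWatermarkReport(int readerId, long watermark) {
        readerWatermarks.put(readerId, watermark);
        return shouldPause(readerId);
    }

    /**
     * A reader should pause if it is more than maxDriftMillis ahead of the slowest reporting reader.
     * Assumes real millisecond timestamps, so the subtraction cannot overflow.
     */
    boolean shouldPause(int readerId) {
        long slowest = Long.MAX_VALUE;
        for (long wm : readerWatermarks.values()) {
            slowest = Math.min(slowest, wm);
        }
        Long own = readerWatermarks.get(readerId);
        return own != null && own - slowest > maxDriftMillis;
    }
}
```

In a full design the enumerator would also re-evaluate and notify paused readers when the slowest reader catches up; the sketch only shows the decision itself.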

...

  • The top-level public interfaces.
  • The interfaces introduced as a part of the base implementation of the top-level public interfaces.
    • The base implementation provides common functionality required by most Source implementations. See the base implementation for details.
  • The RPC gateway interface change for the generic message passing mechanism.

It is worth noting that while we will try our best to keep the interfaces stable, the interfaces introduced as part of the base implementation (e.g. SplitReader) are more likely to change than the top-level public interfaces such as SplitEnumerator / SourceReader. This is primarily because we expect to add more functionality to the base implementation over time.

...

Watermark
import org.apache.flink.annotation.Public;

import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Watermarks are the progress indicators in the data streams. A watermark signifies
 * that no events with a timestamp smaller than or equal to the watermark's time will occur after the
 * watermark. A watermark with timestamp <i>T</i> indicates that the stream's event time has progressed
 * to time <i>T</i>.
 *
 * <p>Watermarks are created at the sources and propagate through the streams and operators.
 *
 * <p>In some cases a watermark is only a heuristic, meaning some events with a lower timestamp
 * may still follow. In that case, it is up to the logic of the operators to decide what to do
 * with the "late events". Operators can for example ignore these late events, route them to a
 * different stream, or send updates to their previously emitted results.
 *
 * <p>When a source reaches the end of the input, it emits a final watermark with timestamp
 * {@code Long.MAX_VALUE}, indicating the "end of time".
 *
 * <p>Note: A stream's time starts with a watermark of {@code Long.MIN_VALUE}. That means that all records
 * in the stream with a timestamp of {@code Long.MIN_VALUE} are immediately late.
 */
@Public
public final class Watermark implements Serializable {

	private static final long serialVersionUID = 1L;

	/** Thread local formatter for stringifying the timestamps. */
	private static final ThreadLocal<SimpleDateFormat> TS_FORMATTER = ThreadLocal.withInitial(
		() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"));

	// ------------------------------------------------------------------------

	/** The watermark that signifies end-of-event-time. */
	public static final Watermark MAX_WATERMARK = new Watermark(Long.MAX_VALUE);

	// ------------------------------------------------------------------------

	/** The timestamp of the watermark in milliseconds. */
	private final long timestamp;

	/**
	 * Creates a new watermark with the given timestamp in milliseconds.
	 */
	public Watermark(long timestamp) {
		this.timestamp = timestamp;
	}

	/**
	 * Returns the timestamp associated with this Watermark.
	 */
	public long getTimestamp() {
		return timestamp;
	}

	/**
	 * Formats the timestamp of this watermark, assuming it is a millisecond timestamp.
	 * The returned format is "yyyy-MM-dd HH:mm:ss.SSS".
	 */
	public String getFormattedTimestamp() {
		return TS_FORMATTER.get().format(new Date(timestamp));
	}

	// ------------------------------------------------------------------------

	@Override
	public boolean equals(Object o) {
		return this == o ||
				o != null &&
				o.getClass() == Watermark.class &&
				((Watermark) o).timestamp == this.timestamp;
	}

	@Override
	public int hashCode() {
		return Long.hashCode(timestamp);
	}

	@Override
	public String toString() {
		return "Watermark @ " + timestamp + " (" + getFormattedTimestamp() + ')';
	}
}
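The semantics in the Javadoc above can be illustrated with a small, self-contained sketch. The LatenessTracker class is hypothetical and independent of the Watermark class: a record is late once its timestamp is smaller than or equal to the current watermark, time starts at Long.MIN_VALUE (so a record with that timestamp is immediately late), and Long.MAX_VALUE marks the end of time.

```java
/** Hypothetical helper that applies the watermark semantics to incoming records. */
final class LatenessTracker {
    // A stream's time starts with a watermark of Long.MIN_VALUE.
    private long currentWatermark = Long.MIN_VALUE;

    /** Advances event time; a watermark lower than the current one is ignored in this sketch. */
    void onWatermark(long timestamp) {
        currentWatermark = Math.max(currentWatermark, timestamp);
    }

    /** A record is late if its timestamp is smaller than or equal to the current watermark. */
    boolean isLate(long recordTimestamp) {
        return recordTimestamp <= currentWatermark;
    }

    /** True once the final Long.MAX_VALUE watermark ("end of time") has been seen. */
    boolean isEndOfTime() {
        return currentWatermark == Long.MAX_VALUE;
    }
}
```

What an operator does with a late record (drop it, route it elsewhere, or update earlier results) is left to the operator, as the Javadoc describes.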

...