Discussion threadhttps://lists.apache.org/thread/7gw2s6c1mntl4x6s0np3t0gjcvl6bnn5
Vote threadhttps://lists.apache.org/thread/of03bqzf1jd5jjzh0qkxk31d2sjd7sgd
JIRA

Unable to render Jira issues macro, execution error.

Release1.11

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, we have two different flavours of Watermark Assigners: AssignerWithPunctuatedWatermarks and AssignerWithPeriodicWatermarks. Both of them extend from TimestampAssigner. This means that sources that want to support watermark assignment/extraction in the source need to support two separate interfaces, we have two operator implementations for the different flavours. Also, this makes features such as generic support for idleness detection more complicated to implemented because we again have to support two types of watermark assigners.

In this FLIP we propose two things:

  1. Unify the Watermark Assigners into one Interface WatermarkGenerator
  2. Separate this new interface from the TimestampAssigner

The motivation for the first is to simplify future implementations and code duplication. The motivation for the second point is again code deduplication, most assigners currently have to extend from some base timestamp extractor or duplicate the extraction logic, or users have to override an abstract method of the watermark assigner to provide the timestamp extraction logic.

Additionally, we propose to add a generic wrapping WatermarkGenerator that provides idleness detection, i.e. it can mark a stream/partition as idle if no data arrives after a configured timeout.

The "unify and separate" part refers to the fact that we want to unify punctuated and periodic assigners but at the same time split the timestamp assigner from the watermark generator.

Finally, we want to add these new interfaces to flink-core to the existing org.apache.flink.api.common.eventtime package.

Public Interfaces

We are proposing the new TimestampAssigner and WatermarkGenerator interfaces as the basic new interfaces of this abstraction. On top of this, we're proposing WatermarkStrategy, which combines an assigner and a generator and is used in APIs and the plumbing to carry both of them together. We're also proposing the convenience class WatermarkStrategies for creating common strategies with default timestamp assigners.

TimestampAssigner
/**
 * A {@code TimestampAssigner} assigns event time timestamps to elements.
 * These timestamps are used by all functions that operate on event time,
 * for example event time windows.
 *
 * <p>Timestamps can be an arbitrary {@code long} value, but all built-in implementations
 * represent it as the milliseconds since the Epoch (midnight, January 1, 1970 UTC),
 * the same way as {@link System#currentTimeMillis()} does it.
 *
 * @param <T> The type of the elements to which this assigner assigns timestamps.
 */
@PublicEvolving
@FunctionalInterface
public interface TimestampAssigner<T> extends Function {

	/**
	 * Assigns a timestamp to an element, in milliseconds since the Epoch.
	 *
	 * <p>The method is passed the previously assigned timestamp of the element.
	 * That previous timestamp may have been assigned from a previous assigner. If the element did
	 * not carry a timestamp before, this value is {@code Long.MIN_VALUE}.
	 *
	 * @param element The element that the timestamp will be assigned to.
	 * @param currentTimestamp The current internal timestamp of the element,
	 *                         or a negative value, if no timestamp has been assigned yet.
	 * @return The new timestamp.
	 */
	long extractTimestamp(T element, long recordTimestamp);
}


WatermarkGenerator
/**
 * The {@code WatermarkGenerator} generates watermarks either based on events or
 * periodically (in a fixed interval).
 *
 * <p><b>Note:</b> This WatermarkGenerator subsumes the previous distinction between the
 * {@code AssignerWithPunctuatedWatermarks} and the {@code AssignerWithPeriodicWatermarks}.
 */
@PublicEvolving
public interface WatermarkGenerator<T> {

	/**
	 * Called for every event, allows the watermark generator to examine and remember the
	 * event timestamps, or to emit a watermark based on the event itself.
	 */
	void onEvent(T event, long eventTimestamp, WatermarkOutput output);

	/**
	 * Called periodically, and might emit a new watermark, or not.
	 *
	 * <p>The interval in which this method is called and Watermarks are generated
	 * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
	 */
	void onPeriodicEmit(WatermarkOutput output);
}
WatermarkStrategy
/**
 * The WatermarkStrategy defines how to generate {@link Watermark}s in the stream sources.
 * The WatermarkStrategy is a builder/factory for the {@link WatermarkGenerator} that
 * generates the watermarks and the {@link TimestampAssigner} which assigns the internal timestamp
 * of a record.
 *
 * <p>This interface is {@link Serializable} because watermark strategies may be shipped
 * to workers during distributed execution.
 */
@PublicEvolving
public interface WatermarkStrategy<T> extends Serializable {

	/**
	 * Instantiates a {@link TimestampAssigner} for assigning timestamps according to this
	 * strategy.
	 */
	default TimestampAssigner<T> createTimestampAssigner() {
		// By default, this is {@link RecordTimestampAssigner},
		// which takes the timestamp that a record already has, for example from Kafka
		return new RecordTimestampAssigner<>();
	}

	/**
	 * Instantiates a WatermarkGenerator that generates watermarks according to this strategy.
	 */
	WatermarkGenerator<T> createWatermarkGenerator();
}


Proposed Changes

We propose to add the new interfaces, along with wrapping code for the old interfaces to ensure compatibility. We need a new operator implementation for watermark extraction as well as adapt the KafkaConsumer to use the new WatermarkGenerator.

We will also need to add API methods to DataStream to allow users to specify the new interfaces for watermark generation.

The old extraction operators can be removed, we will only need the new extraction operator along with the wrapper WatermarkGenerator.

Compatibility, Deprecation, and Migration Plan

Existing user code will still work because we provide wrappers under the hood. The existing API methods will stay but we deprecate them and recommend using the new interfaces. The old interfaces should be deprecated in Flink 2.0.

Test Plan

Unit tests will be added for the new WatermarkGenerator implementations as well as generators. All existing ITCases and end-to-end tests will also test the new extraction logic because we want to remove the old extraction code in favour of the wrapping WatermarkGenerators.

Rejected Alternatives

We currently don't see alternatives, unifying the interfaces seems straightforward and will simplify implementation.