...

Finally, we want to add these new interfaces to flink-core, in the package org.apache.flink.api.common.eventtime.

Public Interfaces

We propose the new TimestampAssigner and WatermarkGenerator interfaces as the basic building blocks of this abstraction. On top of these, we propose WatermarkStrategy, which combines an assigner and a generator and is used in the APIs and the internal plumbing to carry both of them together. We also propose the convenience class WatermarkStrategies for creating common strategies with default timestamp assigners.

TimestampAssigner (Java)
/**
 * A {@code TimestampAssigner} assigns event time timestamps to elements.
 * These timestamps are used by all functions that operate on event time,
 * for example event time windows.
 *
 * <p>Timestamps can be an arbitrary {@code long} value, but all built-in implementations
 * represent it as the milliseconds since the Epoch (midnight, January 1, 1970 UTC),
 * the same way as {@link System#currentTimeMillis()} does it.
 *
 * @param <T> The type of the elements to which this assigner assigns timestamps.
 */
@PublicEvolving
@FunctionalInterface
public interface TimestampAssigner<T> extends Function {

	/**
	 * Assigns a timestamp to an element, in milliseconds since the Epoch.
	 *
	 * <p>The method is passed the previously assigned timestamp of the element.
	 * That previous timestamp may have been assigned by a previous assigner. If the element did
	 * not carry a timestamp before, this value is {@code Long.MIN_VALUE}.
	 *
	 * @param element The element that the timestamp will be assigned to.
	 * @param currentTimestamp The current internal timestamp of the element,
	 *                         or a negative value, if no timestamp has been assigned yet.
	 * @return The new timestamp.
	 */
	long extractTimestamp(T element, long currentTimestamp);
}
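For illustration, the sketch below shows two assigners for a hypothetical ClickEvent type, written as lambdas as the @FunctionalInterface annotation allows. To stay self-contained it redeclares a simplified copy of the proposed interface without the Function super-interface. ClickEvent, its eventTimeMillis field and the "keep the existing timestamp" fallback are assumptions made for this example, not part of the proposal.

```java
import java.util.Objects;

// Simplified stand-in for the proposed TimestampAssigner (no Flink dependency,
// no Function super-interface).
@FunctionalInterface
interface TimestampAssigner<T> {
    long extractTimestamp(T element, long currentTimestamp);
}

// Hypothetical event type that carries its own event time in epoch milliseconds.
final class ClickEvent {
    final String userId;
    final long eventTimeMillis;

    ClickEvent(String userId, long eventTimeMillis) {
        this.userId = Objects.requireNonNull(userId);
        this.eventTimeMillis = eventTimeMillis;
    }
}

final class TimestampAssignerExamples {

    // Always takes the timestamp embedded in the event and ignores any previous one.
    static final TimestampAssigner<ClickEvent> FROM_FIELD =
            (event, currentTimestamp) -> event.eventTimeMillis;

    // Keeps a timestamp assigned earlier (for example by the source) and only falls
    // back to the event field when none exists, which is signalled by Long.MIN_VALUE.
    static final TimestampAssigner<ClickEvent> KEEP_EXISTING_OR_FROM_FIELD =
            (event, currentTimestamp) ->
                    currentTimestamp == Long.MIN_VALUE ? event.eventTimeMillis : currentTimestamp;

    public static void main(String[] args) {
        ClickEvent click = new ClickEvent("alice", 1_000L);
        System.out.println(FROM_FIELD.extractTimestamp(click, 500L));                  // 1000
        System.out.println(KEEP_EXISTING_OR_FROM_FIELD.extractTimestamp(click, 500L)); // 500
        System.out.println(
                KEEP_EXISTING_OR_FROM_FIELD.extractTimestamp(click, Long.MIN_VALUE));  // 1000
    }
}
```

The second variant relies only on the documented contract that a missing timestamp is passed as Long.MIN_VALUE.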

...

WatermarkGenerator (Java)
/**
 * The {@code WatermarkGenerator} generates watermarks either based on events or
 * periodically (in a fixed interval).
 *
 * <p><b>Note:</b> This WatermarkGenerator subsumes the previous distinction between the
 * {@code AssignerWithPunctuatedWatermarks} and the {@code AssignerWithPeriodicWatermarks}.
 */
@PublicEvolving
public interface WatermarkGenerator<T> {

	/**
	 * Called for every event. Allows the watermark generator to examine and remember the
	 * event timestamps, or to emit a watermark based on the event itself.
	 */
	void onEvent(T event, long eventTimestamp, WatermarkOutput output);

	/**
	 * Called periodically; may emit a new watermark, or not.
	 *
	 * <p>The interval at which this method is called and watermarks are generated
	 * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
	 */
	void onPeriodicEmit(WatermarkOutput output);
}
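To show how the single interface covers both former assigner styles, here is a minimal sketch of a periodic generator (bounded out-of-orderness) and a punctuated one. WatermarkOutput is not defined in this section, so the sketch assumes a simplified version with an emitWatermark(long) method; the real output type may differ. The "WM" marker convention and the 100 ms bound are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Assumed, simplified output: the real one would likely take a Watermark object.
interface WatermarkOutput {
    void emitWatermark(long watermarkTimestamp);
}

// Simplified copy of the proposed WatermarkGenerator interface.
interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
    void onPeriodicEmit(WatermarkOutput output);
}

// Periodic style: remembers the highest timestamp seen and, on each periodic call,
// emits a watermark trailing it by a fixed out-of-orderness bound.
final class BoundedOutOfOrdernessGenerator<T> implements WatermarkGenerator<T> {
    private final long maxOutOfOrdernessMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessGenerator(long maxOutOfOrdernessMillis) {
        if (maxOutOfOrdernessMillis < 0) {
            throw new IllegalArgumentException("bound must not be negative");
        }
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        // Start low enough that the subtraction in onPeriodicEmit cannot underflow.
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis + 1;
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(maxTimestamp - maxOutOfOrdernessMillis - 1);
    }
}

// Punctuated style: emits a watermark as soon as a marker event arrives and does
// nothing on the periodic callback.
final class MarkerEventGenerator implements WatermarkGenerator<String> {
    @Override
    public void onEvent(String event, long eventTimestamp, WatermarkOutput output) {
        if (event.startsWith("WM")) { // hypothetical marker convention
            output.emitWatermark(eventTimestamp);
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // No periodic watermarks for this generator.
    }
}

final class WatermarkGeneratorDemo {
    public static void main(String[] args) {
        List<Long> emitted = new ArrayList<>();
        WatermarkOutput out = emitted::add;

        WatermarkGenerator<String> periodic = new BoundedOutOfOrdernessGenerator<>(100L);
        periodic.onEvent("a", 1_000L, out);
        periodic.onEvent("b", 900L, out);   // out of order, does not move the maximum
        periodic.onPeriodicEmit(out);       // emits 1000 - 100 - 1 = 899

        WatermarkGenerator<String> punctuated = new MarkerEventGenerator();
        punctuated.onEvent("x", 1_200L, out);    // no marker, nothing emitted
        punctuated.onEvent("WM-1", 1_300L, out); // emits 1300
        System.out.println(emitted);             // [899, 1300]
    }
}
```

The extra "- 1" follows the common convention that a watermark t declares that no more elements with a timestamp at or below t are expected, so elements exactly at the bound are still accepted.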

WatermarkStrategy (Java)
/**
 * The WatermarkStrategy defines how to generate {@link Watermark}s in the stream sources.
 * The WatermarkStrategy is a builder/factory for the {@link WatermarkGenerator} that
 * generates the watermarks and the {@link TimestampAssigner} which assigns the internal timestamp
 * of a record.
 *
 * <p>This interface is {@link Serializable} because watermark strategies may be shipped
 * to workers during distributed execution.
 */
@PublicEvolving
public interface WatermarkStrategy<T> extends Serializable {

	/**
	 * Instantiates a {@link TimestampAssigner} for assigning timestamps according to this
	 * strategy.
	 */
	default TimestampAssigner<T> createTimestampAssigner() {
		// By default, use an IdentityTimestampAssigner, which keeps the timestamp a record already
		// carries. This suits records that leave the source with valid timestamps, for example from Kafka.
		return new IdentityTimestampAssigner<>();
	}

	/**
	 * Instantiates a WatermarkGenerator that generates watermarks according to this strategy.
	 */
	WatermarkGenerator<T> createWatermarkGenerator();
}
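The sketch below shows how a strategy could tie the two parts together and how a runtime might use it: each parallel instance asks the (serialized and shipped) strategy for its own assigner and generator, assigns a timestamp to every record, feeds the record to the generator, and triggers the periodic callback. All interfaces are simplified stand-ins; CsvAscendingStrategy, its "epochMillis,payload" record format and StrategyDriver are hypothetical, and the driver only mimics what an operator might do.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// --- Simplified stand-ins for the proposed interfaces (no Flink dependency) ---
@FunctionalInterface
interface TimestampAssigner<T> {
    long extractTimestamp(T element, long currentTimestamp);
}

interface WatermarkOutput {
    void emitWatermark(long watermarkTimestamp); // assumed, simplified signature
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
    void onPeriodicEmit(WatermarkOutput output);
}

// Keeps whatever timestamp the record already carries.
final class IdentityTimestampAssigner<T> implements TimestampAssigner<T> {
    @Override
    public long extractTimestamp(T element, long currentTimestamp) {
        return currentTimestamp;
    }
}

interface WatermarkStrategy<T> extends Serializable {
    default TimestampAssigner<T> createTimestampAssigner() {
        return new IdentityTimestampAssigner<>();
    }

    WatermarkGenerator<T> createWatermarkGenerator();
}

// Hypothetical strategy: records are "epochMillis,payload" strings with ascending timestamps.
final class CsvAscendingStrategy implements WatermarkStrategy<String> {
    @Override
    public TimestampAssigner<String> createTimestampAssigner() {
        // Overrides the identity default: parse the timestamp from the record itself.
        return (record, currentTimestamp) ->
                Long.parseLong(record.substring(0, record.indexOf(',')));
    }

    @Override
    public WatermarkGenerator<String> createWatermarkGenerator() {
        return new WatermarkGenerator<String>() {
            private long maxTimestamp = Long.MIN_VALUE;

            @Override
            public void onEvent(String event, long eventTimestamp, WatermarkOutput output) {
                maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
            }

            @Override
            public void onPeriodicEmit(WatermarkOutput output) {
                if (maxTimestamp != Long.MIN_VALUE) { // nothing seen yet, nothing to emit
                    output.emitWatermark(maxTimestamp - 1);
                }
            }
        };
    }
}

final class StrategyDriver {
    // Mimics one parallel instance: create assigner and generator, then process records.
    static List<Long> run(WatermarkStrategy<String> strategy, List<String> records) {
        TimestampAssigner<String> assigner = strategy.createTimestampAssigner();
        WatermarkGenerator<String> generator = strategy.createWatermarkGenerator();
        List<Long> watermarks = new ArrayList<>();
        WatermarkOutput output = watermarks::add;
        for (String record : records) {
            long timestamp = assigner.extractTimestamp(record, Long.MIN_VALUE); // no prior timestamp
            generator.onEvent(record, timestamp, output);
        }
        // A real runtime would call this at the configured auto-watermark interval.
        generator.onPeriodicEmit(output);
        return watermarks;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("1000,a", "1500,b", "2000,c");
        System.out.println(run(new CsvAscendingStrategy(), records)); // [1999]
    }
}
```

Because createWatermarkGenerator() is the only abstract method, a strategy that relies on the default identity assigner can also be written as a lambda.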


Proposed Changes

We propose to add the new interfaces, along with wrapper code that adapts the old interfaces to the new ones to ensure compatibility. We also need a new operator implementation for watermark extraction, and we need to adapt the KafkaConsumer to use the new WatermarkGenerator.
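The compatibility wrappers could look roughly like the sketch below: a legacy assigner's extractTimestamp method maps directly onto the new TimestampAssigner, a periodic assigner's watermark is polled in onPeriodicEmit, and a punctuated assigner is consulted in onEvent. The legacy interfaces are simplified here (a nullable Long instead of a nullable Watermark), WatermarkOutput is an assumed simplified type, and PeriodicAdapter and PunctuatedAdapter are hypothetical names, not the actual wrapper classes.

```java
import java.util.ArrayList;
import java.util.List;

// --- Simplified legacy interfaces: null means "no watermark right now" ---
interface AssignerWithPeriodicWatermarks<T> {
    long extractTimestamp(T element, long previousElementTimestamp);
    Long getCurrentWatermark();
}

interface AssignerWithPunctuatedWatermarks<T> {
    long extractTimestamp(T element, long previousElementTimestamp);
    Long checkAndGetNextWatermark(T lastElement, long extractedTimestamp);
}

// --- Simplified new interfaces ---
@FunctionalInterface
interface TimestampAssigner<T> {
    long extractTimestamp(T element, long currentTimestamp);
}

interface WatermarkOutput {
    void emitWatermark(long watermarkTimestamp);
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
    void onPeriodicEmit(WatermarkOutput output);
}

// Hypothetical adapter: a legacy periodic assigner only yields watermarks on the timer.
final class PeriodicAdapter<T> implements WatermarkGenerator<T> {
    private final AssignerWithPeriodicWatermarks<T> wrapped;

    PeriodicAdapter(AssignerWithPeriodicWatermarks<T> wrapped) {
        this.wrapped = wrapped;
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        // Nothing to do: the legacy assigner updates its state in extractTimestamp().
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        Long watermark = wrapped.getCurrentWatermark();
        if (watermark != null) {
            output.emitWatermark(watermark);
        }
    }
}

// Hypothetical adapter: a legacy punctuated assigner is asked after every element.
final class PunctuatedAdapter<T> implements WatermarkGenerator<T> {
    private final AssignerWithPunctuatedWatermarks<T> wrapped;

    PunctuatedAdapter(AssignerWithPunctuatedWatermarks<T> wrapped) {
        this.wrapped = wrapped;
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        Long watermark = wrapped.checkAndGetNextWatermark(event, eventTimestamp);
        if (watermark != null) {
            output.emitWatermark(watermark);
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Punctuated assigners never emit on a timer.
    }
}

final class LegacyAdapterDemo {
    public static void main(String[] args) {
        // Hypothetical legacy assigner: the watermark trails the max timestamp by 10 ms.
        AssignerWithPeriodicWatermarks<Long> legacy = new AssignerWithPeriodicWatermarks<Long>() {
            private long max = Long.MIN_VALUE;

            public long extractTimestamp(Long element, long previousElementTimestamp) {
                max = Math.max(max, element);
                return element;
            }

            public Long getCurrentWatermark() {
                return max == Long.MIN_VALUE ? null : max - 10;
            }
        };
        TimestampAssigner<Long> assigner = legacy::extractTimestamp; // same shape as the new API
        WatermarkGenerator<Long> generator = new PeriodicAdapter<>(legacy);

        List<Long> out = new ArrayList<>();
        generator.onPeriodicEmit(out::add); // no elements yet, so nothing is emitted
        for (long value : new long[] {100L, 250L, 180L}) {
            long timestamp = assigner.extractTimestamp(value, Long.MIN_VALUE);
            generator.onEvent(value, timestamp, out::add);
        }
        generator.onPeriodicEmit(out::add);
        System.out.println(out); // [240]
    }
}
```

Keeping the two adapters separate means the operator only ever deals with WatermarkGenerator, while existing user code keeps working unchanged.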

...