Status

Current state: "Under Discussion"

Discussion thread: thread/of03bqzf1jd5jjzh0qkxk31d2sjd7sgd

JIRA: FLINK-17653

Release: 1.11

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Additionally, we propose to add a generic wrapping WatermarkGenerator that provides idleness detection, i.e. it can mark a stream/partition as idle if no data arrives within a configured timeout.
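
To make the idleness idea concrete, here is a minimal sketch of such a wrapper. It does not depend on Flink: `Output`, `Generator`, and `WithIdleness` are hypothetical stand-ins written for this illustration, and the `markIdle()` method and the injected clock are assumptions, not part of the interfaces listed on this page.

```java
import java.util.function.LongSupplier;

final class IdlenessSketch {

    /** Hypothetical stand-in for the output a generator writes to. */
    interface Output {
        void emitWatermark(long timestamp);
        void markIdle(); // assumed method, used to signal an idle stream/partition
    }

    /** Stand-in with the same shape as the proposed WatermarkGenerator. */
    interface Generator<T> {
        void onEvent(T event, long eventTimestamp, Output output);
        void onPeriodicEmit(Output output);
    }

    /** Wraps another generator and marks the output idle after a timeout without events. */
    static final class WithIdleness<T> implements Generator<T> {
        private final Generator<T> delegate;
        private final long idleTimeoutMillis;
        private final LongSupplier clock; // injected so the sketch can be tested
        private long lastEventTime;
        private boolean idle;

        WithIdleness(Generator<T> delegate, long idleTimeoutMillis, LongSupplier clock) {
            this.delegate = delegate;
            this.idleTimeoutMillis = idleTimeoutMillis;
            this.clock = clock;
            this.lastEventTime = clock.getAsLong();
        }

        @Override
        public void onEvent(T event, long eventTimestamp, Output output) {
            // Any event resets the idle timer and ends an idle period.
            lastEventTime = clock.getAsLong();
            idle = false;
            delegate.onEvent(event, eventTimestamp, output);
        }

        @Override
        public void onPeriodicEmit(Output output) {
            if (clock.getAsLong() - lastEventTime >= idleTimeoutMillis) {
                // Signal idleness once per idle period and emit no watermarks meanwhile.
                if (!idle) {
                    idle = true;
                    output.markIdle();
                }
            } else {
                delegate.onPeriodicEmit(output);
            }
        }
    }
}
```

Because the wrapper only relies on the two generator callbacks, it can be combined with any generator in this sketch without that generator knowing about idleness.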

The "unify and separate" part means that we want to unify the punctuated and periodic assigners into a single interface while, at the same time, splitting the timestamp assigner from the watermark generator.

Finally, we want to add these new interfaces to the existing org.apache.flink.api.common.eventtime package in flink-core.

...

Code Block (Java): TimestampAssigner
/**
 * A {@code TimestampAssigner} assigns event time timestamps to elements.
 * These timestamps are used by all functions that operate on event time,
 * for example event time windows.
 *
 * <p>Timestamps can be an arbitrary {@code long} value, but all built-in implementations
 * represent it as the milliseconds since the Epoch (midnight, January 1, 1970 UTC),
 * the same way as {@link System#currentTimeMillis()} does it.
 *
 * @param <T> The type of the elements to which this assigner assigns timestamps.
 */
@PublicEvolving
@FunctionalInterface
public interface TimestampAssigner<T> extends Function {

	/**
	 * Assigns a timestamp to an element, in milliseconds since the Epoch.
	 *
	 * <p>The method is passed the previously assigned timestamp of the element.
	 * That previous timestamp may have been assigned from a previous assigner. If the element did
	 * not carry a timestamp before, this value is {@code Long.MIN_VALUE}.
	 *
	 * @param element The element that the timestamp will be assigned to.
	 * @param recordTimestamp The current internal timestamp of the element,
	 *                        or a negative value, if no timestamp has been assigned yet.
	 * @return The new timestamp.
	 */
	long extractTimestamp(T element, long recordTimestamp);
}
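
Since the interface has a single method, an assigner can be written as a lambda. The sketch below illustrates this with a local copy of the interface so that it compiles without Flink; the `Click` event type, its fields, and the two assigner constants are hypothetical names chosen for the example.

```java
final class TimestampAssignerSketch {

    /** Local copy of the interface shape shown above, so the sketch has no dependencies. */
    @FunctionalInterface
    interface TimestampAssigner<T> {
        long extractTimestamp(T element, long recordTimestamp);
    }

    /** Value passed as the second argument when the element carries no timestamp yet. */
    static final long NO_TIMESTAMP = Long.MIN_VALUE;

    /** Hypothetical event type with an embedded creation time in epoch milliseconds. */
    static final class Click {
        final String userId;
        final long createdAtMillis;

        Click(String userId, long createdAtMillis) {
            this.userId = userId;
            this.createdAtMillis = createdAtMillis;
        }
    }

    /** Always takes the timestamp from the event payload and ignores the previous one. */
    static final TimestampAssigner<Click> FROM_FIELD =
            (click, recordTimestamp) -> click.createdAtMillis;

    /** Keeps a previously assigned timestamp and falls back to the payload otherwise. */
    static final TimestampAssigner<Click> KEEP_EXISTING =
            (click, recordTimestamp) ->
                    recordTimestamp != NO_TIMESTAMP ? recordTimestamp : click.createdAtMillis;
}
```

The second variant shows why the previously assigned timestamp is passed in: an assigner can decide to keep it, for example when the source already attached a valid one.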



Code Block (Java): WatermarkGenerator
/**
 * The {@code WatermarkGenerator} generates watermarks either based on events or
 * periodically (in a fixed interval).
 *
 * <p><b>Note:</b> This WatermarkGenerator subsumes the previous distinction between the
 * {@code AssignerWithPunctuatedWatermarks} and the {@code AssignerWithPeriodicWatermarks}.
 */
@PublicEvolving
public interface WatermarkGenerator<T> {

	/**
	 * Called for every event, allows the watermark generator to examine and remember the
	 * event timestamps, or to emit a watermark based on the event itself.
	 */
	void onEvent(T event, long eventTimestamp, WatermarkOutput output);

	/**
	 * Called periodically, and might emit a new watermark, or not.
	 *
	 * <p>The interval in which this method is called and Watermarks are generated
	 * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
	 */
	void onPeriodicEmit(WatermarkOutput output);
}
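
As an illustration of how one interface can cover both styles, the following sketch implements a periodic and a punctuated generator against local stand-in interfaces. `Output` and `Generator` are simplified stand-ins, not the proposed Flink types, and the `"WM"` marker convention in the punctuated variant is an assumption made for this example.

```java
final class WatermarkGeneratorSketch {

    /** Simplified stand-in for the watermark output; watermarks are plain longs here. */
    interface Output {
        void emitWatermark(long timestamp);
    }

    /** Stand-in with the same shape as the proposed WatermarkGenerator. */
    interface Generator<T> {
        void onEvent(T event, long eventTimestamp, Output output);
        void onPeriodicEmit(Output output);
    }

    /** Periodic style: remember the max timestamp per event, emit only on the periodic call. */
    static final class BoundedOutOfOrderness<T> implements Generator<T> {
        private final long maxOutOfOrdernessMillis;
        private long maxTimestamp;

        BoundedOutOfOrderness(long maxOutOfOrdernessMillis) {
            this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
            // Start high enough that the subtraction in onPeriodicEmit cannot underflow.
            this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis + 1;
        }

        @Override
        public void onEvent(T event, long eventTimestamp, Output output) {
            maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        }

        @Override
        public void onPeriodicEmit(Output output) {
            // A watermark W asserts that no element with timestamp <= W follows, hence the -1.
            output.emitWatermark(maxTimestamp - maxOutOfOrdernessMillis - 1);
        }
    }

    /** Punctuated style: emit directly from onEvent when a marker event is seen. */
    static final class Punctuated implements Generator<String> {
        @Override
        public void onEvent(String event, long eventTimestamp, Output output) {
            if (event.startsWith("WM")) { // hypothetical marker convention
                output.emitWatermark(eventTimestamp);
            }
        }

        @Override
        public void onPeriodicEmit(Output output) {
            // Nothing to do: this generator reacts to events only.
        }
    }
}
```

The two classes differ only in which callback emits, which is the sense in which the single interface subsumes both of the earlier assigner types.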

...

Code Block (Java): WatermarkStrategy
/**
 * The WatermarkStrategy defines how to generate {@link Watermark}s in the stream sources.
 * The WatermarkStrategy is a builder/factory for the {@link WatermarkGenerator} that
 * generates the watermarks and the {@link TimestampAssigner} which assigns the internal timestamp
 * of a record.
 *
 * <p>This interface is {@link Serializable} because watermark strategies may be shipped
 * to workers during distributed execution.
 */
@PublicEvolving
public interface WatermarkStrategy<T> extends Serializable {

	/**
	 * Instantiates a {@link TimestampAssigner} for assigning timestamps according to this
	 * strategy.
	 */
	default TimestampAssigner<T> createTimestampAssigner() {
		// By default, this is {@link RecordTimestampAssigner},
		// which takes the timestamp that a record already has, for example from Kafka.
		return new RecordTimestampAssigner<>();
	}

	/**
	 * Instantiates a WatermarkGenerator that generates watermarks according to this strategy.
	 */
	WatermarkGenerator<T> createWatermarkGenerator();
}
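
The factory role of the strategy can be sketched as follows, again with local stand-ins rather than the Flink types. `Strategy`, `Ascending`, and the `ascending()` helper are hypothetical names for this example; the default assigner simply returns the timestamp the record already has, as described in the comment above.

```java
import java.io.Serializable;

final class WatermarkStrategySketch {

    interface Output {
        void emitWatermark(long timestamp);
    }

    interface TimestampAssigner<T> {
        long extractTimestamp(T element, long recordTimestamp);
    }

    interface Generator<T> {
        void onEvent(T event, long eventTimestamp, Output output);
        void onPeriodicEmit(Output output);
    }

    /** Stand-in strategy: a serializable factory for the two runtime components. */
    interface Strategy<T> extends Serializable {
        default TimestampAssigner<T> createTimestampAssigner() {
            // Default: keep the timestamp the record already carries.
            return (element, recordTimestamp) -> recordTimestamp;
        }

        Generator<T> createWatermarkGenerator();
    }

    /** Generator for streams whose timestamps never decrease. */
    static final class Ascending<T> implements Generator<T> {
        private long maxTimestamp = Long.MIN_VALUE + 1;

        @Override
        public void onEvent(T event, long eventTimestamp, Output output) {
            maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        }

        @Override
        public void onPeriodicEmit(Output output) {
            output.emitWatermark(maxTimestamp - 1);
        }
    }

    /** The strategy holds no state itself; every call creates a fresh generator. */
    static <T> Strategy<T> ascending() {
        return () -> new Ascending<T>();
    }
}
```

Keeping the strategy stateless and creating the stateful generator on demand is what allows the strategy to be shipped to workers, where each parallel instance can then build its own generator.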

...