...

Watermark: Informs operators that no elements with a timestamp older than or equal to the watermark timestamp should arrive at the operator.

TimestampAssigner: Assigns event time timestamps to elements. These timestamps are used by all functions that operate on event time, such as event time windows. Here is the `TimestampAssigner` interface:

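```java
public interface TimestampAssigner<T> {
    /**
     * Returns the event time timestamp of the element in milliseconds. recordTimestamp is the
     * current internal timestamp of the element, or a negative value if none has been assigned yet.
     */
    long extractTimestamp(T element, long recordTimestamp);
}
```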
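As an illustration, the following sketch implements the interface for a hypothetical `ClickEvent` type. The class names are assumptions, not part of Flink, and the interface is redeclared locally so the example compiles without Flink on the classpath. The assigner prefers the event time carried in the payload and otherwise keeps the timestamp already attached to the record.

```java
/** Local copy of the TimestampAssigner interface above, so that this sketch compiles on its own. */
interface TimestampAssigner<T> {
    long extractTimestamp(T element, long recordTimestamp);
}

/** Hypothetical event type; eventTimeMillis is null when the payload carries no event time. */
final class ClickEvent {
    final String userId;
    final Long eventTimeMillis;

    ClickEvent(String userId, Long eventTimeMillis) {
        this.userId = userId;
        this.eventTimeMillis = eventTimeMillis;
    }
}

/**
 * Hypothetical assigner: prefers the event time carried in the payload and otherwise keeps the
 * timestamp already attached to the record (Long.MIN_VALUE if the source attached none).
 */
final class ClickEventTimestampAssigner implements TimestampAssigner<ClickEvent> {
    @Override
    public long extractTimestamp(ClickEvent element, long recordTimestamp) {
        return element.eventTimeMillis != null ? element.eventTimeMillis : recordTimestamp;
    }
}
```

In a real job, the same logic would typically be passed as a lambda to `WatermarkStrategy#withTimestampAssigner`.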

...

For Flink DataStream jobs:

  • When creating a source, the user provides a `WatermarkStrategy` to `StreamExecutionEnvironment#fromSource`.

  • If the source supports event time natively (e.g., `KafkaSource`) or the user provides a custom `TimestampAssigner` in the `WatermarkStrategy` to extract the timestamp from the record, Flink will add the timestamp to the record. Otherwise, the timestamp on the record will be `Long.MIN_VALUE`.

  • If the user uses `NoWatermarksGenerator` in the `WatermarkStrategy` (e.g., via `WatermarkStrategy.noWatermarks()`), the job will not generate watermarks. Otherwise, the job will periodically emit watermarks whose values depend on event time. The emission interval is determined by `pipeline.auto-watermark-interval`, with a default value of 200 ms.
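The two watermark behaviors above can be modeled without Flink. The sketch below uses hypothetical stand-ins, not Flink's `WatermarkGenerator` API: a bounded-out-of-orderness model whose watermark follows the highest event timestamp seen so far (similar in spirit to `WatermarkStrategy.forBoundedOutOfOrderness`), and a no-watermarks model that never emits. In a job, `onPeriodicEmit` would be invoked once per `pipeline.auto-watermark-interval`.

```java
import java.util.List;

/** Hypothetical stand-in for a periodic watermark generator; not Flink's WatermarkGenerator API. */
interface PeriodicWatermarkModel {
    /** Called for every record with the record's event time timestamp. */
    void onEvent(long eventTimestamp);

    /** Called once per pipeline.auto-watermark-interval (200 ms by default); may append a watermark. */
    void onPeriodicEmit(List<Long> out);
}

/** Watermark = highest timestamp seen so far, minus the allowed out-of-orderness, minus 1 ms. */
final class BoundedOutOfOrdernessModel implements PeriodicWatermarkModel {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessModel(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // Chosen so that the first watermark is exactly Long.MIN_VALUE and cannot underflow.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    @Override
    public void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(List<Long> out) {
        out.add(maxTimestamp - outOfOrdernessMillis - 1);
    }
}

/** Models a "no watermarks" strategy: records still flow, but no watermark is ever produced. */
final class NoWatermarksModel implements PeriodicWatermarkModel {
    @Override
    public void onEvent(long eventTimestamp) {}

    @Override
    public void onPeriodicEmit(List<Long> out) {}
}
```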


Public Interfaces

A public interface is any change to the following:

  • DataStream and DataSet API, including classes related to that, such as StreamExecutionEnvironment
  • Classes marked with the @Public annotation
  • On-disk binary formats, such as checkpoints/savepoints
  • User-facing scripts/command-line tools, e.g. bin/flink, Yarn scripts, Mesos scripts
  • Configuration settings

1) Modify `org.apache.flink.api.common.eventtime.Watermark` and `org.apache.flink.streaming.api.watermark.Watermark` by adding a new field `useProcessingTime`, with a default value of false.


```java
import java.io.Serializable;

import org.apache.flink.annotation.Public;

/**
 * Watermarks are the progress indicators in the data streams. A watermark signifies that no events
 * with a timestamp smaller than or equal to the watermark's time will occur after the watermark.
 *
 * <ul>
 *   <li>A watermark with a timestamp <i>T</i> and useProcessingTime set to false indicates that the
 *       event time of the stream has progressed to time <i>T</i>.
 *   <li>A watermark with a timestamp <i>T</i> and useProcessingTime set to true indicates that the
 *       event time of the stream progresses in synchronization with the system time. The timestamp
 *       <i>T</i> must be less than the current system time. Otherwise, an exception will be thrown.
 * </ul>
 *
 * <p>Watermarks are created at the sources and propagate through the streams and operators.
 *
 * <p>In some cases a watermark is only a heuristic, meaning some events with a lower timestamp may
 * still follow. In that case, it is up to the logic of the operators to decide what to do with the
 * "late events". Operators can for example ignore these late events, route them to a different
 * stream, or send updates to their previously emitted results.
 *
 * <p>When a source reaches the end of the input, it emits a final watermark with timestamp {@code
 * Long.MAX_VALUE}, indicating the "end of time".
 *
 * <p>Note: A stream's time starts with a watermark of {@code Long.MIN_VALUE}. That means that all
 * records in the stream with a timestamp of {@code Long.MIN_VALUE} are immediately late.
 *
 * <p>Note: After sending a watermark with useProcessingTime set to true, it is only allowed to send
 * subsequent watermarks with useProcessingTime set to true. Sending a watermark with
 * useProcessingTime set to false will result in an exception.
 */
@Public
public final class Watermark implements Serializable {
    // ...

    /**
     * If useProcessingTime is false, this is the time of the watermark in milliseconds. If
     * useProcessingTime is true, this is the last effective time of the watermark in milliseconds.
     */
    private final long timestamp;

    /**
     * If this is true, this watermark indicates that the event time of the stream progresses in
     * synchronization with the system time.
     */
    private final boolean useProcessingTime;

    public Watermark(long timestamp) {
        this(timestamp, false);
    }

    public Watermark(long timestamp, boolean useProcessingTime) {
        this.timestamp = timestamp;
        this.useProcessingTime = useProcessingTime;
    }

    /** Returns whether the time of this watermark is determined by the system time. */
    public boolean useProcessingTime() {
        return useProcessingTime;
    }
}
```



Please note that this update does not change the abstraction of the information passed by the `Watermark` class but rather provides a natural extension to its current capabilities. When `useProcessingTime` is false, the watermark indicates that the stream has progressed to a specific event time. When `useProcessingTime` is true, it indicates that the stream progresses in synchronization with the system time. In both cases, the watermark indicates the time progression in the data stream.
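To make the two rules in the proposed Javadoc concrete, here is a hypothetical checker. `ProposedWatermark` and `WatermarkSequenceChecker` are illustrative names, not part of the proposal, and the sketch assumes the current system time is supplied through a `LongSupplier` so that it can be exercised with a fixed clock.

```java
import java.util.function.LongSupplier;

/** Stand-in carrying only the two fields of the proposed Watermark class. */
final class ProposedWatermark {
    final long timestamp;
    final boolean useProcessingTime;

    ProposedWatermark(long timestamp) {
        this(timestamp, false);
    }

    ProposedWatermark(long timestamp, boolean useProcessingTime) {
        this.timestamp = timestamp;
        this.useProcessingTime = useProcessingTime;
    }
}

/** Hypothetical checker enforcing the two rules stated in the proposed Watermark Javadoc. */
final class WatermarkSequenceChecker {
    private final LongSupplier clock; // e.g. System::currentTimeMillis
    private boolean processingTimeMode = false;

    WatermarkSequenceChecker(LongSupplier clock) {
        this.clock = clock;
    }

    void accept(ProposedWatermark watermark) {
        if (watermark.useProcessingTime) {
            // Rule 1: the timestamp T must be less than the current system time.
            if (watermark.timestamp >= clock.getAsLong()) {
                throw new IllegalArgumentException("Timestamp must be less than the current system time");
            }
            processingTimeMode = true;
        } else if (processingTimeMode) {
            // Rule 2: no event time watermark may follow a processing time watermark.
            throw new IllegalStateException("Event time watermark after a processing time watermark");
        }
    }
}
```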


2) Update `AbstractStreamOperator` and `AbstractStreamOperatorV2` to support processing watermarks with `useProcessingTime` set to true:

Original Behavior:

  • `AbstractStreamOperator` and `AbstractStreamOperatorV2` manage and trigger event time and processing time timers through `InternalTimeServiceManager`.

  • Only when the operator receives a watermark does it call `InternalTimeServiceManager#advanceWatermark` to advance the event time and trigger event time timers.


Updated Behavior:

  • When receiving a watermark with `useProcessingTime` set to false, the behavior remains the same as before.

  • When receiving a watermark with `useProcessingTime` set to true, the operator starts a `ScheduledThreadPoolExecutor` that triggers the event time timers as the system time progresses.


With this change, once an operator receives a watermark with `useProcessingTime` set to true, it behaves the same as a processing time operator.
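A simplified model of the updated operator behavior follows. It is a sketch under several assumptions. `HybridEventTimeTimers` is a hypothetical class rather than `InternalTimeServiceManager`, and timers are plain timestamps. `synchronized` stands in for the threading guarantees a real operator needs so that timers never fire concurrently with record processing. Event time watermarks advance time directly. The first processing time watermark starts a single-threaded `ScheduledExecutorService` that advances time to the system clock on every tick.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Hypothetical, simplified model of an operator's event time timers; "firing" records the time. */
final class HybridEventTimeTimers implements AutoCloseable {
    private final PriorityQueue<Long> pendingTimers = new PriorityQueue<>();
    private final List<Long> fired = new ArrayList<>();
    private final LongSupplier clock; // e.g. System::currentTimeMillis
    private final long pollIntervalMillis;
    private ScheduledExecutorService executor; // non-null once a processing time watermark arrived
    private long currentTime = Long.MIN_VALUE;

    HybridEventTimeTimers(LongSupplier clock, long pollIntervalMillis) {
        this.clock = clock;
        this.pollIntervalMillis = pollIntervalMillis;
    }

    synchronized void registerEventTimeTimer(long time) {
        pendingTimers.add(time);
    }

    synchronized void processWatermark(long timestamp, boolean useProcessingTime) {
        if (!useProcessingTime) {
            advanceTo(timestamp); // original behavior: event time moves only with watermarks
            return;
        }
        if (executor == null) {
            // Updated behavior: from now on, event time follows the system clock.
            executor = Executors.newSingleThreadScheduledExecutor(runnable -> {
                Thread thread = new Thread(runnable, "event-time-timer-poller");
                thread.setDaemon(true);
                return thread;
            });
            executor.scheduleAtFixedRate(
                    this::advanceToSystemTime, pollIntervalMillis, pollIntervalMillis, TimeUnit.MILLISECONDS);
        }
        // T is below the system time, so advancing to the clock subsumes it.
        advanceToSystemTime();
    }

    private synchronized void advanceToSystemTime() {
        advanceTo(clock.getAsLong());
    }

    private synchronized void advanceTo(long time) {
        currentTime = Math.max(currentTime, time);
        while (!pendingTimers.isEmpty() && pendingTimers.peek() <= currentTime) {
            fired.add(pendingTimers.poll());
        }
    }

    synchronized List<Long> firedTimers() {
        return new ArrayList<>(fired);
    }

    @Override
    public synchronized void close() {
        if (executor != null) {
            executor.shutdownNow();
        }
    }
}
```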


3) Introduce `WatermarkCharacteristic` enum class

```java
import org.apache.flink.annotation.PublicEvolving;

/**
 * WatermarkCharacteristic is used by the source implementation to describe the characteristic of
 * its watermarks, based on the WatermarkStrategy.
 */
@PublicEvolving
public enum WatermarkCharacteristic {

    /**
     * Only watermarks with useProcessingTime set to false can be sent, and no watermark is sent when
     * processing time is used. This is the default for all source implementations.
     */
    UNDEFINED,

    /**
     * The source sends watermarks with useProcessingTime set to false when event time is used, and
     * watermarks with useProcessingTime set to true when processing time is required.
     */
    ANY_WATERMARK
}
```
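The Javadoc of the two values implies a simple permission rule. The helper below spells it out. `WatermarkCharacteristics.permits` is a hypothetical name used only for illustration and is not part of the proposal. The enum is redeclared locally without its annotation so the sketch compiles on its own.

```java
/** Local copy of the proposed enum so that this sketch compiles on its own. */
enum WatermarkCharacteristic {
    UNDEFINED,
    ANY_WATERMARK
}

/** Hypothetical helper (not part of the proposal) spelling out what each value permits. */
final class WatermarkCharacteristics {
    private WatermarkCharacteristics() {}

    static boolean permits(WatermarkCharacteristic characteristic, boolean useProcessingTime) {
        switch (characteristic) {
            case UNDEFINED:
                return !useProcessingTime; // only event time watermarks
            case ANY_WATERMARK:
                return true; // event time and processing time watermarks
            default:
                throw new IllegalArgumentException("Unknown characteristic: " + characteristic);
        }
    }
}
```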


4) Update the `Source` interface, introducing the `getWatermarkCharacteristic` method

```java
@Public
public interface Source<T, SplitT extends SourceSplit, EnumChkT>
        extends SourceReaderFactory<T, SplitT> {

    /**
     * Get the watermark characteristic of this source.
     *
     * @param watermarkStrategy The watermark strategy of the source.
     * @return the watermark characteristic of this source.
     */
    default WatermarkCharacteristic getWatermarkCharacteristic(
            WatermarkStrategy<?> watermarkStrategy) {
        return WatermarkCharacteristic.UNDEFINED;
    }
}
```
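Because the new method is a `default` method, existing `Source` implementations keep compiling and report `UNDEFINED`. A source that can emit processing time watermarks overrides it, as proposed for `KafkaSource`. The sketch below uses hypothetical stand-ins (`StrategyStandIn`, `SourceStandIn` and the two sources) instead of the real Flink types.

```java
/** Local copy of the proposed enum. */
enum WatermarkCharacteristic {
    UNDEFINED,
    ANY_WATERMARK
}

/** Hypothetical stand-in for WatermarkStrategy; only records whether it emits no watermarks. */
final class StrategyStandIn {
    final boolean noWatermarks;

    StrategyStandIn(boolean noWatermarks) {
        this.noWatermarks = noWatermarks;
    }
}

/** Hypothetical stand-in for Source, reduced to the proposed default method. */
interface SourceStandIn {
    default WatermarkCharacteristic getWatermarkCharacteristic(StrategyStandIn watermarkStrategy) {
        return WatermarkCharacteristic.UNDEFINED;
    }
}

/** An existing source that does not override the method inherits UNDEFINED. */
final class LegacySource implements SourceStandIn {}

/** A source able to emit processing time watermarks overrides it. */
final class ProcessingTimeAwareSource implements SourceStandIn {
    @Override
    public WatermarkCharacteristic getWatermarkCharacteristic(StrategyStandIn watermarkStrategy) {
        return WatermarkCharacteristic.ANY_WATERMARK;
    }
}
```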


5) Update Source Implementations


KafkaSource

  • Update `KafkaSourceReader` to emit a watermark with `useProcessingTime` set to true at the beginning if `NoWatermarksGenerator` is used.

  • Update `KafkaSource` to override the `getWatermarkCharacteristic` method to return `ANY_WATERMARK`.
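The reader-side change can be sketched as follows. `SketchSourceReader` and `ReaderWatermark` are hypothetical stand-ins, not the real `KafkaSourceReader` or `Watermark`. The proposal does not state which timestamp the initial watermark carries. This sketch assumes `Long.MIN_VALUE`, which satisfies the requirement that the timestamp be less than the current system time.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-in for the proposed Watermark, reduced to its two fields. */
final class ReaderWatermark {
    final long timestamp;
    final boolean useProcessingTime;

    ReaderWatermark(long timestamp, boolean useProcessingTime) {
        this.timestamp = timestamp;
        this.useProcessingTime = useProcessingTime;
    }
}

/** Hypothetical reader model; not the real KafkaSourceReader. */
final class SketchSourceReader {
    private final boolean noWatermarksStrategy; // true when the strategy uses NoWatermarksGenerator
    private final List<ReaderWatermark> output = new ArrayList<>();

    SketchSourceReader(boolean noWatermarksStrategy) {
        this.noWatermarksStrategy = noWatermarksStrategy;
    }

    /** Called once when the reader starts, before any record is emitted. */
    void start() {
        if (noWatermarksStrategy) {
            // Assumption: no event time has been observed yet, so Long.MIN_VALUE is used.
            output.add(new ReaderWatermark(Long.MIN_VALUE, true));
        }
    }

    List<ReaderWatermark> emitted() {
        return new ArrayList<>(output);
    }
}
```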


MySqlSource

The update to `MySqlSource` is documented in the appendix.

...


Proposed Changes

...