NOTE: This document is a design draft. It does not reflect the state of a stable release but rather work in progress on the current master and future considerations.
Before implementing operations that rely on time, it is necessary to define the different notions of time in Flink and how time is handled in a streaming topology.
Different Notions of Time
In Flink Streaming, every element has a timestamp attached to it. When a value enters a streaming topology through a source this source attaches a timestamp to the value. The timestamp can either be the current system time of the source (ingress time) or it can be a timestamp that is extracted from the value (event time).
Some operations, such as time windows and ordering by time, require a notion of time. For these operations the user has the choice between two approaches: 1) using the current system time of the machine that an operation is running on, or 2) using the timestamp that was attached to a value when it entered the topology. Because there are two ways of assigning timestamps at sources, there are three different types of time in total, each with different pros and cons:
- Operator Time: This usually gives the fastest results, because processing by timestamp can require waiting for slower inputs and buffering elements that arrive at operations out-of-order (that is, not ordered by their timestamps). It does, however, not give you any guarantees about the ordering of elements when joining several data streams or when operations require a repartitioning of the elements.
- Event Time: This makes it possible to define orderings or time windows that are completely independent of the system time of the machines that a topology is running on. This might, however, lead to very high latencies because operations have to wait until they can be sure that they have seen all elements up to a certain timestamp.
- Ingress Time: This gives you a good compromise between event time and operator time. Even after repartitioning data or joining several data streams the elements can still be processed according to the time that they entered the system. The latency should also be low because the timestamps that sources attach to elements are steadily increasing.
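The difference between ingress time and event time comes down to where the timestamp originates when a source stamps an incoming value. The following sketch illustrates this with hypothetical helper types (`Event`, `StampedEvent`, and the two `stamp*` functions are illustrative, not the actual Flink API):

```scala
// Hypothetical value type carrying its own occurrence time (for event time).
case class Event(payload: String, occurredAt: Long)

// Hypothetical wrapper for a value after the source attached a timestamp.
case class StampedEvent(payload: String, timestamp: Long)

// Ingress time: stamp with the source's current system time.
// The clock is a parameter here only so the behavior can be tested.
def stampIngress(e: Event, now: () => Long = () => System.currentTimeMillis()): StampedEvent =
  StampedEvent(e.payload, now())

// Event time: extract the timestamp carried inside the value itself.
def stampEvent(e: Event): StampedEvent =
  StampedEvent(e.payload, e.occurredAt)
```

With ingress time the attached timestamps are steadily increasing at each source, while with event time they may arrive arbitrarily out of order relative to the system clock.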
Tracking the Progress of Time
This only applies to operations that use timestamps (ingress time or event time), since operator time is only local to one parallel instance of an operation.
When using element timestamps to order elements or perform windowing it becomes necessary to keep track of the progress of time. For example, in a windowing operator the window from t to t + 5 can only be processed once the operator is certain that no more elements with a timestamp lower than t + 5 will arrive from any of its input streams. Flink uses watermarks to keep track of the timestamps of elements passing through the system: when a source knows that no elements with a timestamp lower than t1 will be emitted in the future, it emits a watermark with timestamp t1. Watermarks are broadcast to downstream operators. The watermark at one specific operator at any point in time is the minimum watermark time over all its inputs. Once that watermark increases, the operator can react to it and broadcast the watermark further downstream. In the windowing example above, the operator would wait for the watermark for time t + 5, then process the window, and then broadcast watermark t + 5 itself.
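The rule that an operator's watermark is the minimum over its inputs can be sketched as follows (a hypothetical helper, not the actual Flink implementation):

```scala
// Tracks the last watermark seen on each input of an operator.
// The operator-wide watermark is the minimum over all inputs.
class WatermarkTracker(numInputs: Int) {
  // Before any watermark arrives on an input, treat it as Long.MinValue.
  private val perInput = Array.fill(numInputs)(Long.MinValue)

  // Record a watermark from one input. Returns the new operator-wide
  // watermark if it advanced, or None if it is still held back.
  def onWatermark(input: Int, ts: Long): Option[Long] = {
    val before = perInput.min
    if (ts > perInput(input)) perInput(input) = ts
    val after = perInput.min
    if (after > before) Some(after) else None
  }
}
```

Note how a single slow input holds back the watermark of the whole operator, which is exactly the source of the latency discussed next.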
Since an operator has to wait for watermarks from all its inputs, and since timestamps can lag far behind the current system time, this can lead to latency (also called time skew or watermark skew). This figure should help in understanding the problem:
The ideal case would be if the timestamp of the elements matched the processing time when they arrive at an operator. This rarely happens, though, and it can also happen that elements with an earlier timestamp arrive after elements with a later timestamp. The difference between the ideal watermark and the actual watermark at any point in time is the latency (or time skew).
The concept of watermarks as used in Flink is inspired by Google's MillWheel: http://research.google.com/pubs/pub41378.html and by work building on top of it for Google Cloud Dataflow API: http://strataconf.com/big-data-conference-uk-2015/public/schedule/speaker/193653
Ordering Elements in a Stream
Elements in a stream can be ordered by timestamp. Normally, it would not be possible to order the elements of an infinite stream. By using watermarks, however, Flink can order elements in a stream. The ordering operator has to buffer all elements it receives. Then, when it receives a watermark it can sort all elements that have a timestamp lower than the watermark and emit them in sorted order. This is correct because the watermark signals that no more elements can arrive that would have to be intermixed with the sorted elements. The following code example and diagram illustrate this idea:
val stream: DataStream[MyType] = env.createStream(new KafkaSource(...))
val orderedStream: OrderedKeyDataStream[MyType] = stream.keyBy(...).orderByTime()
val mapped: DataStream[MyType] = orderedStream.flatMap { ... }
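The buffering logic behind such an ordering operator can be sketched like this (a hypothetical helper, not the actual Flink operator):

```scala
import scala.collection.mutable

// Buffers elements until a watermark guarantees that everything with a
// lower timestamp has arrived, then releases them in sorted order.
class OrderingBuffer[T](timestamp: T => Long) {
  private val buffer = mutable.ArrayBuffer.empty[T]

  def add(element: T): Unit = buffer += element

  // On a watermark wm, all buffered elements with timestamp < wm can be
  // emitted sorted: the watermark signals that no earlier element can
  // still arrive. Elements at or above wm stay buffered.
  def onWatermark(wm: Long): Seq[T] = {
    val (ready, rest) = buffer.partition(timestamp(_) < wm)
    buffer.clear()
    buffer ++= rest
    ready.sortBy(timestamp).toSeq
  }
}
```

The buffer grows with the watermark skew: the further the watermark lags behind the newest elements, the more elements must be held back.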
Using the same method, Flink can also sort the input of a windowing operator. This can be useful if you want to have window trigger policies that react to changes in the data and need the input in sorted order. We will come back to this when we look at windowing policies further below.
Timestamps on Results of Window Operations
When using timestamp-based windows, the timestamp that gets attached to the result of a windowing operation becomes important, for example because the timestamp is used in downstream window operations (as we will see below).
val stream: DataStream[MyType] = env.createStream(new KafkaSource(...))
val windowed: DataStream[MyType] = stream
  .keyBy(...)
  .window(...)
  .reduce { ... }
This is also true for the following example, where the operator gets the whole group of elements with the same key inside a window:
val stream: DataStream[MyType] = env.createStream(new KafkaSource(...))
val windowed: DataStream[MyType] = stream
  .keyBy(...)
  .window(...)
  .reduceGroup { ... }
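One plausible convention, sketched below with hypothetical types, is to stamp the window result with the end timestamp of the window, so that a downstream window operation can assign the result to the correct window; this draft does not fix the convention, so treat the choice of `windowEnd` as an assumption:

```scala
// Hypothetical wrapper for a value with an attached timestamp.
case class Stamped[T](value: T, timestamp: Long)

// Reduces all elements of one window and stamps the result with the
// window's end timestamp (one possible convention, not the only one).
def reduceWindow[T](elements: Seq[Stamped[T]], windowEnd: Long)(f: (T, T) => T): Stamped[T] =
  Stamped(elements.map(_.value).reduce(f), windowEnd)
```

An alternative would be to use the maximum element timestamp in the window; the important point is that the result must carry some well-defined timestamp for downstream timestamp-based operations to work.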
Semantics of Sequences of Windows
Aligned Time Windows
General Windows
Accessing the Timestamp of a Record
Implementing Windows on Ingress- and Event Time
Time-windows
Pane based approach
Other windows
Staging buffer