
...

When using element timestamps to order elements or perform windowing, it becomes necessary to keep track of the progress of time. For example, in a windowing operator the window from t to t + 5 can only be processed once the operator is certain that no more elements with a timestamp lower than t + 5 will arrive from any of its input streams. Flink uses watermarks to keep track of the timestamps of tuples passing through the system: when a source knows that no elements with a timestamp lower than t1 will be emitted in the future, it emits a watermark with timestamp t1. Watermarks are broadcast to downstream operators. The watermark at one specific operator at any point in time is the minimum watermark time over all its inputs. Once that watermark increases, the operator can react to it and broadcast the new watermark to its downstream operators. In our windowing example from earlier, the operator would wait for the watermark for time t + 5, then process the window, and then broadcast watermark t + 5 itself.
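The watermark bookkeeping described above can be sketched in a few lines of Scala. This is a minimal illustration under our own assumptions, not Flink's operator code. `WatermarkTracker` and `windowReady` are hypothetical names, and watermarks are plain `Long` timestamps. A window from t to t + 5 is treated as covering timestamps in [t, t + 5), so it is complete once the watermark reaches t + 5.

```scala
// Hypothetical sketch, not Flink's actual operator implementation.
// Tracks the last watermark seen on each input and derives the operator's
// watermark as the minimum over all inputs.
final class WatermarkTracker(numInputs: Int) {
  // Long.MinValue means "no watermark received yet" on that input.
  private val inputWatermarks = Array.fill(numInputs)(Long.MinValue)
  private var current = Long.MinValue

  /** Records a watermark from `input`. Returns Some(newWatermark) if the
    * combined watermark advanced (and should be broadcast downstream),
    * None otherwise. */
  def onWatermark(input: Int, timestamp: Long): Option[Long] = {
    // Assumption: a single input's watermark never moves backwards.
    inputWatermarks(input) = math.max(inputWatermarks(input), timestamp)
    val combined = inputWatermarks.min
    if (combined > current) { current = combined; Some(combined) }
    else None
  }

  def currentWatermark: Long = current
}

object WatermarkExample {
  // A window covering [start, start + size) is complete once the watermark
  // guarantees that no element with timestamp < start + size can still arrive.
  def windowReady(start: Long, size: Long, watermark: Long): Boolean =
    watermark >= start + size

  def main(args: Array[String]): Unit = {
    val tracker = new WatermarkTracker(numInputs = 2)
    println(tracker.onWatermark(0, 15))                    // None: input 1 has not reported yet
    println(tracker.onWatermark(1, 8))                     // Some(8)
    println(windowReady(10, 5, tracker.currentWatermark))  // false: window [10, 15) must wait
    println(tracker.onWatermark(1, 20))                    // Some(15): input 0 is now the slowest
    println(windowReady(10, 5, tracker.currentWatermark))  // true
  }
}
```

In this run the slower input holds back the operator's watermark. The window therefore stays open until that input catches up, even though input 0 had already reported 15.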

Since an operator has to wait for watermarks from all its inputs, and since timestamps can lag far behind the current system time, this can introduce latency (also called time skew or watermark skew). This figure should help in understanding the problem:

...

  • Ordering for OrderedKeyDataStream
  • Ordering for windows

DataStream Example

```scala
val stream: DataStream[MyType] = env.createStream(new KafkaSource(...))
// Proposed API: key the stream, then order each key's elements by timestamp.
val orderedStream: OrderedKeyDataStream[(String, MyType)] = stream.keyBy(...).orderByTime()
// Operators applied to orderedStream receive each key's elements in timestamp order.
val mapped: DataStream[MyType] = orderedStream.flatMap { ... }
```
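The following minimal Scala sketch shows how ordering by time could work for the elements of a single key. Elements are held back and released in timestamp order as watermarks advance. `TimeOrderingBuffer` is a hypothetical name, not the proposed `OrderedKeyDataStream` implementation. The sketch assumes elements arrive as (timestamp, element) pairs and that a watermark wm guarantees no further elements with timestamps below wm.

```scala
import scala.collection.mutable

// Hypothetical sketch of time ordering for one key; not a Flink class.
final class TimeOrderingBuffer[T] {
  // PriorityQueue dequeues the largest element first, so reverse the
  // ordering on timestamps to get a min-heap.
  private val pending =
    mutable.PriorityQueue.empty[(Long, T)](Ordering.by[(Long, T), Long](_._1).reverse)

  def add(timestamp: Long, element: T): Unit = pending.enqueue((timestamp, element))

  /** On watermark `wm`, no element with timestamp < wm can still arrive, so
    * every buffered element below `wm` is emitted in ascending timestamp
    * order. Elements with equal timestamps come out in unspecified order. */
  def onWatermark(wm: Long): List[(Long, T)] = {
    val out = List.newBuilder[(Long, T)]
    while (pending.nonEmpty && pending.head._1 < wm) out += pending.dequeue()
    out.result()
  }
}

object TimeOrderingExample {
  def main(args: Array[String]): Unit = {
    val buf = new TimeOrderingBuffer[String]
    buf.add(12, "c"); buf.add(3, "a"); buf.add(7, "b")
    println(buf.onWatermark(10)) // List((3,a), (7,b)); (12,c) stays buffered
    println(buf.onWatermark(20)) // List((12,c))
  }
}
```

The buffer trades latency for order. An element can only be released once the watermark passes its timestamp, so the watermark skew described earlier translates directly into delay for ordered streams.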

Timestamps on Results of Window Operations

...