This page is meant as a template for writing a FLIP. To create a FLIP choose Tools->Copy on this page and modify with your content and replace the heading with the next FLIP number and a description of your issue. Replace anything in italics with your own description.

Status

Current state

Discussion thread:

...

[This FLIP proposal is joint work between Xuannan Su and Dong Lin]

Motivation

Suppose a user needs to perform a temporal join in which the probe side comes from Kafka and the build side comes from a MySQL CDC Source (with a snapshot reading phase followed by a binlog reading phase), and none of the input data carries event-time information. The user requires each probe-side record to be joined with at least the data from the build side's snapshot phase. In other words, the join operator must wait until the build side has finished reading its snapshot before it processes any probe-side data.

...

This document proposes a new API and mechanism at the DataStream level that allows a source (e.g., MySQL CDC Source) to send a watermark telling downstream operators to advance their event time based on the system time. In the temporal join operator, the probe side can then wait until the build side receives this watermark and starts advancing its event time based on the system time.
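The waiting behavior described above can be pictured with a small plain-Java model. This is only a sketch under stated assumptions: it does not use Flink classes, it reduces "the build side starts advancing its event time based on system time" to a single boolean signal, and the class and method names (`BufferingTemporalJoinModel`, `onBuildSystemTimeWatermark`, and so on) are hypothetical and not part of this proposal's API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of a join that holds back probe records until the build side signals. */
final class BufferingTemporalJoinModel {
    private final Map<String, String> buildState = new HashMap<>();
    private final List<String[]> bufferedProbe = new ArrayList<>();
    private final List<String> output = new ArrayList<>();
    // Assumption: one flag stands in for "build side now follows system time".
    private boolean buildFollowsSystemTime = false;

    /** Build-side record (snapshot or binlog): keep the latest value per key. */
    void onBuildRecord(String key, String value) {
        buildState.put(key, value);
    }

    /** Hypothetical signal sent by the source once its snapshot phase is done. */
    void onBuildSystemTimeWatermark() {
        buildFollowsSystemTime = true;
        // Release every probe record that was waiting for the snapshot.
        for (String[] probe : bufferedProbe) {
            join(probe[0], probe[1]);
        }
        bufferedProbe.clear();
    }

    /** Probe-side record: join immediately if allowed, otherwise buffer it. */
    void onProbeRecord(String key, String value) {
        if (buildFollowsSystemTime) {
            join(key, value);
        } else {
            bufferedProbe.add(new String[] {key, value});
        }
    }

    /** Inner-join semantics: emit only when the build side has a value for the key. */
    private void join(String key, String probeValue) {
        String buildValue = buildState.get(key);
        if (buildValue != null) {
            output.add(key + ":" + probeValue + "+" + buildValue);
        }
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        BufferingTemporalJoinModel join = new BufferingTemporalJoinModel();
        join.onProbeRecord("k1", "order-1");   // arrives before the snapshot is read
        join.onBuildRecord("k1", "alice");     // snapshot data
        join.onBuildSystemTimeWatermark();     // snapshot finished
        System.out.println(join.output());
    }
}
```

In this model a probe record that arrives early is never joined against an empty build side; it is emitted only after the signal, which mirrors the requirement stated in the Motivation section.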


Terminology and Background

This FLIP changes Flink's watermark and timestamp concepts. To convey the intuition behind the design, this section introduces the relevant concepts.

...

  • When creating a source, the user provides a WatermarkStrategy to StreamExecutionEnvironment#fromSource.

  • If the source supports event time natively (e.g., KafkaSource) or the user provides a custom TimestampAssigner in the WatermarkStrategy to extract the timestamp from the record, Flink will add the timestamp to the record. Otherwise, the timestamp on the record will be Long.MIN_VALUE.

  • If the user specifies WatermarkStrategy.noWatermarks(), the job will not generate watermarks. Otherwise, the job periodically emits watermarks whose values depend on event time. The emission frequency is determined by pipeline.auto-watermark-interval, which defaults to 200ms.
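The timestamp and watermark rules listed above can be illustrated with a small plain-Java model. It is a sketch rather than Flink code: the class `PeriodicWatermarkModel` and its methods are hypothetical, and the formula (largest timestamp seen, minus an out-of-orderness bound, minus one) is an assumption modeled on a bounded-out-of-orderness generator, one possible way for the watermark value to depend on event time.

```java
/** Hypothetical model of a periodic, event-time-driven watermark generator. */
final class PeriodicWatermarkModel {
    /** Timestamp carried by a record when no timestamp was assigned. */
    static final long NO_TIMESTAMP = Long.MIN_VALUE;

    private final long maxOutOfOrdernessMs; // assumed lateness tolerance
    private long maxTimestamp;

    PeriodicWatermarkModel(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Start high enough that the first watermark does not underflow.
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs + 1;
    }

    /** Called once per record; records without a timestamp never raise the maximum. */
    void onEvent(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
    }

    /** Called once per auto-watermark interval (200ms by default, per the text above). */
    long onPeriodicEmit() {
        return maxTimestamp - maxOutOfOrdernessMs - 1;
    }

    public static void main(String[] args) {
        PeriodicWatermarkModel generator = new PeriodicWatermarkModel(5);
        generator.onEvent(NO_TIMESTAMP); // record without event time
        generator.onEvent(1000L);
        generator.onEvent(1200L);
        generator.onEvent(1100L);        // out-of-order record
        System.out.println(generator.onPeriodicEmit());
    }
}
```

Note that in this model a stream whose records all carry Long.MIN_VALUE never advances the watermark, which is the situation the Motivation section describes for inputs that lack event-time information.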


Public Interfaces

Briefly list any new interfaces that will be introduced as part of this proposal or any existing interfaces that will be removed or changed. The purpose of this section is to concisely call out the public contract that will come along with this feature.

...

  • User-facing scripts/command-line tools, i.e. bin/flink, Yarn scripts, Mesos scripts
  • Configuration settings
  • Exposed monitoring information


Proposed Changes

Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users? 
  • If we are changing behavior how will we phase out the older behavior? 
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Test Plan

Describe in a few sentences how the FLIP will be tested. We are mostly interested in system tests (since unit tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.