You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 3 Next »


Discussion thread-
Vote thread-
JIRA-
Release-


Motivation

Now many watermark-related features such as the watermark alignment have been implemented on the datastream API, and it is very convenient and flexible to configure and use these features through the datastream API. However, there is currently no way to use these features through table API & SQL.

This FLIP proposes to enhance the availability of watermark options of table API & SQL.

Proposed Change

Current capabilities of watermark on table API & SQL

The event time attribute is defined using a WATERMARK statement in CREATE table DDL. A watermark statement defines a watermark generation expression on an existing event time field, which marks the event time field as the event time attribute. Some examples:

Monotonously Increasing Timestamps:

CREATE TABLE user_actions (
  user_name STRING,
  `data` STRING,
  current_time as CURRENT_TIMESTAMP,
  WATERMARK FOR current_time AS current_time
) WITH (
  ...
);


Fixed Amount of Lateness:

CREATE TABLE user_actions (
  user_name STRING,
  `data` STRING,
  user_action_time TIMESTAMP(3),
  -- declare user_action_time as event time attribute and use 5 seconds delayed watermark strategy
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
  ...
);


These are all the capabilities of watermark on the table API. The following table shows a more detailed comparison:


datastream API

table API & SQL

Monotonously Increasing Timestamps

supported

supported

Fixed Amount of Lateness

supported

supported

Dealing With Idle Sources

supported

limited support

Watermark Alignment

supported

unsupport

Periodic Watermark Generator

supported

supported

Punctuated Watermark Generator

supported

unsupport


Note that although the syntax to use watermark in SQL is the same, the location of generating watermark may be different. The watermark of the source that implements the `SupportsWatermarkPushDown` interface is generated in the source operator, while the watermark of the source that does not implement the `SupportsWatermarkPushDown` interface is generated in a downstream operator named ‘WatermarkAssigner’.If the watermark is generated in the downstream 'WatermarkAssigner' operator, many watermark-related features, such as watermark alignment, will can not be implemented. So the features that this flip intends to support are only for those sources that implement the `SupportsWatermarkPushDown` interface.

Watermark-related features that this flip intends to support

At the table API & SQL level, the watermark is closely related to each source table, so we plan to use the table-scan predicate's hint named ‘WATERMARK_PARAMS’ to extend these features.

1.Configurable watermark emit strategy

On datastream API, we can decide whether to emit a watermark periodically or emit a watermark for each event by code logic from the implementation of the WatermarkGenerator interface:

@Public
public interface WatermarkGenerator<T> {

    /**
    * Called for every event, allows the watermark generator to examine
    * and remember the event timestamps, or to emit a watermark based on
    * the event itself.
    */
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    /**
    * Called periodically, and might emit a new watermark, or not.
    *
    * <p>The interval in which this method is called and Watermarks
    * are generated depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
    */
    void onPeriodicEmit(WatermarkOutput output);
}

But on table API & SQL, for those sources that implement the SupportsWatermarkPushDown interface, the watermark can only be emitted periodically, the default period is 200ms, which can be changed by the pipeline parameter 'pipeline.auto-watermark-interval'. Currently, there is no way to configure the strategy to emit a watermark for each event. We would like to configure the watermark emit strategy for the source which implements the `SupportsWatermarkPushDown` interface. We intend to configure the watermark emit strategy in the following way:

On table API & SQL, the default watermark emit-strategy is 'ON_PERIODIC', which can be set manually via hint :

select ... from source_table /*+ WATERMARK_PARAMS('emit-strategy'='ON_PERIODIC') */

If the user wants to configure the 'ON_EVENT' strategy, he/she can use hint like this:

select ... from source_table /*+ WATERMARK_PARAMS('emit-strategy'='ON_EVENT', 'emit-gap-on-event'='1000') */

Note that the option ‘emit-gap-on-event’ which is used to configure how many events to emit a watermark only works for ‘ON_EVENT’ strategy.

For the 'ON_EVENT' strategy,  option ‘emit-gap-on-event’ can configure how many events to emit a watermark, the default value is 1. We will also add a global parameter 'table.exec.watermark-emit.gap' to achieve the same goal, which will be valid for each source and will ease the user's configuration to some extent.

2.Dealing with idle sources

On datastream API, We can configure idle-timeout  to handle idle sources in the following way:

WatermarkStrategy
    .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
    .withIdleness(Duration.ofMinutes(1));

However, on table API & SQL, we can configure a global idle-timeout value through the parameter 'table.exec.source.idle-timeout', which means the multiple sources will share the value, we would like to configure 'idle-timeout' for each source separately, and it is expected to be configured in the following way:

select ... from source_table /*+ WATERMARK_PARAMS('idle-timeout'='1min') */

3.Watermark alignment

On datastream API,We can use watermark alignment feature in the following way :

WatermarkStrategy
    .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
    .withWatermarkAlignment("align-group-1", Duration.ofSeconds(20), Duration.ofSeconds(1)); 

However,the watermark alignment function is not currently supported on the table API & SQL. We hope the flip can support it. It is expected to be configured in the following way:

select ... from source_table /*+ WATERMARK_PARAMS('align-group'='group1', 'align-max-drift'='5s', 'align-update-interval'='1s') */


All the options described above can be configured in the following way:

select ... from source_table /*+ WATERMARK_PARAMS('emit-strategy'='ON_EVENT', 'emit-gap-on-event'='1000', 'idle-timeout'='1min', 'align-group'='group1', 'align-max-drift'='5s', 'align-update-interval'='1s') */

Of course, users can choose some of the items to configure according to their needs.


Migration Plan and Compatibility

This feature is biased towards adding more support in Table API & SQL, so there are no compatibility-related issues.

If some of the features are not supported, such as the watermark alignment feature on Kinesis connector, it will behave as FLIP-182 [2] and FLIP-217 [3] designed.

 

Rejected Alternatives

Adding watermark related options in the SQL DDL of watermark column

This idea is like extending FLIP-66[1]. However, since we already have many options for watermark related features, this would make the DDL complex and lengthy.

Adding watermark related options in the connector options

Watermark related options should be treated as a general feature for reading from message queue or even files, these options shall not be part of the connector options.

Reference

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-66%3A+Support+Time+Attribute+in+SQL+DDL

[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources 

[3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-217%3A+Support+watermark+alignment+of+source+splits 

  • No labels