
Discussion thread: https://lists.apache.org/thread/d681bx4t935c30zl750gy6d41tfypbph
Vote thread: https://lists.apache.org/thread/79thsvkfpgsqnktodj2901jp538js19j
JIRA: FLINK-31535
Release: 1.18.0


Motivation

Many watermark-related features, such as watermark alignment, have already been implemented in the DataStream API, where they are convenient and flexible to configure and use. However, there is currently no way to use these features through SQL.

...

Code Block
languagesql
CREATE TABLE user_actions (
  user_name STRING,
  `data` STRING,
  current_time as proctime(),
  WATERMARK FOR current_time AS current_time
) WITH (
  ...
);

...

In the SQL layer, the watermark is closely tied to each source table, so we plan to expose these features through the dynamic table options and the 'OPTIONS' hint. If the user configures an option both in the dynamic table options and in the 'OPTIONS' hint, the value in the 'OPTIONS' hint takes precedence. If the user applies the 'OPTIONS' hint to the same source table in multiple places, the first hint is used.
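The precedence rule can be illustrated with a minimal sketch. It is not taken from the PoC: the class `WatermarkOptionResolver` and its method `resolve` are hypothetical names, and the sketch assumes the hints are supplied in the order they appear in the query.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, for illustration only; not part of Flink or the PoC.
final class WatermarkOptionResolver {

    /**
     * Returns the effective options for one source table.
     * Table options form the base; the first 'OPTIONS' hint (if any)
     * overrides them, and any later hints are ignored.
     */
    static Map<String, String> resolve(
            Map<String, String> tableOptions,
            List<Map<String, String>> hintsInQueryOrder) {
        Map<String, String> effective = new LinkedHashMap<>(tableOptions);
        if (!hintsInQueryOrder.isEmpty()) {
            // Only the first hint is considered.
            effective.putAll(hintsInQueryOrder.get(0));
        }
        return effective;
    }
}
```

With table options that set 'scan.watermark.idle-timeout' to '1min' and two hints that set it to '30s' and '5s' respectively, this sketch resolves the option to '30s', while options that appear only in the table definition are kept unchanged.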

We provide a PoC; see [1] for the code details.

1. Configurable watermark emit strategy

In the DataStream API, whether a watermark is emitted periodically or for each event is decided by the code in the implementation of the WatermarkGenerator interface:

...

Code Block
languagesql
-- configure in table options
CREATE TABLE user_actions (
  ...
  user_action_time TIMESTAMP(3),
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (   
  'scan.watermark.emit.strategy'='on-event',
  'scan.watermark.emit.on-event.gap'='10000',
  ...
);

-- use 'OPTIONS' hint
select ... from source_table /*+ OPTIONS('scan.watermark.emit.strategy'='on-event', 'scan.watermark.emit.on-event.gap'='10000') */

Note that the option 'scan.watermark.emit.on-event.gap', which configures after how many events a watermark is emitted, only takes effect with the 'on-event' strategy. This option is not required; the default value is 1.
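The effect of the gap can be sketched as a simple counter. This is an illustration under one reading of the option (a watermark is emitted on every N-th event); the class `OnEventGapCounter` is a hypothetical name and does not come from Flink or the PoC.

```java
// Hypothetical illustration of the 'on-event' gap; not Flink code.
final class OnEventGapCounter {
    private final int gap;
    private int eventsSinceLastEmit = 0;

    OnEventGapCounter(int gap) {
        if (gap < 1) {
            throw new IllegalArgumentException("gap must be >= 1");
        }
        this.gap = gap;
    }

    /** Called once per event; returns true when a watermark should be emitted. */
    boolean onEvent() {
        eventsSinceLastEmit++;
        if (eventsSinceLastEmit >= gap) {
            eventsSinceLastEmit = 0;
            return true;
        }
        return false;
    }
}
```

With the default gap of 1 every event triggers a watermark, which matches the plain per-event strategy. A larger gap trades watermark freshness for fewer emissions.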


2. Dealing with idle sources

In the DataStream API, we can configure an idle timeout to handle idle sources in the following way:

...

Code Block
languagesql
-- configure in table options
CREATE TABLE user_actions (
  ...
  user_action_time TIMESTAMP(3),
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
  'scan.watermark.idle-timeout'='1min',
  ...
);

-- use 'OPTIONS' hint
select ... from source_table /*+ OPTIONS('scan.watermark.idle-timeout'='1min') */

...

However, watermark alignment is not currently supported in the SQL layer. We hope this FLIP can add support for it. It is expected to be configured in the following way:

Code Block
languagesql
-- configure in table options
CREATE TABLE user_actions (
  ...
  user_action_time TIMESTAMP(3),
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
  'scan.watermark.alignment.group'='alignment-group-1',
  'scan.watermark.alignment.max-drift'='1min',
  'scan.watermark.alignment.update-interval'='1s',
  ...
);

-- use 'OPTIONS' hint
select ... from source_table /*+ OPTIONS('scan.watermark.alignment.group'='alignment-group-1', 'scan.watermark.alignment.max-drift'='1min', 'scan.watermark.alignment.update-interval'='1s') */

The option 'scan.watermark.alignment.update-interval' is not required; the default value is 1s.

Note that, as of Flink 1.17, source connectors must implement watermark alignment of source splits, as described in FLIP-217 [2], in order to use the watermark alignment feature. If a source connector does not implement FLIP-217, the task fails with an error. In that case the user can set 'pipeline.watermark-alignment.allow-unaligned-source-splits'='true' to disable watermark alignment of source splits; watermark alignment then works properly only when the number of splits equals the parallelism of the source operator.
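To make the role of 'max-drift' concrete, here is a minimal sketch of the alignment decision. It assumes a source is paused once its watermark exceeds the minimum watermark of its alignment group plus the maximum drift. The class `AlignmentSketch` and its methods are hypothetical and simplified; they are not the Flink implementation.

```java
import java.time.Duration;
import java.util.Collection;

// Hypothetical, simplified model of watermark alignment; not Flink code.
final class AlignmentSketch {

    /** Highest watermark (epoch millis) a source in the group may reach before pausing. */
    static long maxAllowedWatermark(Collection<Long> groupWatermarksMillis, Duration maxDrift) {
        if (groupWatermarksMillis.isEmpty()) {
            throw new IllegalArgumentException("alignment group has no sources");
        }
        long min = Long.MAX_VALUE;
        for (long watermark : groupWatermarksMillis) {
            min = Math.min(min, watermark);
        }
        return min + maxDrift.toMillis();
    }

    /** A source that has run ahead of the allowed watermark should stop reading for now. */
    static boolean shouldPause(long sourceWatermarkMillis, long maxAllowedMillis) {
        return sourceWatermarkMillis > maxAllowedMillis;
    }
}
```

In this model the update interval would control how often the group minimum is recomputed and announced to the sources, so a shorter interval reacts faster at the cost of more coordination traffic.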


All the options described above:

Code Block
languagesql
'scan.watermark.emit.strategy'='on-event',
'scan.watermark.emit.on-event.gap'='10000',
'scan.watermark.idle-timeout'='1min',
'scan.watermark.alignment.group'='alignment-group-1',
'scan.watermark.alignment.max-drift'='1min',
'scan.watermark.alignment.update-interval'='1s'

...

Code Block
languagejava
public class WatermarkParams implements Serializable {
    private static final long serialVersionUID = 1L;

    private WatermarkEmitStrategy emitStrategy;
    private int emitOnEventGap;
    private String alignGroupName;
    private Duration alignMaxDrift;
    private Duration alignUpdateInterval;
    private long sourceIdleTimeout;
}

To describe the watermark emit strategy, we need to add an enumeration class:

...

Code Block
"watermarkParams":{
    "emitStrategy" : "ON_PERIODIC",
    "emitOnEventGap" : 1,
    "alignGroupName" : null,
    "alignMaxDrift" : "PT0S",
    "alignUpdateInterval" : "PT1S",
    "sourceIdleTimeout" : -1
}
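The duration values in this JSON ("PT0S", "PT1S") look like ISO-8601 durations, which is the format that `java.time.Duration` produces and parses. The sketch below only demonstrates that round trip. The class `DurationFormatSketch` is a hypothetical name, and it is an assumption that the plan serializer relies on this exact format.

```java
import java.time.Duration;

// Hypothetical helper showing the ISO-8601 round trip of java.time.Duration.
final class DurationFormatSketch {

    /** Formats a duration the way Duration#toString does, e.g. one second as "PT1S". */
    static String format(Duration duration) {
        return duration.toString();
    }

    /** Parses an ISO-8601 duration string such as "PT1S" or "PT1M". */
    static Duration parse(String text) {
        return Duration.parse(text);
    }
}
```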

...

If some of the features are not supported, such as the watermark alignment feature on the Kinesis connector, it will behave as designed in FLIP-182 [3] and FLIP-217 [2].

Rejected Alternatives

...

This idea is like extending FLIP-66 [4]. However, since we already have many options for watermark-related features, this would make the DDL complex and lengthy.

...

We should be cautious about adding SQL syntax; WATERMARK_PARAMS is also SQL syntax to some extent.


Reference

[1] https://github.com/yuchengxin/flink/commits/yuankui/watermark_params

[2] FLIP-217: Support watermark alignment of source splits https://cwiki.apache.org/confluence/display/FLINK/FLIP-217%3A+Support+watermark+alignment+of+source+splits

[3] FLIP-182: Support watermark alignment of FLIP-27 Sources https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources

[4] FLIP-66: Support Time Attribute in SQL DDL https://cwiki.apache.org/confluence/display/FLINK/FLIP-66%3A+Support+Time+Attribute+in+SQL+DDL