Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Discussion threadhttps://lists.apache.org/thread/d681bx4t935c30zl750gy6d41tfypbph
Vote threadhttps://lists.apache.org/thread/79thsvkfpgsqnktodj2901jp538js19j
JIRA

Jira
serverASF JIRA
serverId5aa69414-

a9e9-3523-82ec-879b028fb15b
keyFLINK-31535

JIRA-

Release-1.18.0


Motivation

Now many watermark-related features such as the watermark alignment have been implemented on the datastream API, and it is very convenient and flexible to configure and use these features through the datastream API. However, there is currently no way to use these features through SQL.

...

Code Block
languagesql
linenumberstrue
CREATE TABLE user_actions (
  user_name STRING,
  `data` STRING,
  current_time as CURRENT_TIMESTAMPproctime(),
  WATERMARK FOR current_time AS current_time
) WITH (
  ...
);

...

Code Block
languagesql
-- configure in table options
CREATE TABLE user_actions (
  ...
  user_action_time TIMESTAMP(3),
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
  'scan.watermark.idle-timeout'='1min',
  ...
);

-- use 'OPTIONS' hint
select ... from source_table /*+ OPTIONS('scan.watermark.idle-timeout'='1min') */

3.Watermark alignment

...

Note source connectors have to implement watermark alignment of source split in order to use the watermark alignment feature since 1.17 according flip-217 [2]. If source connector does not implement flip-217, the task will run with an error, user could set `pipeline'pipeline.watermark-alignment.allow-unaligned-source-splits` to true splits'= 'true' to disable watermark alignment of source split, and watermark alignment will be working properly only when your number of splits equals to the parallelism of the source operator.

...

Code Block
languagesql
'scan.watermark.emit.strategy'='on-event',
'scan.watermark.idle-timeout'='1min',
'scan.watermark.alignment.group'='alignment-group-1',
'scan.watermark.alignment.max-drift'='1min',
'scan.watermark.alignment.update-interval'='1s'

...

To describe the watermark launch emit strategy, we need to add an enumeration class:

...