Discussion thread | https://lists.apache.org/thread/d681bx4t935c30zl750gy6d41tfypbph
---|---
Vote thread | https://lists.apache.org/thread/79thsvkfpgsqnktodj2901jp538js19j
JIRA |
Release | 1.18.0
Motivation
Many watermark-related features, such as watermark alignment, have been implemented in the DataStream API, where they are convenient and flexible to configure. However, there is currently no way to use these features through SQL.
This FLIP proposes to make these watermark options available in SQL.
Proposed Change
Current capabilities of watermark
...
in SQL layer
The event time attribute is defined using a WATERMARK statement in the CREATE TABLE DDL. A watermark statement defines a watermark generation expression on an existing event time field, which marks that field as the event time attribute. Some examples:
...
Code Block | ||||
---|---|---|---|---|
| ||||
CREATE TABLE user_actions (
  user_name STRING,
  `data` STRING,
  current_time AS CURRENT_TIMESTAMP,
  WATERMARK FOR current_time AS current_time
) WITH (
  ...
); |
...
Note that although the syntax for using watermarks in SQL is the same, the place where the watermark is generated may differ. For a source that implements the `SupportsWatermarkPushDown` interface, the watermark is generated in the source operator; for a source that does not implement it, the watermark is generated in a downstream operator named 'WatermarkAssigner'. If the watermark is generated in the downstream 'WatermarkAssigner' operator, many watermark-related features, such as watermark alignment, cannot be implemented. The features that this FLIP intends to support therefore apply only to sources that implement the `SupportsWatermarkPushDown` interface.
Watermark-related features
...
intends to support
In the SQL layer, the watermark is closely tied to each source table, so we plan to expose these features through the dynamic table options and the 'OPTIONS' hint. If the user has configured these options both in the dynamic table options and in the 'OPTIONS' hint, the options in the 'OPTIONS' hint take precedence. If the user uses the 'OPTIONS' hint for the same source table in multiple places, the first hint is used.
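The precedence rule can be illustrated with a minimal Java sketch. `WatermarkOptionResolver` and its `resolve` method are hypothetical names introduced only for this illustration; they are not part of Flink or of the POC, and the sketch assumes options are plain string maps.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper for illustration only; not a Flink class. */
final class WatermarkOptionResolver {

    /**
     * Merges the table options with the 'OPTIONS' hints given for one source table.
     * Only the first hint is considered, and its entries override the table options.
     */
    static Map<String, String> resolve(
            Map<String, String> tableOptions, List<Map<String, String>> hintsInQueryOrder) {
        Map<String, String> merged = new LinkedHashMap<>(tableOptions);
        if (!hintsInQueryOrder.isEmpty()) {
            // The first hint wins; later hints on the same table are ignored.
            merged.putAll(hintsInQueryOrder.get(0));
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> ddl = Map.of("scan.watermark.idle-timeout", "1min");
        List<Map<String, String>> hints = List.of(
                Map.of("scan.watermark.idle-timeout", "30s"),
                Map.of("scan.watermark.idle-timeout", "5s"));
        System.out.println(resolve(ddl, hints));
    }
}
```

In this sketch the value from the first hint ('30s') replaces the value from the table options, and the second hint has no effect.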
A proof of concept is available; see the POC [1] for the code details.
1. Configurable watermark emit strategy
In the DataStream API, the implementation of the WatermarkGenerator interface decides whether to emit a watermark periodically or for each event:
...
For SQL, the default watermark emit strategy is 'on-periodic', which can also be set explicitly via table options or a hint:
Code Block | ||
---|---|---|
| ||
-- configure in table options
CREATE TABLE user_actions (
  ...
  user_action_time TIMESTAMP(3),
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
  'scan.watermark.emit.strategy'='on-periodic',
  ...
);
-- use 'OPTIONS' hint
select ... from source_table /*+ OPTIONS('scan.watermark.emit.strategy'='on-periodic') */ |
To configure the 'on-event' strategy, use table options or a hint like this:
Code Block | ||
---|---|---|
| ||
-- configure in table options
CREATE TABLE user_actions (
  ...
  user_action_time TIMESTAMP(3),
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
  'scan.watermark.emit.strategy'='on-event',
  ...
);
-- use 'OPTIONS' hint
select ... from source_table /*+ OPTIONS('scan.watermark.emit.strategy'='on-event') */ |
Note that the option 'emit-gap-on-event', which configures how many events pass between emitted watermarks, only takes effect with the 'on-event' strategy.
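The difference between the two emit strategies can be sketched in plain Java. The class below is a simplified stand-in written for this illustration: `EmitStrategySketch`, its nested `EmitStrategy` enum, and the `emitted` list are hypothetical names, not Flink classes, and the watermark expression is assumed to be "largest timestamp seen minus a fixed delay", mirroring the `user_action_time - INTERVAL '5' SECOND` examples.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-in for a watermark generator; all names are hypothetical. */
final class EmitStrategySketch {
    enum EmitStrategy { ON_EVENT, ON_PERIODIC }

    private final EmitStrategy strategy;
    private final long delayMillis;
    private long maxTimestamp = Long.MIN_VALUE;
    /** Watermarks emitted so far, kept in a list so the behavior can be inspected. */
    final List<Long> emitted = new ArrayList<>();

    EmitStrategySketch(EmitStrategy strategy, long delayMillis) {
        this.strategy = strategy;
        this.delayMillis = delayMillis;
    }

    /** Called once per record; emits immediately only under ON_EVENT. */
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        if (strategy == EmitStrategy.ON_EVENT) {
            emit();
        }
    }

    /** Called by a timer; emits only under ON_PERIODIC. */
    void onPeriodicEmit() {
        if (strategy == EmitStrategy.ON_PERIODIC) {
            emit();
        }
    }

    private void emit() {
        if (maxTimestamp != Long.MIN_VALUE) {
            emitted.add(maxTimestamp - delayMillis);
        }
    }
}
```

With two records followed by one timer tick, the ON_EVENT variant records two watermarks and the ON_PERIODIC variant records one, which is the trade-off between watermark latency and per-record overhead that the option controls.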
2. Dealing with idle sources
In the DataStream API, we can configure an idle timeout to handle idle sources in the following way:
...
In the SQL layer, however, we can only configure a global idle timeout through the parameter 'table.exec.source.idle-timeout', which means all sources share the same value. We would like to configure the idle timeout for each source separately, in the following way:
Code Block | ||
---|---|---|
| ||
-- configure in table options
CREATE TABLE user_actions (
  ...
  user_action_time TIMESTAMP(3),
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
  'scan.watermark.idle-timeout'='1min',
  ...
);
-- use 'OPTIONS' hint
select ... from source_table /*+ OPTIONS('scan.watermark.idle-timeout'='1min') */ |
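To show why a per-source value matters, here is a minimal Java sketch of idle detection with a separate timeout per source. `IdleSourceSketch` is a hypothetical class for this illustration only; it assumes a source counts as idle once no record has arrived for at least the configured timeout, and that a non-positive timeout disables the check.

```java
/** Hypothetical per-source idle tracker for illustration; not a Flink class. */
final class IdleSourceSketch {
    private final long idleTimeoutMillis;
    private long lastEventAtMillis;

    IdleSourceSketch(long idleTimeoutMillis, long nowMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
        this.lastEventAtMillis = nowMillis;
    }

    /** Records that an event arrived at the given time. */
    void onEvent(long nowMillis) {
        lastEventAtMillis = nowMillis;
    }

    /** A source is idle when nothing has arrived for at least its own timeout. */
    boolean isIdle(long nowMillis) {
        return idleTimeoutMillis > 0 && nowMillis - lastEventAtMillis >= idleTimeoutMillis;
    }
}
```

Two instances with different timeouts (for example one minute and five seconds) reach the idle state at different times, which a single global value cannot express.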
3. Watermark alignment
In the DataStream API, we can use the watermark alignment feature in the following way:
...
However, watermark alignment is not currently supported in the SQL layer. This FLIP aims to support it, configured in the following way:
Code Block | ||
---|---|---|
| ||
-- configure in table options
CREATE TABLE user_actions (
  ...
  user_action_time TIMESTAMP(3),
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
  'scan.watermark.alignment.group'='alignment-group-1',
  'scan.watermark.alignment.max-drift'='1min',
  'scan.watermark.alignment.update-interval'='1s',
  ...
);
-- use 'OPTIONS' hint
select ... from source_table /*+ OPTIONS(
  'scan.watermark.alignment.group'='alignment-group-1',
  'scan.watermark.alignment.max-drift'='1min',
  'scan.watermark.alignment.update-interval'='1s') */ |
The option 'scan.watermark.alignment.update-interval' is optional; its default value is 1s.
Note that since 1.17, according to FLIP-217 [2], source connectors have to implement watermark alignment of source splits in order to use the watermark alignment feature. If a source connector does not implement FLIP-217, the task fails with an error. Users can set 'pipeline.watermark-alignment.allow-unaligned-source-splits'='true' to disable watermark alignment of source splits; in that case, watermark alignment works properly only when the number of splits equals the parallelism of the source operator.
All the options described above can be combined in the following way:
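The role of the alignment group and the maximum drift can be sketched as follows. This is a deliberately simplified model written for illustration: `AlignmentGroupSketch` and its methods are hypothetical names, not Flink classes, and the sketch assumes that a source is paused once its watermark is ahead of the slowest source in the same group by more than the maximum drift. The update interval, which controls how often this check runs, is left out.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of one alignment group; not a Flink class. */
final class AlignmentGroupSketch {
    private final long maxDriftMillis;
    private final Map<String, Long> watermarks = new HashMap<>();

    AlignmentGroupSketch(Duration maxDrift) {
        this.maxDriftMillis = maxDrift.toMillis();
    }

    /** Stores the latest watermark reported by a source in this group. */
    void report(String sourceId, long watermark) {
        watermarks.put(sourceId, watermark);
    }

    /** True if the source is further ahead of the slowest source than the allowed drift. */
    boolean shouldPause(String sourceId) {
        Long own = watermarks.get(sourceId);
        if (own == null) {
            return false; // unknown sources are never paused in this sketch
        }
        long min = watermarks.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MAX_VALUE);
        // Saturate instead of overflowing when the minimum is close to Long.MAX_VALUE.
        long limit = min > Long.MAX_VALUE - maxDriftMillis ? Long.MAX_VALUE : min + maxDriftMillis;
        return own > limit;
    }
}
```

With a maximum drift of one minute, a source at watermark 120000 is paused while the slowest source is at 0, and resumes once the slowest source reaches 60000 or later.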
Code Block | ||
---|---|---|
| ||
select ... from source_table /*+ OPTIONS(
  'scan.watermark.emit.strategy'='on-event',
  'emit-gap-on-event'='1000',
  'scan.watermark.idle-timeout'='1min',
  'scan.watermark.alignment.group'='alignment-group-1',
  'scan.watermark.alignment.max-drift'='1min',
  'scan.watermark.alignment.update-interval'='1s') */ |
Users can configure any subset of these options according to their needs.
Public Interface
To implement the above features, we expect to add a wrapper class for the watermark parameters, like this:
Code Block | ||
---|---|---|
| ||
public class WatermarkParams implements Serializable {
    private static final long serialVersionUID = 1L;
    private WatermarkEmitStrategy emitStrategy;
    private String alignGroupName;
    private Duration alignMaxDrift;
    private Duration alignUpdateInterval;
    private long sourceIdleTimeout;
} |
To describe the watermark emit strategy, we need to add an enumeration class:
Code Block | ||
---|---|---|
| ||
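public enum WatermarkEmitStrategy {
    ON_EVENT("on-event"),
    ON_PERIODIC("on-periodic");

    // Option value as written in SQL; the field name is illustrative.
    private final String alias;

    WatermarkEmitStrategy(String alias) { this.alias = alias; }
} |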
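As a sketch of how the string value from the table options or the hint could be mapped to such an enumeration, consider the following. `EmitStrategyOption` and `fromAlias` are hypothetical names chosen for this illustration, and matching the value case-insensitively is an assumption rather than something this proposal specifies.

```java
/** Hypothetical enum with a lookup by SQL option value; not the proposed Flink class. */
enum EmitStrategyOption {
    ON_EVENT("on-event"),
    ON_PERIODIC("on-periodic");

    private final String alias;

    EmitStrategyOption(String alias) {
        this.alias = alias;
    }

    String alias() {
        return alias;
    }

    /** Resolves an option value such as 'on-event'; rejects unknown values. */
    static EmitStrategyOption fromAlias(String value) {
        for (EmitStrategyOption candidate : values()) {
            if (candidate.alias.equalsIgnoreCase(value.trim())) {
                return candidate;
            }
        }
        throw new IllegalArgumentException("Unknown watermark emit strategy: " + value);
    }
}
```

Rejecting unknown values early, as this sketch does, surfaces a mistyped option at planning time instead of silently falling back to the default strategy.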
The biggest change is probably that we need to modify the constructor of WatermarkPushDownSpec to use the params wrapper class:
Code Block | ||
---|---|---|
| ||
@JsonCreator
public WatermarkPushDownSpec(
        @JsonProperty(FIELD_NAME_WATERMARK_EXPR) RexNode watermarkExpr,
        @JsonProperty(FIELD_NAME_IDLE_TIMEOUT_MILLIS) long idleTimeoutMillis,
        @JsonProperty(FIELD_NAME_PRODUCED_TYPE) RowType producedType,
        @JsonProperty(FIELD_NAME_WATERMARK_PARAMS) WatermarkParams watermarkParams) {
    super(producedType);
    this.watermarkExpr = checkNotNull(watermarkExpr);
    this.idleTimeoutMillis = idleTimeoutMillis;
    this.watermarkParams = watermarkParams;
} |
The same applies to GeneratedWatermarkGeneratorSupplier:
Code Block | ||
---|---|---|
| ||
public GeneratedWatermarkGeneratorSupplier(
GeneratedWatermarkGenerator generatedWatermarkGenerator,
WatermarkParams watermarkParams) {
this.generatedWatermarkGenerator = generatedWatermarkGenerator;
this.watermarkParams = watermarkParams;
} |
Compiled Plan
After implementing the above features, the following content will be added to the WatermarkPushDown module in the JSON plan:
Code Block |
---|
"watermarkParams":{
"emitStrategy" : "ON_PERIODIC",
"alignGroupName" : null,
"alignMaxDrift" : "PT0S",
"alignUpdateInterval" : "PT1S",
"sourceIdleTimeout" : -1
} |
This does not cause compatibility issues: CompilePlan and ExecutePlan remain backward compatible. We will add unit tests to validate the compatibility.
Migration Plan and Compatibility
This feature only adds new capabilities to the SQL layer, so there are no compatibility-related issues.
If some of the features are not supported, such as the watermark alignment feature on the Kinesis connector, it will behave as FLIP-182 [3] and FLIP-217 [2] designed.
Rejected Alternatives
Adding watermark related options in the SQL DDL of watermark column
This idea is similar to extending FLIP-66 [4]. However, since we already have many options for watermark-related features, this would make the DDL complex and lengthy.
Adding watermark related options
...
with a new table-scan hint named 'WATERMARK_PARAMS'
We should be cautious about adding SQL syntax, and WATERMARK_PARAMS is also SQL syntax to some extent.
...
Watermark-related options should be treated as a general feature for reading from message queues or even files, so these options should not be part of the connector options.
Reference
[1] github.com/yuchengxin/flink/commits/yuankui/watermark_params
[2] FLIP-217: Support watermark alignment of source splits, https://cwiki.apache.org/confluence/display/FLINK/FLIP-217%3A+Support+watermark+alignment+of+source+splits
[3] FLIP-182: Support watermark alignment of FLIP-27 Sources
[4] FLIP-66: Support Time Attribute in SQL DDL, https://cwiki.apache.org/confluence/display/FLINK/FLIP-66%3A+Support+Time+Attribute+in+SQL+DDL