
Page properties

Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-66-Support-time-attribute-in-SQL-DDL-tt32766.html

JIRA: FLINK-14320 (ASF JIRA)

Release: 1.10

Status

Current state: "Under Discussion"

...


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Code Block
languagesql
CREATE TABLE table_name (
  WATERMARK FOR columnName AS <watermark_strategy_expression>
) WITH (
  ...
)

NOTE: For “INTERVAL 'string' timeUnit”, please refer to the interval literal in the SQL standard. Only day-time intervals are supported for now.

NOTE: “columnName” can be a nested column, referenced with dot syntax.

...

Code Block
languagesql
CREATE TABLE table_name (
  proc AS SYSTEM_PROCTIME()
) WITH (
  ...
)

...

The following is the proposed watermark definition. 

Code Block
languagesql
CREATE TABLE table_name (
  WATERMARK FOR columnName AS <watermark_strategy_expression>
) WITH (
  ...
)

NOTE: “columnName” can be a nested column, referenced with dot syntax.

NOTE: For “INTERVAL 'string' timeUnit”, please refer to the interval literal in the SQL standard. Only day-time intervals are supported for now.


We would like to treat WATERMARK similarly to a PRIMARY KEY or UNIQUE KEY constraint and therefore keep it in the schema definition.

...

The “WATERMARK” keyword starts a watermark definition. The “FOR” keyword defines which existing field is marked as the rowtime attribute; this field must already exist in the schema.

The “AS” keyword defines the watermark strategy. It allows an arbitrary expression to calculate the watermark; the returned value is the watermark. The return type can be a nullable BIGINT or TIMESTAMP(3).

Watermarks are defined as longs that represent the milliseconds since the Epoch (midnight, January 1, 1970 UTC). The system calls the watermark generation expression for every record and periodically emits the largest watermark generated so far. A watermark is emitted only if it is non-null and its timestamp is larger than that of the previously emitted watermark, which preserves the contract of ascending watermarks. If the current watermark is null, identical to the previously emitted one, or smaller than it, no new watermark is emitted.
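To make this emission contract concrete, here is a minimal, framework-free sketch in Java. It is not Flink's operator: the class `PeriodicWatermarkEmitter` and its methods are hypothetical, the strategy expression is modeled as a `Function` that may return null, and `onPeriodicEmit()` stands in for the timer driven by ExecutionConfig#getAutoWatermarkInterval().

```java
import java.util.function.Function;

// Hypothetical model of the watermark contract described above; not Flink's internal operator.
final class PeriodicWatermarkEmitter<T> {
    private final Function<T, Long> strategy; // evaluated once per record; may return null
    private Long maxGenerated = null;          // largest non-null watermark generated so far
    private long lastEmitted = Long.MIN_VALUE; // last watermark actually emitted

    PeriodicWatermarkEmitter(Function<T, Long> strategy) {
        this.strategy = strategy;
    }

    // Called for every record: evaluate the expression and remember the largest result.
    void onRecord(T record) {
        Long wm = strategy.apply(record);
        if (wm != null && (maxGenerated == null || wm > maxGenerated)) {
            maxGenerated = wm;
        }
    }

    // Called on each periodic tick: returns the watermark to emit, or null if nothing is emitted
    // (no watermark generated yet, or not larger than the last emitted one).
    Long onPeriodicEmit() {
        if (maxGenerated == null || maxGenerated <= lastEmitted) {
            return null;
        }
        lastEmitted = maxGenerated;
        return lastEmitted;
    }
}
```

For example, with the strategy `ts -> ts - 5000`, records with timestamps 10000 and 8000 followed by one tick yield a single watermark of 5000; a second tick without new records emits nothing.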

The interval at which the watermark is emitted depends on ExecutionConfig#getAutoWatermarkInterval(). We can provide some predefined watermark strategies, such as BOUNDED DELAY, which covers almost all requirements.

Bounded Delay watermark strategy

BOUNDED DELAY: A watermark strategy for rowtime attributes which are out-of-order by a bounded time interval. Emits watermarks which are the maximum observed timestamp minus the specified delay.

The BOUNDED DELAY strategy generates watermarks periodically to avoid emitting too many watermarks. The periodic interval can be configured via ExecutionConfig#setAutoWatermarkInterval. We will support configuring it via property keys in FLIP-59.
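The BOUNDED DELAY semantics, maximum observed timestamp minus a fixed delay, fit in a few lines. This is a hypothetical plain-Java sketch (the class name `BoundedDelayWatermarks` is illustrative) with timestamps as epoch milliseconds; it follows the same idea as the DataStream API's BoundedOutOfOrdernessTimestampExtractor but is not that class.

```java
// Hypothetical sketch of BOUNDED DELAY: watermark = max observed timestamp - delay (epoch ms).
final class BoundedDelayWatermarks {
    private final long delayMillis;
    private long maxTimestamp = Long.MIN_VALUE;
    private boolean seenAny = false;

    BoundedDelayWatermarks(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    // Out-of-order records never move the maximum backwards.
    void observe(long timestampMillis) {
        maxTimestamp = Math.max(maxTimestamp, timestampMillis);
        seenAny = true;
    }

    // Returns null (no watermark) until at least one timestamp has been observed.
    Long currentWatermark() {
        return seenAny ? maxTimestamp - delayMillis : null;
    }
}
```

Because only the maximum is tracked, a late, out-of-order record never pulls the watermark backwards.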

We will treat `autoWatermarkInterval==0` as the PUNCTUATED mode. In this mode, every generated watermark is emitted only if it is not null and greater than the last emitted one. This allows us to avoid introducing an additional keyword to support AssignerWithPunctuatedWatermarks behavior.
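A minimal sketch of the proposed PUNCTUATED behavior, assuming a hypothetical `PunctuatedModeSketch` helper: an interval of 0 selects the mode, and each generated watermark is checked immediately instead of waiting for a periodic timer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: with autoWatermarkInterval == 0, every generated watermark is
// considered immediately rather than on a periodic timer.
final class PunctuatedModeSketch {
    static boolean isPunctuated(long autoWatermarkInterval) {
        return autoWatermarkInterval == 0;
    }

    // Emits each generated watermark (in arrival order) only if it is non-null
    // and strictly greater than the last emitted one.
    static List<Long> emitPunctuated(List<Long> generated) {
        List<Long> out = new ArrayList<>();
        Long last = null;
        for (Long wm : generated) {
            if (wm != null && (last == null || wm > last)) {
                out.add(wm);
                last = wm;
            }
        }
        return out;
    }
}
```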

For common cases, we can provide suggested, easy-to-use ways to define commonly used watermark strategies, such as:

  • Bounded Out of Orderness: rowtimeField - INTERVAL 'string' timeUnit
  • Preserve Watermark From Source: SYSTEM_WATERMARK()

Bounded Out of Orderness

A bounded out-of-orderness watermark strategy emits watermarks that lag behind the event-time timestamps of the elements by a fixed amount of time.

In this case, the watermark can be generated from the timestamp of the element minus the specified lateness. For example:


Code Block
languagesql
CREATE TABLE kafka_source (
   user_id STRING,
   log_ts TIMESTAMP(3),
   ingestion_ts TIMESTAMP(3),
   WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
) WITH (
   ...
)

It defines a watermark for the log_ts rowtime field, and the watermark is generated as log_ts - 5 seconds.
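For the kafka_source example, the expression is evaluated once per record. The sketch below is hypothetical: it assumes TIMESTAMP(3) values are represented as java.time.LocalDateTime and, for the conversion to epoch milliseconds, that the timestamp is interpreted as UTC; this proposal specifies neither detail.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical per-record evaluation of `log_ts - INTERVAL '5' SECOND`.
final class BoundedOutOfOrdernessExpr {
    private static final Duration DELAY = Duration.ofSeconds(5);

    // A null log_ts yields a null watermark, i.e. no watermark for this record.
    static LocalDateTime evaluate(LocalDateTime logTs) {
        return logTs == null ? null : logTs.minus(DELAY);
    }

    // Watermarks are longs in ms since the Epoch; treating the value as UTC is an assumption.
    static Long toEpochMillis(LocalDateTime ts) {
        return ts == null ? null : ts.toInstant(ZoneOffset.UTC).toEpochMilli();
    }
}
```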

Preserve Watermark From Source

The FROM-SOURCE watermark descriptor preserves the watermarks assigned by the underlying source implementation. In SQL DDL, the proposed grammar uses a built-in SYSTEM_WATERMARK() function:

Code Block
languagesql
CREATE TABLE myTable (
  rowtime AS SYSTEM_ROWTIME(),
  WATERMARK FOR rowtime AS SYSTEM_WATERMARK()
) WITH (
 ...
)

NOTE: If the watermark strategy is SYSTEM_WATERMARK(), then the corresponding rowtime field must be derived using SYSTEM_ROWTIME(). Otherwise an exception will be thrown by the framework.

SYSTEM_ROWTIME() is a built-in function that can only be used in DDL as a computed column. It uses the timestamp of the StreamRecord, which is assigned by SourceContext#collectWithTimestamp(element, timestamp).

SYSTEM_WATERMARK() is also a built-in function and can only be used as a watermark strategy expression. It means the watermark is generated by the source system itself, and the framework should preserve the watermark assigned by the source instead of generating a new one.

SYSTEM_ROWTIME() and SYSTEM_WATERMARK() correspond to Rowtime#timestampsFromSource() and Rowtime#watermarksFromSource() in the descriptor API.

So it is easy to introduce this strategy in DDL now, because we can convert these functions into the same properties as the Rowtime descriptor. Connector factories can check whether they support it and throw an unsupported exception otherwise.
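The SYSTEM_WATERMARK() rule in the NOTE can be checked when the DDL is validated. This is a hypothetical sketch operating on plain strings; the class `SystemWatermarkValidator`, its parameters, and the choice of IllegalArgumentException are illustrative, and a real implementation would inspect parsed expressions rather than text.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical DDL check: SYSTEM_WATERMARK() requires a rowtime computed as SYSTEM_ROWTIME().
final class SystemWatermarkValidator {
    /**
     * @param computedColumns computed column name -> expression text (physical columns are absent)
     * @param rowtimeColumn   the column referenced by WATERMARK FOR
     * @param watermarkExpr   the watermark strategy expression text
     */
    static void validate(Map<String, String> computedColumns, String rowtimeColumn, String watermarkExpr) {
        if (!normalize(watermarkExpr).equals("SYSTEM_WATERMARK()")) {
            return; // the rule only applies to SYSTEM_WATERMARK()
        }
        String rowtimeExpr = computedColumns.get(rowtimeColumn);
        if (rowtimeExpr == null || !normalize(rowtimeExpr).equals("SYSTEM_ROWTIME()")) {
            throw new IllegalArgumentException(
                "SYSTEM_WATERMARK() requires rowtime field '" + rowtimeColumn
                    + "' to be derived using SYSTEM_ROWTIME()");
        }
    }

    // Strips whitespace and upper-cases, so "system_rowtime( )" also matches.
    private static String normalize(String expr) {
        return expr.replaceAll("\\s+", "").toUpperCase(Locale.ROOT);
    }
}
```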

Complex watermark strategies

Users can also use a user-defined ScalarFunction to support more complex watermark strategies.

For example, assume the log_ts field describes when the record was created, and some records carry a flag marking them as the end of a sequence, such that no elements with smaller timestamps can arrive afterwards. Then we can implement a simple UDF that emits a watermark depending on the log_ts and flag fields.


Code Block
languagesql
CREATE TABLE myTable (
  log_ts TIMESTAMP(3),
  flag BOOLEAN,
  WATERMARK FOR log_ts AS watermarkOnFlag(log_ts, flag)
) WITH (
...
)
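A possible body for the hypothetical watermarkOnFlag UDF, sketched in plain Java. It assumes TIMESTAMP(3) maps to java.time.LocalDateTime and BOOLEAN to Boolean; in Flink the class would extend org.apache.flink.table.functions.ScalarFunction, which is left out to keep the sketch dependency-free. Returning null means no watermark is generated for that record.

```java
import java.time.LocalDateTime;

// Hypothetical body of the watermarkOnFlag(log_ts, flag) UDF. In Flink it would extend
// org.apache.flink.table.functions.ScalarFunction; that dependency is omitted here.
final class WatermarkOnFlag {
    // A flagged record marks the end of a sequence: no later record may have a smaller
    // timestamp, so its log_ts is used as the watermark. Unflagged records (or nulls)
    // return null, meaning no watermark is generated for them.
    public LocalDateTime eval(LocalDateTime logTs, Boolean flag) {
        if (logTs == null || !Boolean.TRUE.equals(flag)) {
            return null;
        }
        return logTs;
    }
}
```

With `autoWatermarkInterval==0` (the proposed PUNCTUATED mode), the watermark from each flagged record would be considered as soon as it is generated.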

...

This makes defining watermarks as flexible as PunctuatedWatermarkAssigner in DataStream.

NOTE: The custom watermark strategy works in PERIODIC mode, which emits the generated watermark periodically at an interval configured via ExecutionConfig#setAutoWatermarkInterval. We will treat `autoWatermarkInterval==0` as the PUNCTUATED mode. This allows for punctuated watermarks without introducing [PERIODIC | PUNCTUATED] keywords or syntax.

Rejected Alternative Syntax

Code Block
languagesql
WATERMARK [wmName] FOR rowtimeField AS CUSTOM myUdf(rowtimeField, ...)
WATERMARK [wmName] FOR columnName AS CUSTOM 'strategyClassName'
WATERMARK [wmName] FOR rowtimeField AS [PERIODIC | PUNCTUATED] myUdf(...)
WATERMARK [wmName] FOR columnName AS 'strategyClassName'


Limitation: More advanced strategies, e.g. a histogram-based approach, would need to remember the values of the last x records. The interface of a scalar function would still work for that, but it would be a stateful function, and we don't support stateful scalar functions yet. That means we could still implement such a strategy as a scalar function, but the next watermark may be wrong after a failover. Supporting such advanced strategies needs further discussion.


Dropping ASCENDING watermark strategy

ASCENDING is a watermark strategy for ascending rowtime attributes. It emits a watermark of the maximum observed timestamp so far minus 1. Rows that have a timestamp equal to the max timestamp are not late.

I would like to avoid introducing an ASCENDING watermark strategy, because it is a confusing keyword. The “AS” part describes the watermark generation strategy, but what is ascending?

The ascending watermark? A watermark should be ascending anyway.

The ascending rowtime? Why would the watermark strategy part need to describe a characteristic of the rowtime?

An ASCENDING watermark strategy is usually used in testing and can also be represented by “rowtimeField - INTERVAL '0.001' SECOND”.

So we should be careful about adding an ASCENDING strategy, and I propose to drop it from this design.
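The equivalence claimed above can be checked with a short sketch: with millisecond timestamps, "maximum observed timestamp minus 1" and the per-record expression `rowtimeField - INTERVAL '0.001' SECOND`, combined with emitting the largest generated watermark, give the same result. Class and method names are hypothetical; both methods expect a non-empty list.

```java
import java.util.List;

// Hypothetical comparison of ASCENDING with `rowtimeField - INTERVAL '0.001' SECOND`.
final class AscendingEquivalence {
    // ASCENDING: maximum observed timestamp (ms) so far minus 1 ms. Expects a non-empty list.
    static long ascending(List<Long> timestampsMillis) {
        long max = Long.MIN_VALUE;
        for (long ts : timestampsMillis) {
            max = Math.max(max, ts);
        }
        return max - 1;
    }

    // Expression form: evaluate `ts - 1 ms` per record and keep the largest result,
    // as periodic emission of the largest generated watermark does. Expects a non-empty list.
    static long viaExpression(List<Long> timestampsMillis) {
        long largest = Long.MIN_VALUE;
        for (long ts : timestampsMillis) {
            largest = Math.max(largest, ts - 1);
        }
        return largest;
    }
}
```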


Expected rowtime is nested in schema?

...

Code Block
languagesql
CREATE TABLE kafka_source (
  order_id STRING,
  event ROW<log_ts TIMESTAMP(3), message STRING, type INT>,
  WATERMARK FOR event.log_ts AS event.log_ts - INTERVAL '5' SECOND
) WITH (
  ...
)

...

Code Block
languagesql
CREATE TABLE kafka_source (
   order_id STRING,
   ts STRING,
   -- to_timestamp can be a built-in function or UDF
   rowtime AS to_timestamp(ts),
   WATERMARK FOR rowtime AS rowtime - INTERVAL '5' SECOND
) WITH (
   ...
)


NOTE: the design of computed column is discussed in FLIP-70.

Rejected Alternative Syntax

1. Reject introducing BOUNDED DELAY syntax in the first version.

Code Block
languagesql
WATERMARK FOR columnName AS BOUNDED DELAY WITH INTERVAL 'string' timeUnit

The BOUNDED DELAY syntax can be replaced by the expression “rowtime - INTERVAL 'string' timeUnit”. We should be careful about adding more keywords, because they affect every query, not just DDL.


2. Reject using a PunctuatedWatermarkAssigner class name ('strategyClassName').

Code Block
languagesql
WATERMARK FOR columnName AS CUSTOM 'strategyClassName'
WATERMARK FOR columnName AS 'strategyClassName'



3. Reject supporting an optional watermark name.

Code Block
languagesql
WATERMARK [watermarkName] FOR columnName AS ...

A watermark name is not necessary in the first version, since we only have one watermark definition in DDL.

Proctime Attribute

The proctime attribute field can also be defined using a computed column, such as:

pt AS SYSTEM_PROCTIME(), which defines a proctime field named “pt” in the schema. Here SYSTEM_PROCTIME() is a built-in function that derives a proctime attribute field.

...

Note that we will only support this in the Blink planner. In the old Flink planner, we will simply throw an unsupported exception.

Currently, we already support the Rowtime descriptor end-to-end. That is, the Rowtime descriptor is converted into properties, together with the table schema and other information, and stored in the catalog. When the registered table is queried, the connector TableFactory derives the rowtime attribute descriptor from the properties and sets it on the TableSource created in createTableSource (see KafkaTableSourceSinkFactoryBase). At the end of optimization, i.e. in translateToPlan, the planner applies a watermark assigner transformation after the source transformation according to the RowtimeAttributeDescriptor (see StreamExecTableSourceScan).

The following figure shows how this happens. 

[Figure not available]

So the only thing we need to do is bridge the watermark DDL definition to the same property keys and values as the Rowtime descriptor. Then it should work.

For example, 

Code Block
languagesql
ts TIMESTAMP(3), 
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND

The core idea is that the framework could extract the rowtime attribute and watermark information from the properties and carry the information in TableSourceTable instead of in TableSource. In translateToPlan, we look into the TableSourceTable to decide whether to assign a watermark.

The steps are as follows:

  1. Parse the DDL into SqlCreateTable with time attribute information.
  2. Convert SqlCreateTable to CatalogTable in SqlToOperationConverter#convert:
    1. the time attribute information is placed in CatalogTable#getTableSchema as a WatermarkSpec;
    2. the time attribute information is converted into properties.
  3. Pass the time attribute information (named TimeAttributeDescriptor) into TableSourceTable in DatabaseCalciteSchema#convertCatalogTable.
  4. Create the TableScan node (and the computed column Project) and the WatermarkAssigner node in FlinkRelOptTable#toRel.
  5. Add a rule to combine the WatermarkAssigner and a potential rowtime computed column Project into the TableScan (DEFAULT_REWRITE optimization phase).
  6. In StreamExecTableSourceScan#translateToPlan, apply the rowtime computed column (if it exists) and the watermark assigner according to the TimeAttributeDescriptor. Ignore the TableSource's DefinedRowtimeAttributes if a TimeAttributeDescriptor exists.


The properties representation of a watermark cannot be the same as the properties of the Rowtime descriptor, because the Rowtime descriptor properties cannot fully represent the watermark structure proposed by this design.

So for a statement `WATERMARK FOR f1.q2 AS f1.q2 - INTERVAL '5' SECOND`, the proposed properties might be:

Code Block
languagesql
'schema.watermark.0.rowtime' = 'f1.q2'
'schema.watermark.0.strategy.expression' = '`f1`.`q2` - INTERVAL '5' SECOND'
'schema.watermark.0.strategy.datatype' = 'TIMESTAMP(3)'

The conversion between WatermarkSpec and properties is handled by framework, and users shouldn't care about it. 
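The conversion can be sketched as follows, using the proposed `schema.watermark.<index>.` key prefix. `WatermarkSpecSketch` and `WatermarkProperties` are hypothetical stand-ins, not the planner's WatermarkSpec or its actual serialization code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a watermark definition: rowtime attribute, strategy expression, result type.
final class WatermarkSpecSketch {
    final String rowtimeAttribute;
    final String expression;
    final String dataType;

    WatermarkSpecSketch(String rowtimeAttribute, String expression, String dataType) {
        this.rowtimeAttribute = rowtimeAttribute;
        this.expression = expression;
        this.dataType = dataType;
    }
}

// Hypothetical converter between a watermark definition and flat catalog properties.
final class WatermarkProperties {
    private static String prefix(int index) {
        return "schema.watermark." + index + ".";
    }

    static Map<String, String> toProperties(int index, WatermarkSpecSketch spec) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put(prefix(index) + "rowtime", spec.rowtimeAttribute);
        props.put(prefix(index) + "strategy.expression", spec.expression);
        props.put(prefix(index) + "strategy.datatype", spec.dataType);
        return props;
    }

    // Returns null if no watermark is stored under the given index.
    static WatermarkSpecSketch fromProperties(int index, Map<String, String> props) {
        String rowtime = props.get(prefix(index) + "rowtime");
        if (rowtime == null) {
            return null;
        }
        return new WatermarkSpecSketch(
            rowtime,
            props.get(prefix(index) + "strategy.expression"),
            props.get(prefix(index) + "strategy.datatype"));
    }
}
```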

Connector developers can get the watermark information via `CatalogTable#getTableSchema#getWatermarkSpecs` in `TableSourceFactory#createTableSource(ObjectPath tablePath, CatalogTable table)`.

Actually, connector developers don't need the watermark information for now, because the framework applies the watermark assigner automatically.

Deprecate DefinedRowtimeAttributes

...

We can deprecate DefinedRowtimeAttributes and let the planner apply the watermark assigner automatically, to avoid repeating the same logic in every connector. The same applies to DefinedProctimeAttribute. This can happen once we don't support temporal TableSource anymore.

Compatibility, Deprecation, and Migration Plan

...