...

Code Block
languagesql
CREATE TABLE table_name (
  WATERMARK FOR columnName AS watermarkStrategy
) WITH (
  ...
)
watermarkStrategy ::=
  { BOUNDED DELAY WITH INTERVAL 'string' timeUnit 
     | SYSTEM_WATERMARK
     | <expression> }

NOTE: For “INTERVAL 'string' timeUnit”, please refer to the interval literal in the SQL standard. Only day-time intervals are supported for now.

NOTE: “columnName” can be a nested column, referenced using dot syntax.

Proctime Attribute Syntax

Code Block
languagesql
CREATE TABLE table_name (
  proc AS PROCTIME()
) WITH (
  ...
)


Proposed Changes

Flink Table API & SQL treats a time attribute as a field in the schema, not as a system or hidden field. This allows a unified syntax for table programs in both batch and streaming environments. The only difference between a time attribute field and a regular field is a special mark on the field type. The proposed syntax follows the same concept.

...

NOTE: “columnName” can be a nested column, referenced using dot syntax.

NOTE: For “INTERVAL 'string' timeUnit”, please refer to the interval literal in the SQL standard. Only day-time intervals are supported for now.

...

The “WATERMARK” keyword starts a watermark definition. The “FOR” keyword names the field used to generate the watermark; this field must already exist in the schema. The “AS” keyword defines the watermark strategy. We can provide some predefined watermark strategies, such as BOUNDED DELAY, which covers almost all requirements.

Bounded Delay watermark strategy

BOUNDED DELAY: A watermark strategy for rowtime attributes which are out-of-order by a bounded time interval. Emits watermarks which are the maximum observed timestamp minus the specified delay.

NOTE: The BOUNDED DELAY strategy generates watermarks periodically to avoid emitting too many watermarks. The periodic interval can be configured with ExecutionConfig#setAutoWatermarkInterval. Configuring it via property keys will be supported in FLIP-59.
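To make the bounded-delay rule concrete, the following is a minimal Java sketch of the logic described above: remember the maximum observed timestamp per record, and report that maximum minus the delay when a watermark is requested periodically. The class and method names (BoundedDelayWatermarkSketch, onEvent, currentWatermark) are hypothetical and used only for illustration; this is not Flink's actual implementation.

```java
/**
 * Hypothetical sketch of a bounded-delay watermark generator.
 * Names are illustrative only and do not correspond to Flink classes.
 */
final class BoundedDelayWatermarkSketch {
    private final long delayMillis;
    private long maxTimestamp;

    BoundedDelayWatermarkSketch(long delayMillis) {
        if (delayMillis < 0) {
            throw new IllegalArgumentException("delay must be non-negative");
        }
        this.delayMillis = delayMillis;
        // Start low, but high enough that subtracting the delay cannot underflow.
        this.maxTimestamp = Long.MIN_VALUE + delayMillis;
    }

    /** Called for every record: only remembers the largest rowtime seen so far. */
    void onEvent(long rowtimeMillis) {
        maxTimestamp = Math.max(maxTimestamp, rowtimeMillis);
    }

    /** Called periodically: the watermark trails the maximum timestamp by the delay. */
    long currentWatermark() {
        return maxTimestamp - delayMillis;
    }

    public static void main(String[] args) {
        BoundedDelayWatermarkSketch wm = new BoundedDelayWatermarkSketch(5_000L);
        for (long ts : new long[] {10_000L, 12_000L, 11_000L}) {
            wm.onEvent(ts);
        }
        System.out.println(wm.currentWatermark()); // prints 7000
    }
}
```

The out-of-order record at 11000 does not move the watermark backwards, because only the maximum timestamp is tracked.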

...

The “AS” part describes the watermark generation strategy, but what would “Ascending” refer to? An ascending watermark? A watermark is of course always ascending. An ascending rowtime? Then why should the watermark strategy part describe a characteristic of the rowtime?

An ASCENDING watermark strategy is usually used in testing and can also be represented by “BOUNDED DELAY WITH INTERVAL '0.001' SECOND”.

...

The expected rowtime field may be nested: the watermark syntax allows accessing a nested field using dot syntax. For example:

Code Block
languagesql
CREATE TABLE kafka_source (
  order_id STRING,
  event ROW<log_ts TIMESTAMP(3), message STRING, type INT>,
  WATERMARK FOR event.log_ts AS BOUNDED DELAY WITH INTERVAL '5' SECOND
) WITH (
  ...
)
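As an illustration of what dot syntax implies for rowtime extraction, the Java sketch below walks a dotted path such as event.log_ts through a nested row. Representing rows as nested java.util.Map instances, and the names NestedFieldSketch and resolve, are assumptions made for this example only; the FLIP does not prescribe how the planner resolves nested fields.

```java
import java.util.Map;

/** Hypothetical helper: resolves a dotted column path against a nested row. */
final class NestedFieldSketch {

    /** Walks a path such as "event.log_ts" one segment at a time. */
    static Object resolve(Map<String, Object> row, String dottedPath) {
        Object current = row;
        for (String part : dottedPath.split("\\.")) {
            if (!(current instanceof Map)) {
                throw new IllegalArgumentException(
                        "'" + part + "' is not inside a nested row");
            }
            Map<?, ?> nested = (Map<?, ?>) current;
            if (!nested.containsKey(part)) {
                throw new IllegalArgumentException(
                        "no field '" + part + "' in path " + dottedPath);
            }
            current = nested.get(part);
        }
        return current;
    }

    public static void main(String[] args) {
        // Mirrors the kafka_source schema: event ROW<log_ts, message, type>.
        Map<String, Object> event = Map.of("log_ts", 1_000L, "message", "hi", "type", 1);
        Map<String, Object> row = Map.of("order_id", "o-1", "event", event);
        System.out.println(resolve(row, "event.log_ts")); // prints 1000
    }
}
```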

...

)

...

...


Expected rowtime field does not exist in the schema?

...

Code Block
languagesql
CREATE TABLE kafka_source (
  order_id STRING,
  ts STRING,
  -- to_timestamp can be a built-in function or UDF
  rowtime AS to_timestamp(ts), 
  WATERMARK FOR rowtime AS BOUNDED DELAY WITH INTERVAL '5' SECOND
) WITH (
  ...
)
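The DDL above combines two steps: a computed column derives the rowtime from the physical STRING column, and the watermark is then generated from that derived value. A rough Java sketch of the two steps follows. The timestamp format "uuuu-MM-dd HH:mm:ss", the UTC time zone, and the names ComputedRowtimeSketch and toTimestampMillis are assumptions for illustration; the actual semantics of to_timestamp depend on the built-in function or UDF that is used.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical sketch of deriving a rowtime from a string column. */
final class ComputedRowtimeSketch {
    // Assumed input format; the real to_timestamp function may differ.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss");

    /** Stand-in for "rowtime AS to_timestamp(ts)", interpreting ts as UTC. */
    static long toTimestampMillis(String ts) {
        return LocalDateTime.parse(ts, FORMAT)
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
    }

    public static void main(String[] args) {
        long rowtime = toTimestampMillis("1970-01-01 00:00:10");
        // With a single record, the bounded-delay watermark is rowtime minus 5 seconds.
        long watermark = rowtime - 5_000L;
        System.out.println(rowtime + " " + watermark); // prints 10000 5000
    }
}
```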


NOTE:

...

The design of computed columns is discussed in FLIP-70.

Proctime Attribute

The proctime attribute field can also be defined using a computed column, for example: pt AS PROCTIME(), which defines a proctime field named “pt” in the schema. Here PROCTIME() is a built-in function that derives a proctime attribute field.

As above, the design of computed columns is discussed in FLIP-70.

Implementation

This section discusses only the watermark implementation. The proctime attribute will be supported naturally once computed columns are supported.

...