Status

Current state: "Under Discussion"

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In Flink 1.9, we introduced a basic SQL DDL for creating tables. However, it does not yet support defining time attributes. Time attributes are basic information required by time-based operations, such as windows, in both the Table API and SQL. This means that users currently cannot apply window operations to tables created by DDL. Meanwhile, since 1.9 was released, we have received many requests from the community to define event time in DDL. Supporting time attribute definitions in SQL DDL would therefore be of great benefit.

Public Interfaces

This is a list of new interfaces in SQL DDL. Please refer to the “Proposed Changes” section for details.

Watermark Syntax

```sql
CREATE TABLE table_name (
  WATERMARK FOR columnName AS watermarkStrategy
) WITH (
  ...
)

watermarkStrategy ::=
  { BOUNDED DELAY WITH INTERVAL 'string' timeUnit
  | SYSTEM_WATERMARK
  | <expression> }
```
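To make the grammar concrete, here is one WATERMARK clause for each alternative of watermarkStrategy. These are alternatives, not clauses of a single table, and log_ts is a hypothetical TIMESTAMP(3) column.

```sql
-- Alternative 1: predefined bounded-delay strategy (5-second delay).
WATERMARK FOR log_ts AS BOUNDED DELAY WITH INTERVAL '5' SECOND
-- Alternative 2: the SYSTEM_WATERMARK keyword from the grammar.
WATERMARK FOR log_ts AS SYSTEM_WATERMARK
-- Alternative 3: an arbitrary expression computing the watermark.
WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
```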

...

NOTE: “columnName” can refer to a nested column using dot syntax.

Proctime Attribute Syntax

```sql
CREATE TABLE table_name (
  proc AS PROCTIME()
) WITH (
  ...
)
```

...

Flink Table API & SQL handle time attributes as fields in the schema, not as system or hidden fields. This allows a unified syntax for table programs in both batch and streaming environments. The only difference between a time attribute field and a regular field is a special mark on the field type. The proposed syntax also follows this concept.
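To illustrate this concept, the following sketch declares both kinds of time attribute as ordinary columns of one table using the syntax proposed in this FLIP. The table user_logs and its columns are hypothetical, and the connector properties are omitted as in the other examples.

```sql
CREATE TABLE user_logs (
  user_id STRING,
  log_ts TIMESTAMP(3),         -- becomes a rowtime attribute via the WATERMARK clause
  proc AS PROCTIME(),          -- proctime attribute declared as a computed column
  WATERMARK FOR log_ts AS BOUNDED DELAY WITH INTERVAL '5' SECOND
) WITH (
  ...                          -- connector properties omitted
);

-- Both time attributes are selected like regular columns.
SELECT user_id, log_ts, proc FROM user_logs;
```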

Rowtime Attribute

A rowtime attribute must be an existing column of TIMESTAMP type in the table schema. An existing field is marked as a rowtime attribute once a watermark is defined on it. Therefore, a watermark definition needs to declare 1) the rowtime field and 2) the watermark strategy.

...

The “WATERMARK” keyword starts a watermark definition. The “FOR” keyword specifies the field used to generate watermarks; this field must already exist in the schema. The “AS” keyword defines the watermark strategy. We can provide some predefined watermark strategies, such as BOUNDED DELAY, which covers almost all requirements.

Bounded Delay watermark strategy

BOUNDED DELAY: a watermark strategy for rowtime attributes that are out of order by at most a bounded time interval. It emits watermarks equal to the maximum observed timestamp minus the specified delay.
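As a worked illustration of this rule, using notation introduced only for this example: t_i is the rowtime of the i-th observed record, d is the delay, and W_n is the watermark after n records. The timestamps are hypothetical (d = 5 s); note that the late record t_3 = 12:00:07 does not move the watermark back.

```latex
\begin{aligned}
W_n &= \max_{1 \le i \le n} t_i - d \\
W_1 &= \text{12:00:03} - 5\,\text{s} = \text{11:59:58} \\
W_2 &= \max(\text{12:00:03}, \text{12:00:10}) - 5\,\text{s} = \text{12:00:05} \\
W_3 &= \max(\text{12:00:03}, \text{12:00:10}, \text{12:00:07}) - 5\,\text{s} = \text{12:00:05}
\end{aligned}
```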

...

```sql
CREATE TABLE kafka_source (
  user_id STRING,
  log_ts TIMESTAMP(3),
  ingestion_ts TIMESTAMP(3),
  WATERMARK FOR log_ts AS BOUNDED DELAY WITH INTERVAL '5' SECOND
) WITH (
  ...
)
```
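Once log_ts is a rowtime attribute, kafka_source can feed time-based windows, which is the gap described in the Motivation section. A minimal sketch using Flink's existing TUMBLE group window; the one-minute window size and the output column names are illustrative assumptions.

```sql
-- Count log entries per user in one-minute event-time windows on log_ts.
SELECT
  user_id,
  TUMBLE_START(log_ts, INTERVAL '1' MINUTE) AS window_start,
  COUNT(*) AS cnt
FROM kafka_source
GROUP BY TUMBLE(log_ts, INTERVAL '1' MINUTE), user_id;
```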


The kafka_source table defines a watermark on its log_ts rowtime field using the bounded delay strategy with a 5-second interval, i.e. each emitted watermark is the maximum observed log_ts minus 5 seconds.

Custom watermark strategies

...

The expression can be any expression that calculates the watermark; its returned value is the watermark. The return type can be “Long”, “long”, or the same type as the rowtime field. If the returned value is null, no watermark is generated.
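For instance, the null rule lets an expression suppress watermarks for selected rows. In this sketch the table sensor_readings and its columns are hypothetical, and it assumes the expression may reference other columns of the same row: heartbeat rows return NULL (no watermark), all other rows a 5-second bounded-delay value of the rowtime type.

```sql
CREATE TABLE sensor_readings (
  sensor_id STRING,
  reading_ts TIMESTAMP(3),
  is_heartbeat BOOLEAN,
  WATERMARK FOR reading_ts AS
    CASE
      -- NULL means: generate no watermark for this row.
      WHEN is_heartbeat THEN CAST(NULL AS TIMESTAMP(3))
      -- Otherwise behave like a 5-second bounded delay.
      ELSE reading_ts - INTERVAL '5' SECOND
    END
) WITH (
  ...
)
```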

Example#1

For example, the built-in “BOUNDED DELAY WITH INTERVAL '5' SECOND” strategy can also be written as an expression:

```sql
WATERMARK FOR rowtimeField AS rowtimeField - INTERVAL '5' SECOND
```


Example#2

Users can also use a user-defined ScalarFunction to implement more complex watermark strategies.
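A minimal sketch of how such a function could be referenced in the WATERMARK clause. It assumes a ScalarFunction has already been registered under the hypothetical name my_watermark, takes the rowtime value, and returns a TIMESTAMP(3) watermark, or NULL for no watermark, following the return-type rules for expressions described earlier.

```sql
CREATE TABLE kafka_source (
  user_id STRING,
  log_ts TIMESTAMP(3),
  -- my_watermark is a hypothetical, previously registered ScalarFunction.
  WATERMARK FOR log_ts AS my_watermark(log_ts)
) WITH (
  ...
)
```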

...

Therefore, we propose to drop the ASCENDING strategy from this design.

From Source watermark strategy

The FROM-SOURCE watermark strategy preserves the watermarks assigned by the underlying source implementation. The grammar is as follows:

...

Therefore, this strategy is easy to introduce in DDL now, because it can be converted into properties in the same way as the Rowtime descriptor. Connector factories can check whether they support it and throw an unsupported exception otherwise.

Expected rowtime field is nested in the schema?

This is fine: the watermark syntax allows accessing a nested field with dot syntax. For example:

```sql
CREATE TABLE kafka_source (
  order_id STRING,
  event ROW<log_ts TIMESTAMP(3), message STRING, type INT>,
  WATERMARK FOR event.log_ts AS BOUNDED DELAY WITH INTERVAL '5' SECOND
) WITH (
  ...
)
```


Expected rowtime field does not exist in the schema?

It is common that the expected rowtime field needs to be derived from existing fields, e.g. when the original timestamp is a string, a BIGINT value representing a UTC timestamp, or a value nested in JSON. That is why we provided TimestampExtractor in the descriptor API to extract timestamps.
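With computed columns, the derived field could be declared directly in the schema and then used as the rowtime field. A sketch for the string case, assuming the computed-column form `name AS expression` and Flink's built-in TO_TIMESTAMP function; the column log_ts_str is hypothetical.

```sql
CREATE TABLE kafka_source (
  user_id STRING,
  log_ts_str STRING,                    -- raw value, e.g. '2019-10-01 12:00:00'
  log_ts AS TO_TIMESTAMP(log_ts_str),   -- derived TIMESTAMP column
  WATERMARK FOR log_ts AS BOUNDED DELAY WITH INTERVAL '5' SECOND
) WITH (
  ...
)
```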

...

NOTE: the design of computed column is discussed in FLIP-70.

Proctime Attribute

A proctime attribute field can also be defined using a computed column, such as pt AS PROCTIME(), which defines a proctime field named “pt” in the schema. Here PROCTIME() is a built-in function that derives a proctime attribute field.

As above, the design of computed columns is discussed in FLIP-70.
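Because pt is a regular schema field marked as a proctime attribute, it can be used directly in time-based windows. A sketch using Flink's existing TUMBLE group window; the table clicks and its columns are hypothetical.

```sql
CREATE TABLE clicks (
  user_id STRING,
  url STRING,
  pt AS PROCTIME()   -- processing-time attribute
) WITH (
  ...
);

-- Count clicks per user in one-minute processing-time windows.
SELECT user_id, COUNT(url) AS clicks_per_minute
FROM clicks
GROUP BY TUMBLE(pt, INTERVAL '1' MINUTE), user_id;
```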

Implementation

In this section, we only discuss the watermark implementation. The proctime attribute will be supported naturally once computed columns are supported.

...

```sql
'schema.0.rowtime.timestamps.type' = 'from-field'
'schema.0.rowtime.timestamps.from' = 'ts'
'schema.0.rowtime.watermarks.type' = 'periodic-bounded'
'schema.0.rowtime.watermarks.delay' = '5000'
```
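For orientation, a DDL like the following would presumably produce the properties listed above: column 0 named ts as the rowtime field, timestamps taken from that field, and a periodic bounded watermark with a 5000 ms (5-second) delay. This correspondence is inferred from the property names and values; the table name orders and the column order_id are hypothetical.

```sql
CREATE TABLE orders (
  ts TIMESTAMP(3),   -- column index 0, hence the 'schema.0.' prefix
  order_id STRING,
  WATERMARK FOR ts AS BOUNDED DELAY WITH INTERVAL '5' SECOND
) WITH (
  ...
)
```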


Deprecate DefinedRowtimeAttributes

Currently, to make the Rowtime descriptor work, the corresponding connector has to implement the DefinedRowtimeAttributes interface and hand over the derived rowtime attributes through it. This is tedious and confusing for users.

...

The core idea is that the framework would extract the rowtime attribute and watermark information from the properties and carry it in TableSourceTable instead of TableSource. During translateToPlan, the planner looks into the TableSourceTable to decide whether to assign watermarks.

Compatibility, Deprecation, and Migration Plan


  • All existing DDL syntax remains valid and compatible, with no change in behavior.
  • No new interface is introduced for TableSource or TableSink, so these interfaces remain compatible.

Test Plan


The implementation can be tested with unit tests for every new feature. Integration tests for connectors can also be added to verify that the feature cooperates with existing source implementations.