You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 7 Next »

Status

Current state"Under Discussion"

Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-66-Support-time-attribute-in-SQL-DDL-tt32766.html

JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In Flink 1.9, we already introduced a basic SQL DDL to create a table. However, it doesn’t support to define time attributes in SQL DDL yet. Time attribute is a basic information required by time-based operations such as windows in both Table API and SQL. That means, currently, users can’t apply window operations on the tables created by DDL. Meanwhile, we received a lot of requirements from the community to define event time in DDL since 1.9 released. It will be a great benefit to support define time attribute in SQL.

Public Interfaces

This is a list of new interfaces in SQL DDL. Please refer to “Proposed Changes” section for details.

Watermark Syntax

CREATE TABLE table_name (
  WATERMARK FOR columnName AS <watermark_strategy_expression>
) WITH (
  ...
)

NOTE: For “columnName”, it can be a nested column which can use dot syntax.

Proctime Attribute Syntax

CREATE TABLE table_name (
  proc AS SYSTEM_PROCTIME()
) WITH (
  ...
)


Proposed Changes

Flink Table API & SQL handles time attribute as a field in the schema, not a system or hidden field. This allows for unified syntax for table programs in both batch and streaming environments. The only difference between time attribute field and regular field is that we have a special mark on the field type. The proposed syntax also follows this concept.

Rowtime Attribute

A rowtime attribute must be an existing column in the table schema with TIMESTAMP type. An existing field will be marked as rowtime attribute once a watermark is defined on it. So what we need is a watermark definition to declare 1) the rowtime field, 2) the watermark strategy.

The following is the proposed watermark definition.

CREATE TABLE table_name (
  WATERMARK FOR columnName AS <watermark_strategy_expression>
) WITH (
  ...
)

NOTE: For “columnName”, it can be a nested column which can use dot syntax.


We would like to treat WATERMARK similar to a PRIMARY KEY or UNIQUE KEY 

constraint and therefore keep it in the schema definition. 

The WATERMARK definition combines the watermark strategy and rowtime field selection (i.e. which existing field used to generate watermark) in one clause, so that we can define multiple watermarks in one table in the future.

The “WATERMARK” keyword starts a watermark definition. The “FOR” keyword defines which existing field is marked as rowtime attribute, this field should already exist in the schema.  

The “AS” keyword defines watermark strategy. It allows arbitrary expression to calculate watermark. The returned value is the watermark. The return type can be a nullable BIGINT or TIMESTAMP(3). 

Watermarks are defined as longs that represent the milliseconds since the Epoch (midnight, January 1, 1970 UTC). The returned watermark will be emitted only if it is non-null and its timestamp is larger than that of the previously emitted watermark (to preserve the contract of ascending watermarks). The watermark generation expression is called by the system for every record. The system will periodically emit the largest generated watermark. If the current watermark is still identical to the previous one, or is null, or the timestamp of the returned watermark is smaller than that of the last emitted one, then no new watermark will be emitted.

The interval in which the watermark is emitted depends on ExecutionConfig#getAutoWatermarkInterval(). We will treat `autoWatermarkInterval==0` as the PUNCTUATED mode. In this mode, every generated watermark will be emitted only if it is not null and greater than the last emitted one. This allows us avoid introducting additional keyword to support AssignerWithPunctuatedWatermarks behavior.

For common cases, we can provide some suggested and easy-to-use ways to define commonly used watermark strategies. Such as:

  • Bounded Out of Orderness: rowtimeField - INTERVAL 'string' timeUnit
  • Preserve Watermark From Source: SYSTEM_WATERMARK()

Bounded Out of Orderness

A bounded out of orderness watermark strategy is used to emit Watermarks that lag behind the element with event-time timestamp by a fixed amount of time. 

In this case, the watermark can be generated by the timestamp of the element minus the specified late time. For example:


CREATE TABLE kafka_source (
  user_id STRING,
  log_ts TIMESTAMP(3),
  ingestion_ts TIMESTAMP(3),
  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
) WITH (
  ...
)

It defines a watermark for log_ts rowtime field, and the watermark is generated as rowtime - 5 seconds.

Preserve Watermark From Source

The FROM-SOURCE watermark descriptor preserves the assigned watermarks from the underlying source implementation. In SQL DDL, the proposed grammar is using a built-in SYSTEM_WATERMARK() function:

CREATE TABLE myTable (
  rowtime AS SYSTEM_ROWTIME(),
  WATERMARK FOR rowtime AS SYSTEM_WATERMARK()
) WITH (
 ...
)

NOTE: If the watermark strategy is SYSTEM_WATERMARK(), then the corresponding rowtime field must be derived using SYSTEM_ROWTIME(). Otherwise an exception will be thrown by the framework.

The SYSTEM_ROWTIME() is a built-in function and can only be used in DDL as a computed column. It means it uses timestamp of the StreamRecord, which is generated by SourceContext#collectWithTimestamp(element, timestamp). 

The SYSTEM_WATERMARK() is a built-in function too, and can only be used as a watermark strategy expression. It means the watermark is generated by the source system itself and the framework should preserve the assigned watermark from source instead generate a new one. 

SYSTEM_ROWTIME() and SYSTEM_WATERMARK() correspond to Rowtime#timestampsFromSource() and Rowtime#watermarksFromSource() in descriptor.

So, it’s easy to introduce this strategy in DDL now, because we can convert them into properties as the same as Rowtime descriptor. Connector factories can check it if they support it and throw an unsupported exception otherwise.

Complex watermark strategies

Users can also use a user defined ScalarFunction to support more complex watermark strategy. 

For example, assuming the log_ts field describes when the record was created, and some records carry a flag, marking them as the end of a sequence such that no elements with smaller timestamps can come anymore. Then we can implement a simple UDF to emit watermark depends on log_ts and flag fields.


CREATE TABLE myTable (
  log_ts TIMESTAMP(3),
  flag BOOLEAN,
  WATERMARK FOR log_ts AS watermarkOnFlag(log_ts, flag)
) WITH (
...
)
public class WatermarkOnFlag extends ScalarFunction {
  Long eval(long rowtime, boolean isEndOfSequence) {
    return isEndOfSequence ? rowtime : null;
  }
}

This makes defining watermarks as flexible as PunctuatedWatermarkAssigner in DataStream.

Limitation: For more advanced strategies, e.g. a histogram-based approach would need to remember the values of the last x records. The interface of a scalar function would still work for that, but it would be a stateful function (we don’t support stateful scalar function yet). That means we still can use scalar function to implement it but the next watermark maybe wrong after failover. This needs further discussions, if we want to support such advanced strategies. 

Dropping ASCENDING watermark strategy

A watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum observed timestamp so far minus 1. Rows that have a timestamp equal to the max timestamp are not late.

I would like to avoid introducing ASCENDING watermark strategy, because it is a confusing keyword. The “AS” part describes what is the watermark generation strategy, but what is Ascending? 

The ascending watermark? A watermark of course should be ascending. 

The ascending rowtime? Why a watermark strategy part need to describe rowtime character.

An ASCENDING watermark strategy is usually used in testing and can also be represented by “rowtimeField - ‘0.001’ SECOND”.

So I think we should be careful to add ASCENDING strategy.

Expected rowtime is nested in schema?

It’s fine if the expected rowtime field is nested, the watermark syntax allows to access a nested field using dot syntax. For example:

CREATE TABLE kafka_source (
  order_id STRING,
  event ROW<log_ts TIMESTAMP(3), message STRING, type INT>,
  WATERMARK FOR event.log_ts AS event.log_ts - '5' SECOND
) WITH (
  ...
)


Expected rowtime field does not exist in the schema?

It is a common case that the expected rowtime field needs to be derived from existing fields, e.g. the original timestamp is a string or a bigint value represents a UTC timestamp, or nested in a JSON. That’s why we provided TimestampExtractor in descriptors to extract timestamp.

In SQL DDL, we would like to leverage computed-column syntax to derive from existing fields using built-in functions or user defined functions. Computed column is a widely used feature in DataBase systems, e.g. MySQL, MS Server, Oracle.  With computed-column, it can make defining a rowtime attributes transparent and simple and can cover TimestampExtractor

For example:

CREATE TABLE kafka_source (
  order_id STRING,
  ts STRING,
  -- to_timestamp can be a built-in function or UDF
  rowtime AS to_timestamp(ts), 
  WATERMARK FOR rowtime AS rowtime - '5' SECOND
) WITH (
  ...
)

NOTE: the design of computed column is discussed in FLIP-70.

Rejeceted Alternative Syntax

1. Reject to introduce BOUNDED DELAY syntax in the first version.

WATERMARK FOR columnName AS BOUNDED DELAY WITH INTERVAL 'string' timeUnit

The BOUNDED DELAY syntax can be substituted in expression way “rowtime - INTERVAL ‘string’ timeUnit”. We should be careful to add more keywords,  because they affect every query, not just DDL.


2. Reject to use PunctuatedWatermarkAssigner class name 'strategyClassName'

WATERMARK FOR columnName AS CUSTOM 'strategyClassName'
WATERMARK FOR columnName AS 'strategyClassName'


3. Reject to support an optional watermark name.

WATERMARK [watermarkName] FOR columnName AS ...

Watermark name is not necessary in the first version when we only have one watermark definition in DDL.

Proctime Attribute

The proctime attribute field definition can also be defined using computed-column. Such as:

pt as SYSTEM_PROCTIME() , which defines a proctime field named “pt” in the schema. Here SYSTEM_PROCTIME()is a built-in function to derive a proctime attribute field.

The same as above, the design of computed column is discussed in FLIP-70.

Implementation

In this section, we will only discuss about the watermark implementation. The proctime attribute will be supported naturally when we support computed column. 

Note that, we will only support this in blink planner. In old flink planner, we will simply throw unsupported exception.

Currently, we already support  Rowtime descriptor end-to-end. That is, the Rowtime descriptor will be converted into properties with table schema and other information to store in catalog. If the registered table is queried, then connector TableFactory can derive the rowtime attribute descriptor from properties and set it into the created TableSource when createTableSource, see KafkaTableSourceSinkFactoryBase. At the end phase of optimization, i.e. translateToPlan, the planner will apply a watermark assigner transformation after the source transformation according to the RowtimeAttributeDescriptor, see StreamExecTableSourceScan.

The following figure shows how this happens. 

So the only thing we need to do, is bridging watermark DDL definition to the same property keys and values as Rowtime descriptor does. Then it should work. 

For example, 

ts TIMESTAMP(3), 
WATERMARK FOR ts AS ts - '5' SECOND

It will be converted into following properties.

'schema.0.rowtime.timestamps.type' = 'from-field'
'schema.0.rowtime.timestamps.from' = 'ts'
'schema.0.rowtime.watermarks.type' = 'periodic-bounded'
'schema.0.rowtime.watermarks.delay' = '5000'

Deprecate DefinedRowtimeAttributes

Currently, in order to make the Rowtime descriptor work, the corresponding connector have to implement DefinedRowtimeAttributes interface, and handover the derived rowtime attribute to the interface. It is trivial and confused to users. 

In terms of theory, TableSource shouldn’t care about the rowtime attribute and watermark information. TableSource should only care about the physical deserialization and network transmission. The planner should already know all the information about rowtime and watermark and can automatically apply watermark assigner after the source transformation. 

We can deprecate DefinedRowtimeAttributes, and automatically apply watermark assigner by planner to avoid doing the same logic in every connector. The same to DefinedProctimeAttribute.

The core idea is that the framework could extract the rowtime attribute and watermark information from the properties and carry the information in TableSourceTable instead of in TableSource. When translateToPlan, we will look into the TableSourceTable to assign watermark or not. 

The steps will be as following:

  1. Parse DDL into SqlCreateTable with time attributes information
  2. Convert SqlCreateTable to CatalogTable in SqlToOperationConverter#convert
    1. time attribute information will be placed in CatalogTable#getTableSchema
    2. convert the time attribute information into the same properties as descriptors.
  3. Pass the time attribute information (named TimeAttributeDescriptor) into TableSourceTable when DatabaseCalciteSchema#convertCatalogTable
  4. Create TableScan node (and computed column Project) and WatermarkAssigner node when FlinkRelOptTable#toRel
  5. Add a Rule to combine WatermarkAssigner and potential rowtime computed column project into TableScan. (DEFAULT_REWRITE optimization phase)
  6. Apply rowtime computed column if existed and watermark assigner according to the TimeAttributeDescriptor when StreamExecTableSourceScan#translateToPlan. Ignore TableSource’s DefinedRowtimeAttributes if TimeAttributeDescriptor exists. 

Compatibility, Deprecation, and Migration Plan


  • All existing DDL syntax are still valid and compatible and have no changed behavior.
  • No new interface introduced for TableSource or TableSink, so the interfaces are still compatible. 

Test Plan


The implementation can be tested with unit tests for every new feature. And we can add integration tests for connectors to verify it can cooperate with existing source implementations.

  • No labels