Status
Current state: "Under Discussion"
Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-66-Support-time-attribute-in-SQL-DDL-tt32766.html
JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)
Released: <Flink Version>
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
In Flink 1.9, we already introduced a basic SQL DDL to create a table. However, it does not yet support defining time attributes in SQL DDL. A time attribute is basic information required by time-based operations such as windows, in both the Table API and SQL. That means users currently cannot apply window operations to tables created via DDL. Meanwhile, we have received many requests from the community to support defining event time in DDL since the 1.9 release. Supporting time attribute definitions in SQL DDL will be a great benefit.
Public Interfaces
This is a list of new interfaces in SQL DDL. Please refer to “Proposed Changes” section for details.
Watermark Syntax
CREATE TABLE table_name (
  ...
  WATERMARK FOR columnName AS watermarkStrategy
) WITH (
  ...
)

watermarkStrategy ::= {
    BOUNDED DELAY WITH INTERVAL 'string' timeUnit
  | SYSTEM_WATERMARK
  | <expression>
}
NOTE: For “INTERVAL 'string' timeUnit”, please refer to the interval literal definition in the SQL standard. Only day-time intervals are supported for now.
NOTE: The “columnName” can reference a nested column using dot syntax.
Proctime Attribute Syntax
CREATE TABLE table_name (
  ...
  proc AS PROCTIME()
) WITH (
  ...
)
Flink Table API & SQL handles time attributes as fields in the schema, not as system or hidden fields. This allows for a unified syntax for table programs in both batch and streaming environments. The only difference between a time attribute field and a regular field is a special mark on the field type. The proposed syntax also follows this concept.
Rowtime Attribute
A rowtime attribute must be an existing column in the table schema with TIMESTAMP type. An existing field is marked as a rowtime attribute once a watermark is defined on it. So what we need is a watermark definition that declares 1) the rowtime field and 2) the watermark strategy.
The following is the proposed watermark definition.
CREATE TABLE table_name (
  ...
  WATERMARK FOR columnName AS watermarkStrategy
) WITH (
  ...
)

watermarkStrategy ::= {
    BOUNDED DELAY WITH INTERVAL 'string' timeUnit
  | SYSTEM_WATERMARK
  | <expression>
}
NOTE: The “columnName” can reference a nested column using dot syntax.
NOTE: For “INTERVAL 'string' timeUnit”, please refer to the interval literal definition in the SQL standard. Only day-time intervals are supported for now.
We would like to treat WATERMARK similar to a PRIMARY KEY or UNIQUE KEY constraint and therefore keep it in the schema definition.
The WATERMARK definition combines the watermark strategy and the rowtime field selection (i.e., which existing field is used to generate the watermark) in one clause, so that we can define multiple watermarks on one table in the future.
The “WATERMARK” keyword starts a watermark definition. The “FOR” keyword defines which existing field is used to generate the watermark; this field must already exist in the schema. The “AS” keyword defines the watermark strategy. We can provide some predefined watermark strategies, such as BOUNDED DELAY, which covers almost all requirements.
- BOUNDED DELAY: A watermark strategy for rowtime attributes which are out-of-order by a bounded time interval. Emits watermarks which are the maximum observed timestamp minus the specified delay.
NOTE: The BOUNDED DELAY strategy generates watermarks periodically to avoid emitting too many watermarks. The periodic interval can be configured via ExecutionConfig#setAutoWatermarkInterval. We will support configuring it via property keys in FLIP-59.
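As a rough illustration (plain Java, not Flink's actual generator classes; the class name below is made up), the BOUNDED DELAY semantics can be sketched as: track the maximum observed timestamp per record and, on each periodic emission, produce that maximum minus the configured delay.

```java
// Illustrative sketch only, not Flink's implementation:
// watermark = max observed rowtime - bounded delay.
public class BoundedDelaySketch {
    private final long delayMillis;
    private long maxTimestamp = Long.MIN_VALUE;

    public BoundedDelaySketch(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    // Called for every record with its rowtime in epoch milliseconds.
    public void onEvent(long rowtimeMillis) {
        maxTimestamp = Math.max(maxTimestamp, rowtimeMillis);
    }

    // Called periodically, every autoWatermarkInterval milliseconds.
    public long currentWatermark() {
        return maxTimestamp - delayMillis;
    }
}
```

With a 5-second delay, observing timestamps 10000 and 8000 yields a watermark of 5000; the out-of-order record does not move the maximum.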
For example:
CREATE TABLE kafka_source (
  user_id STRING,
  log_ts TIMESTAMP(3),
  ingestion_ts TIMESTAMP(3),
  WATERMARK FOR log_ts AS BOUNDED DELAY WITH INTERVAL '5' SECOND
) WITH (
  ...
)
This defines a watermark for the log_ts rowtime field with a bounded-delay strategy of 5 seconds, which means the watermark is generated as the maximum observed log_ts minus 5 seconds.
Custom watermark strategies
A custom watermark strategy can be defined using an arbitrary expression.
WATERMARK FOR rowtimeField AS expression
The expression can be any arbitrary expression used to calculate the watermark; the returned value is the watermark. The return type can be Long (or long) or the same type as the rowtime field. If the returned value is null, no watermark is generated.
Example#1
For example, the built-in “BOUNDED DELAY WITH INTERVAL '5' SECOND” strategy can also be expressed as an expression:
WATERMARK FOR rowtimeField AS rowtimeField - INTERVAL '5' SECOND
Example#2
Users can also use a user-defined ScalarFunction to implement more complex watermark strategies.
For example, assume the log_ts field describes when the record was created, and some records carry a flag marking them as the end of a sequence, such that no elements with smaller timestamps can come anymore. Then we can implement a simple UDF that emits a watermark depending on the log_ts and flag fields.
CREATE TABLE myTable (
  log_ts TIMESTAMP(3),
  flag BOOLEAN,
  WATERMARK FOR log_ts AS watermarkOnFlag(log_ts, flag)
) WITH (
  ...
)
public class WatermarkOnFlag extends ScalarFunction {
    public Long eval(long rowtime, boolean isEndOfSequence) {
        return isEndOfSequence ? rowtime : null;
    }
}
This makes defining watermarks as flexible as PunctuatedWatermarkAssigner in DataStream.
NOTE: A custom watermark strategy works in PERIODIC mode, which emits the generated watermark periodically at an interval configured via ExecutionConfig#setAutoWatermarkInterval. We treat `autoWatermarkInterval == 0` as PUNCTUATED mode, which allows punctuated watermarks without introducing [PERIODIC | PUNCTUATED] keywords or syntax.
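To illustrate the per-record semantics (a hypothetical driver class; the real framework wiring is not shown), the following sketch evaluates a custom watermark expression after every record, as PUNCTUATED mode would, and drops null results, matching the "null means no watermark" rule above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of PUNCTUATED emission (autoWatermarkInterval == 0):
// the watermark expression is evaluated per record; a null result means
// "no watermark for this record".
public class PunctuatedDriver<T> {
    private final Function<T, Long> watermarkExpression;
    private final List<Long> emitted = new ArrayList<>();

    public PunctuatedDriver(Function<T, Long> watermarkExpression) {
        this.watermarkExpression = watermarkExpression;
    }

    public void onRecord(T record) {
        Long wm = watermarkExpression.apply(record);
        if (wm != null) {
            emitted.add(wm);
        }
    }

    public List<Long> emittedWatermarks() {
        return emitted;
    }
}
```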
Rejected Alternative Syntax
WATERMARK [wmName] FOR rowtimeField AS CUSTOM myUdf(rowtimeField, ...)
WATERMARK [wmName] FOR columnName AS CUSTOM 'strategyClassName'
WATERMARK [wmName] FOR rowtimeField AS [PERIODIC | PUNCTUATED] myUdf(...)
WATERMARK [wmName] FOR columnName AS 'strategyClassName'
Dropping ASCENDING watermark strategy
ASCENDING is a watermark strategy for ascending rowtime attributes. It emits a watermark equal to the maximum observed timestamp so far minus 1. Rows with a timestamp equal to the max timestamp are not late.
We would like to avoid introducing the ASCENDING watermark strategy, because the keyword is confusing.
The “AS” part describes the watermark generation strategy, but what does “ascending” refer to?
- The ascending watermark? A watermark should of course be ascending.
- The ascending rowtime? Why should the watermark strategy part describe a property of the rowtime?
An ASCENDING watermark strategy is usually used in testing and can also be represented by “BOUNDED DELAY WITH INTERVAL '0.001' SECOND”.
So we propose to drop ASCENDING strategy in this design.
From Source watermark strategy
The FROM-SOURCE watermark strategy preserves the assigned watermarks from the underlying source implementation. The grammar is as follows:
CREATE TABLE myTable (
  rowtime AS SYSTEM_ROWTIME(),
  WATERMARK FOR rowtime AS SYSTEM_WATERMARK
) WITH (
  ...
)
NOTE: If the watermark strategy is SYSTEM_WATERMARK, then the corresponding rowtime field must be derived using SYSTEM_ROWTIME. Otherwise an exception will be thrown by the framework.
SYSTEM_ROWTIME() is a built-in function and can only be used in DDL as a computed column. It means the rowtime uses the timestamp of the StreamRecord, which is generated by SourceContext#collectWithTimestamp(element, timestamp).
SYSTEM_WATERMARK is a built-in watermark strategy; it is a keyword (or a function with parentheses?). It means the watermark is generated by the source system itself, and the framework should preserve the assigned watermarks from the source instead of generating new ones.
SYSTEM_ROWTIME and SYSTEM_WATERMARK correspond to Rowtime#timestampsFromSource() and Rowtime#watermarksFromSource() in descriptor.
So it is easy to introduce this strategy in DDL now, because we can convert it into the same properties as the Rowtime descriptor does. Connector factories can check whether they support it and throw an unsupported exception otherwise.
What if the expected rowtime field is nested in the schema?
It is fine if the expected rowtime field is nested; the watermark syntax allows accessing a nested field. For example:
CREATE TABLE kafka_source (
  order_id STRING,
  event ROW<log_ts TIMESTAMP(3), message STRING, type INT>,
  WATERMARK FOR event.log_ts AS BOUNDED DELAY WITH INTERVAL '5' SECOND
) WITH (
  ...
)
What if the expected rowtime field does not exist in the schema?
It is a common case that the expected rowtime field needs to be derived from existing fields, e.g. the original timestamp is a string or a bigint value representing a UTC timestamp, or is nested in JSON. That is why we provided TimestampExtractor in the descriptor API to extract timestamps.
In SQL DDL, we would like to leverage computed-column syntax to derive the field from existing fields using built-in functions or user-defined functions. Computed columns are a widely used feature in database systems, e.g. MySQL, MS SQL Server, Oracle. With computed columns, defining a rowtime attribute becomes transparent and simple, and they can cover TimestampExtractor.
For example:
CREATE TABLE kafka_source (
  order_id STRING,
  ts STRING,
  -- to_timestamp can be a built-in function or a UDF
  rowtime AS to_timestamp(ts),
  WATERMARK FOR rowtime AS BOUNDED DELAY WITH INTERVAL '5' SECOND
) WITH (
  ...
)
NOTE: If we have a consensus on the computed-column approach, we will prepare a separate FLIP for computed columns. It is another orthogonal and large topic that deserves its own FLIP. In this design, we will not dive deep into computed column syntax and implementation.
Proctime Attribute
The proctime attribute field can also be defined using a computed column, such as:
pt AS PROCTIME(), which defines a proctime field named “pt” in the schema. Here PROCTIME() is a built-in function to derive a proctime attribute field.
The same as above, if we have a consensus on the computed-column approach, we will leave the further discussion to the following computed column FLIP.
Implementation
In this section, we will only discuss the watermark implementation. The proctime attribute will be supported naturally once we support computed columns.
Note that we will only support this in the Blink planner. The old Flink planner will simply throw an unsupported exception.
Currently, we already support the Rowtime descriptor end-to-end. That is, the Rowtime descriptor is converted into properties, together with the table schema and other information, to store in the catalog. If the registered table is queried, the connector TableFactory can derive the rowtime attribute descriptor from the properties and set it on the created TableSource in createTableSource (see KafkaTableSourceSinkFactoryBase). At the final phase of optimization, i.e. translateToPlan, the planner applies a watermark assigner transformation after the source transformation according to the RowtimeAttributeDescriptor (see StreamExecTableSourceScan).
The following figure shows how this happens.
So the only thing we need to do is bridge the watermark DDL definition to the same property keys and values as the Rowtime descriptor produces. Then it should work.
For example,
ts TIMESTAMP(3),
WATERMARK FOR ts AS BOUNDED DELAY WITH INTERVAL '5' SECOND
It will be converted into the following properties.
'schema.0.rowtime.timestamps.type' = 'from-field'
'schema.0.rowtime.timestamps.from' = 'ts'
'schema.0.rowtime.watermarks.type' = 'periodic-bounded'
'schema.0.rowtime.watermarks.delay' = '5000'
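The bridging step could be sketched as a small helper (hypothetical class and method names; the property keys are the existing Rowtime descriptor keys shown above, with the schema field index hard-coded to 0 for brevity):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: translate a "WATERMARK FOR <field> AS BOUNDED DELAY
// WITH INTERVAL ..." definition into the existing Rowtime descriptor
// property keys, so current connector factories keep working unchanged.
public class WatermarkToProperties {
    public static Map<String, String> boundedDelay(String rowtimeField, long delayMillis) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("schema.0.rowtime.timestamps.type", "from-field");
        props.put("schema.0.rowtime.timestamps.from", rowtimeField);
        props.put("schema.0.rowtime.watermarks.type", "periodic-bounded");
        props.put("schema.0.rowtime.watermarks.delay", String.valueOf(delayMillis));
        return props;
    }
}
```

For the example above, boundedDelay("ts", 5000) would produce exactly the four properties listed.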
Deprecate DefinedRowtimeAttributes
Currently, in order to make the Rowtime descriptor work, the corresponding connector has to implement the DefinedRowtimeAttributes interface and hand over the derived rowtime attribute through that interface. This is tedious and confusing for users.
In theory, a TableSource should not care about rowtime attributes and watermark information; it should only care about physical deserialization and network transmission. The planner already knows all the information about rowtime and watermarks and can automatically apply a watermark assigner after the source transformation.
We can deprecate DefinedRowtimeAttributes and let the planner automatically apply the watermark assigner, to avoid duplicating the same logic in every connector. The same applies to DefinedProctimeAttribute.
The core idea is that the framework extracts the rowtime attribute and watermark information from the properties and carries it in TableSourceTable instead of in TableSource. During translateToPlan, we look into the TableSourceTable to decide whether to assign watermarks.
Compatibility, Deprecation, and Migration Plan
- All existing DDL syntax is still valid and compatible, with no behavior changes.
- No new interface introduced for TableSource or TableSink, so the interfaces are still compatible.
Test Plan
The implementation can be tested with unit tests for every new feature, and we can add integration tests for connectors to verify that they cooperate with existing source implementations.