...
Note that we will only support this in the Blink planner. In the old Flink planner, we will simply throw an unsupported exception.
Currently, we already support the Rowtime descriptor end-to-end. That is, the Rowtime descriptor is converted into properties, together with the table schema and other information, and stored in the catalog. When the registered table is queried, the connector's TableFactory can derive the rowtime attribute descriptor from the properties and set it on the created TableSource in createTableSource (see KafkaTableSourceSinkFactoryBase). At the final phase of optimization, i.e. translateToPlan, the planner applies a watermark assigner transformation after the source transformation according to the RowtimeAttributeDescriptor (see StreamExecTableSourceScan).
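The derivation step that a connector factory performs can be illustrated with a minimal Java sketch. It assumes the `schema.N.rowtime.*` property keys that this section lists for the Rowtime descriptor; `RowtimeInfo` and `RowtimeProps` are hypothetical stand-ins, not the code in KafkaTableSourceSinkFactoryBase or Flink's RowtimeAttributeDescriptor, and only the `from-field` / `periodic-bounded` combination is handled.

```java
import java.util.Map;

/** Hypothetical holder for the rowtime information a factory might derive. */
final class RowtimeInfo {
    final String field;     // name of the rowtime field, e.g. "ts"
    final long delayMillis; // bounded out-of-orderness delay in milliseconds

    RowtimeInfo(String field, long delayMillis) {
        this.field = field;
        this.delayMillis = delayMillis;
    }
}

/** Hypothetical helper; not part of Flink. */
final class RowtimeProps {
    private RowtimeProps() {}

    /**
     * Derives the rowtime information of schema column {@code index} from
     * descriptor-style properties. Only the 'from-field' timestamp type and
     * the 'periodic-bounded' watermark type are handled; any other
     * combination returns null.
     */
    static RowtimeInfo derive(Map<String, String> props, int index) {
        String prefix = "schema." + index + ".rowtime.";
        boolean fromField = "from-field".equals(props.get(prefix + "timestamps.type"));
        boolean bounded = "periodic-bounded".equals(props.get(prefix + "watermarks.type"));
        if (!fromField || !bounded) {
            return null; // no supported rowtime attribute on this column
        }
        String field = props.get(prefix + "timestamps.from");
        // Throws NumberFormatException if the delay key is missing or malformed.
        long delay = Long.parseLong(props.get(prefix + "watermarks.delay"));
        return new RowtimeInfo(field, delay);
    }
}
```

For the four `schema.0.rowtime.*` properties listed in this section, `derive(props, 0)` would yield the field `ts` with a 5000 ms delay, which is the information the planner later needs to place the watermark assigner.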
The following figure shows how this happens.
So the only thing we need to do is to bridge the watermark DDL definition to the same property keys and values that the Rowtime descriptor produces. Then it should work.
For example,
```sql
ts TIMESTAMP(3),
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
```
It will be converted into the following properties:
```
'schema.0.rowtime.timestamps.type' = 'from-field'
'schema.0.rowtime.timestamps.from' = 'ts'
'schema.0.rowtime.watermarks.type' = 'periodic-bounded'
'schema.0.rowtime.watermarks.delay' = '5000'
```
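The bridging from the DDL to these properties can be sketched in Java as follows. This is a hypothetical helper, not Flink's SqlToOperationConverter: it assumes the watermark strategy arrives as a string and recognizes only the bounded form `<field> - INTERVAL '<n>' SECOND` (the `INTERVAL` keyword is optional in the regex), converting seconds to the millisecond delay shown above. Any other expression yields an empty result, because it has no `periodic-bounded` equivalent.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical DDL-to-descriptor bridge; not part of Flink. */
final class WatermarkDdlBridge {
    // Matches "<field> - INTERVAL '<n>' SECOND"; INTERVAL is optional here.
    private static final Pattern BOUNDED = Pattern.compile(
            "^\\s*(\\w+)\\s*-\\s*(?:INTERVAL\\s+)?'(\\d+)'\\s*SECOND\\s*$",
            Pattern.CASE_INSENSITIVE);

    private WatermarkDdlBridge() {}

    /** Maps a watermark strategy on column {@code index} to Rowtime descriptor properties. */
    static Optional<Map<String, String>> toRowtimeProperties(
            int index, String rowtimeField, String strategy) {
        Matcher m = BOUNDED.matcher(strategy);
        if (!m.matches() || !m.group(1).equals(rowtimeField)) {
            return Optional.empty(); // not expressible as a 'periodic-bounded' descriptor
        }
        long delayMillis = Long.parseLong(m.group(2)) * 1000L; // seconds -> milliseconds
        String prefix = "schema." + index + ".rowtime.";
        Map<String, String> props = new LinkedHashMap<>();
        props.put(prefix + "timestamps.type", "from-field");
        props.put(prefix + "timestamps.from", rowtimeField);
        props.put(prefix + "watermarks.type", "periodic-bounded");
        props.put(prefix + "watermarks.delay", String.valueOf(delayMillis));
        return Optional.of(props);
    }
}
```

With this sketch, `toRowtimeProperties(0, "ts", "ts - INTERVAL '5' SECOND")` produces exactly the four properties above, while an expression such as `ts - INTERVAL '5' MINUTE` is rejected.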
Deprecate DefinedRowtimeAttributes
Currently, in order to make the Rowtime descriptor work, the corresponding connector has to implement the DefinedRowtimeAttributes interface and hand over the derived rowtime attribute through it. This is tedious and confusing for users.
In theory, a TableSource shouldn't care about the rowtime attribute and watermark information. A TableSource should only care about physical deserialization and network transmission. The planner should already know all the information about rowtime and watermarks and can automatically apply a watermark assigner after the source transformation.
We can deprecate DefinedRowtimeAttributes and let the planner apply the watermark assigner automatically, so that the same logic isn't repeated in every connector. The same applies to DefinedProctimeAttribute.
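To illustrate the logic that would move out of each connector and into the planner, here is a minimal Java sketch of a bounded-delay periodic watermark assigner, i.e. the semantics of `periodic-bounded`. `BoundedWatermarkSketch` is a hypothetical, single-threaded class, not a Flink API, and it assumes rowtimes are epoch milliseconds.

```java
/**
 * Hypothetical sketch of a 'periodic-bounded' watermark strategy: track the
 * maximum rowtime seen so far and, when asked periodically, emit
 * (max rowtime - delay) as the current watermark.
 */
final class BoundedWatermarkSketch {
    private final long delayMillis;
    // Initialized so that (maxTimestamp - delayMillis) equals Long.MIN_VALUE without overflow.
    private long maxTimestamp;

    BoundedWatermarkSketch(long delayMillis) {
        this.delayMillis = delayMillis;
        this.maxTimestamp = Long.MIN_VALUE + delayMillis;
    }

    /** Called for every record with its rowtime in epoch milliseconds. */
    void onRecord(long rowtimeMillis) {
        if (rowtimeMillis > maxTimestamp) {
            maxTimestamp = rowtimeMillis;
        }
    }

    /** Called periodically; the watermark trails the maximum rowtime by the delay. */
    long currentWatermark() {
        return maxTimestamp - delayMillis;
    }
}
```

Because nothing in this logic depends on how a particular connector deserializes records, it is a natural candidate for being applied once by the planner rather than re-implemented per TableSource.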
The core idea is that the framework can extract the rowtime attribute and watermark information from the properties and carry it in TableSourceTable instead of in TableSource. In translateToPlan, we look into the TableSourceTable to decide whether to assign a watermark.
The steps are as follows:
...
- Time attribute information will be placed in CatalogTable#getTableSchema
- Convert the time attribute information into the same properties as the descriptors.
...
The core idea is that the framework can extract the rowtime attribute and watermark information from the properties and carry it in TableSourceTable instead of in TableSource. In translateToPlan, we look into the TableSourceTable to decide whether to assign a watermark.
The steps are as follows:
- Parse DDL into SqlCreateTable with time attributes information
- Convert SqlCreateTable to CatalogTable in SqlToOperationConverter#convert
- Time attribute information will be placed in CatalogTable#getTableSchema as a WatermarkSpec
- Convert the time attribute information into properties.
- Pass the time attribute information (named TimeAttributeDescriptor) into TableSourceTable in DatabaseCalciteSchema#convertCatalogTable
- Create the TableScan node (and a Project for computed columns) and the WatermarkAssigner node in FlinkRelOptTable#toRel
- Add a rule to merge the WatermarkAssigner and any rowtime computed-column Project into the TableScan (DEFAULT_REWRITE optimization phase)
- In StreamExecTableSourceScan#translateToPlan, apply the rowtime computed column (if it exists) and the watermark assigner according to the TimeAttributeDescriptor. Ignore the TableSource's DefinedRowtimeAttributes if a TimeAttributeDescriptor exists.
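The precedence rule in the last step can be sketched in Java as a function from a scan's time attribute information to a chain of transformations. Everything here is hypothetical: `TimeAttributeDescriptorSketch` only mirrors the TimeAttributeDescriptor named above, the transformation names are illustrative labels rather than Flink operators, and `legacyRowtimeField` stands in for a rowtime exposed through DefinedRowtimeAttributes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the TimeAttributeDescriptor carried by TableSourceTable. */
final class TimeAttributeDescriptorSketch {
    final String rowtimeField;        // e.g. "ts"
    final String computedRowtimeExpr; // null if the rowtime is a physical column
    final String watermarkExpr;       // e.g. "ts - INTERVAL '5' SECOND"

    TimeAttributeDescriptorSketch(String rowtimeField, String computedRowtimeExpr, String watermarkExpr) {
        this.rowtimeField = rowtimeField;
        this.computedRowtimeExpr = computedRowtimeExpr;
        this.watermarkExpr = watermarkExpr;
    }
}

/** Hypothetical sketch of the scan translation decision; not StreamExecTableSourceScan. */
final class ScanTranslationSketch {
    private ScanTranslationSketch() {}

    /**
     * Returns the chain of (illustratively named) transformations a scan would
     * translate into. {@code descriptor} is null when the table has no DDL time
     * attributes; {@code legacyRowtimeField} is null when the TableSource does
     * not declare a rowtime via DefinedRowtimeAttributes.
     */
    static List<String> translate(TimeAttributeDescriptorSketch descriptor, String legacyRowtimeField) {
        List<String> chain = new ArrayList<>();
        chain.add("Source");
        if (descriptor != null) {
            // The DDL-derived descriptor takes precedence; DefinedRowtimeAttributes is ignored.
            if (descriptor.computedRowtimeExpr != null) {
                chain.add("Calc(" + descriptor.rowtimeField + " := " + descriptor.computedRowtimeExpr + ")");
            }
            chain.add("WatermarkAssigner(" + descriptor.watermarkExpr + ")");
        } else if (legacyRowtimeField != null) {
            // Fall back to the existing behaviour driven by the TableSource itself.
            chain.add("WatermarkAssigner(legacy " + legacyRowtimeField + ")");
        }
        return chain;
    }
}
```

The design point the sketch isolates is that the computed rowtime column must be evaluated before the watermark assigner, since the watermark expression refers to it.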
The properties representation of the watermark cannot be the same as the properties of the Rowtime descriptor, because the watermark structure proposed by this design can't be fully represented by the Rowtime descriptor properties.
So for the statement `WATERMARK FOR f1.q2 AS f1.q2 - INTERVAL '5' SECOND`, the proposed properties might be:
```
'schema.watermark.0.rowtime' = 'f1.q2'
'schema.watermark.0.strategy.expression' = '`f1`.`q2` - INTERVAL '5' SECOND'
'schema.watermark.0.strategy.datatype' = 'TIMESTAMP(3)'
```
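The framework-side conversion can be sketched as a simple round trip between watermark specs and these keys. `WatermarkSpecSketch` and `WatermarkPropertiesSketch` are hypothetical Java classes that mirror only the three `schema.watermark.N.*` keys listed above; they are not Flink's WatermarkSpec or its property utilities, and the reader assumes indices are contiguous from 0.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical mirror of a WatermarkSpec: rowtime attribute, strategy expression, output type. */
final class WatermarkSpecSketch {
    final String rowtimeAttribute; // e.g. "f1.q2"
    final String expression;       // serialized watermark strategy expression
    final String dataType;         // e.g. "TIMESTAMP(3)"

    WatermarkSpecSketch(String rowtimeAttribute, String expression, String dataType) {
        this.rowtimeAttribute = rowtimeAttribute;
        this.expression = expression;
        this.dataType = dataType;
    }
}

/** Hypothetical converter between watermark specs and flat properties. */
final class WatermarkPropertiesSketch {
    private static final String PREFIX = "schema.watermark.";

    private WatermarkPropertiesSketch() {}

    static Map<String, String> toProperties(List<WatermarkSpecSketch> specs) {
        Map<String, String> props = new LinkedHashMap<>();
        for (int i = 0; i < specs.size(); i++) {
            WatermarkSpecSketch s = specs.get(i);
            props.put(PREFIX + i + ".rowtime", s.rowtimeAttribute);
            props.put(PREFIX + i + ".strategy.expression", s.expression);
            props.put(PREFIX + i + ".strategy.datatype", s.dataType);
        }
        return props;
    }

    static List<WatermarkSpecSketch> fromProperties(Map<String, String> props) {
        List<WatermarkSpecSketch> specs = new ArrayList<>();
        // Stops at the first missing index, so indices must be contiguous from 0.
        for (int i = 0; props.containsKey(PREFIX + i + ".rowtime"); i++) {
            specs.add(new WatermarkSpecSketch(
                    props.get(PREFIX + i + ".rowtime"),
                    props.get(PREFIX + i + ".strategy.expression"),
                    props.get(PREFIX + i + ".strategy.datatype")));
        }
        return specs;
    }
}
```

Storing the strategy as a serialized expression plus its result type, instead of a fixed strategy type and delay, is what allows arbitrary watermark expressions to survive the trip through the catalog.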
The conversion between WatermarkSpec and properties is handled by the framework, and users shouldn't need to care about it.
Connector developers can get the watermark information via `CatalogTable#getTableSchema#getWatermarkSpecs` in `TableSourceFactory#createTableSource(ObjectPath tablePath, CatalogTable table)`.
In practice, connector developers don't need to get the watermark information for now, because the framework applies the watermark assigner automatically.
Deprecate DefinedRowtimeAttributes
Currently, in order to make the Rowtime descriptor work, the corresponding connector has to implement the DefinedRowtimeAttributes interface and hand over the derived rowtime attribute through it. This is tedious and confusing for users.
In theory, a TableSource shouldn't care about the rowtime attribute and watermark information. A TableSource should only care about physical deserialization and network transmission. The planner should already know all the information about rowtime and watermarks and can automatically apply a watermark assigner after the source transformation.
We can deprecate DefinedRowtimeAttributes once we don't support temporal TableSource anymore.
...
Compatibility, Deprecation, and Migration Plan
...