
Page properties

Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-66-Support-time-attribute-in-SQL-DDL-tt32766.html
Vote thread:
JIRA: FLINK-14320
Release: 1.10

Status

Current state: "Under Discussion"

...


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Note that we will only support this in the Blink planner. The old Flink planner will simply throw an unsupported exception.

Currently, we already support the Rowtime descriptor end-to-end. That is, the Rowtime descriptor is converted into properties, together with the table schema and other information, and stored in the catalog. When the registered table is queried, the connector TableFactory derives the rowtime attribute descriptor from the properties and sets it on the TableSource it creates in createTableSource (see KafkaTableSourceSinkFactoryBase). In the final phase of optimization, i.e. translateToPlan, the planner applies a watermark assigner transformation after the source transformation according to the RowtimeAttributeDescriptor (see StreamExecTableSourceScan).

The following figure shows how this happens. 

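[Figure: how the Rowtime descriptor is carried from catalog properties to the watermark assigner; image not available]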

So the only thing we need to do is bridge the watermark DDL definition to the same property keys and values that the Rowtime descriptor produces. Then it should work.

For example, 

```sql
ts TIMESTAMP(3),
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
```
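For context, the two lines above would sit inside a complete `CREATE TABLE` statement. The sketch below is illustrative only: the table name `user_actions`, the `user_name` column and the `WITH` option are hypothetical placeholders, and only the `ts` column and the `WATERMARK` clause come from this proposal.

```sql
-- Hypothetical table; only ts and the WATERMARK clause come from the example above.
CREATE TABLE user_actions (
  user_name STRING,
  ts TIMESTAMP(3),
  -- ts becomes the rowtime attribute; watermarks trail the largest seen ts by 5 seconds
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector.type' = 'kafka'  -- illustrative; a real table needs further connector and format options
);
```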

The core idea is that the framework extracts the rowtime attribute and watermark information from the properties and carries it in TableSourceTable instead of in TableSource. In translateToPlan, the planner checks the TableSourceTable to decide whether to assign watermarks.

The steps are as follows:

  1. Parse the DDL into a SqlCreateTable that carries the time attribute information.
  2. Convert the SqlCreateTable to a CatalogTable in SqlToOperationConverter#convert:
    1. the time attribute information is placed in CatalogTable#getTableSchema as a WatermarkSpec;
    2. the time attribute information is converted into properties.
  3. Pass the time attribute information (named TimeAttributeDescriptor) into TableSourceTable in DatabaseCalciteSchema#convertCatalogTable.
  4. Create a TableScan node (plus a Project for computed columns) and a WatermarkAssigner node in FlinkRelOptTable#toRel.
  5. Add a rule that merges the WatermarkAssigner and any rowtime computed-column Project into the TableScan (in the DEFAULT_REWRITE optimization phase).
  6. In StreamExecTableSourceScan#translateToPlan, apply the rowtime computed column (if it exists) and the watermark assigner according to the TimeAttributeDescriptor. Ignore the TableSource's DefinedRowtimeAttributes if a TimeAttributeDescriptor exists.
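Steps 4 to 6 mention a rowtime computed column. As a sketch, assuming the computed-column syntax (`name AS expression`) is available in the DDL, a table whose rowtime attribute is derived from a string field might look like the following. The table name, the column names and the `WITH` option are hypothetical.

```sql
CREATE TABLE user_actions (
  user_name STRING,
  log_ts STRING,                -- e.g. '2019-10-08 12:00:00'
  ts AS TO_TIMESTAMP(log_ts),   -- computed column: planned as a Project above the TableScan (step 4)
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector.type' = 'kafka'    -- illustrative; a real table needs further connector and format options
);
```

Under the steps above, the Project computing `ts` and the WatermarkAssigner would be merged into the TableScan by the rule in step 5 and then applied in StreamExecTableSourceScan#translateToPlan.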


The property representation of the watermark cannot be the same as the properties of the Rowtime descriptor, because the Rowtime descriptor properties cannot fully express the watermark structure proposed in this design (for example, an arbitrary watermark expression).

So a statement `WATERMARK FOR f1.q2 AS f1.q2 - INTERVAL '5' SECOND` will be converted into the following properties:

```sql
'schema.watermark.0.rowtime' = 'f1.q2'
'schema.watermark.0.strategy.expression' = '`f1`.`q2` - INTERVAL '5' SECOND'
'schema.watermark.0.strategy.datatype' = 'TIMESTAMP(3)'
```
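A DDL that would yield the properties above might look like the sketch below. The table name, the `id` column and the `q1` field are hypothetical; only `f1.q2` and the watermark expression come from the example. Using a nested field as the rowtime attribute follows this proposal; check what your Flink version actually accepts.

```sql
CREATE TABLE nested_events (
  id BIGINT,
  f1 ROW<q1 STRING, q2 TIMESTAMP(3)>,
  -- the expression's result type, TIMESTAMP(3), is what strategy.datatype records
  WATERMARK FOR f1.q2 AS f1.q2 - INTERVAL '5' SECOND
) WITH (
  'connector.type' = 'kafka'  -- illustrative; a real table needs further connector and format options
);
```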

The conversion between WatermarkSpec and properties is handled by the framework; users don't need to care about it.

Connector developers can get the watermark information via `CatalogTable#getTableSchema#getWatermarkSpecs` in `TableSourceFactory#createTableSource(ObjectPath tablePath, CatalogTable table)`.

In practice, connector developers don't need the watermark information for now, because the framework applies the watermark assigner automatically.
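Because the planner applies the watermark assigner itself, a query can use the declared rowtime attribute directly in an event-time window without any connector-specific code. A sketch, assuming a hypothetical table `user_actions` with a `user_name` column and a rowtime attribute `ts` declared via `WATERMARK FOR ts AS ...`:

```sql
-- Event-time tumbling window over the rowtime attribute ts.
SELECT
  user_name,
  TUMBLE_START(ts, INTERVAL '1' MINUTE) AS window_start,
  COUNT(*) AS action_cnt
FROM user_actions
GROUP BY user_name, TUMBLE(ts, INTERVAL '1' MINUTE);
```

Each window emits its result once the watermark produced by the framework-applied assigner passes the window's end.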

Deprecate DefinedRowtimeAttributes

...

We can deprecate DefinedRowtimeAttributes and let the planner apply the watermark assigner automatically, so that each connector doesn't have to implement the same logic. The same applies to DefinedProctimeAttribute.
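For the processing-time counterpart of DefinedProctimeAttribute, a DDL could declare the attribute itself as a computed column. This sketch assumes a `PROCTIME()` function usable in computed columns; the table name, the column names and the `WITH` option are hypothetical.

```sql
CREATE TABLE user_actions (
  user_name STRING,
  pt AS PROCTIME()             -- processing-time attribute declared in DDL rather than by the TableSource
) WITH (
  'connector.type' = 'kafka'   -- illustrative; a real table needs further connector and format options
);
```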


The steps are as follows:

...

  1. The time attribute information is placed in CatalogTable#getTableSchema.
  2. The time attribute information is converted into the same properties that the descriptors produce.

...

once we don't support temporal TableSource anymore.

Compatibility, Deprecation, and Migration Plan

...