Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
@PublicEvolving
public interface CatalogTable extends CatalogBaseTable {
    /** Return the snapshot specified for the table. Return Optional.empty() if not specified. */
    default Optional<Long> getSnapshot() {
        return Optional.empty();
    }
}



Proposed Changes

I will describe the necessary modifications from several stages of SQL compilation and parsing.

1.  Convert SQL String to SqlNode

During this stage, schema validation will be performed. Therefore, when calling CalciteSchema getTable, the time in SqlSnapshot can be parsed and converted to long, which is then passed to FlinkSchema. Finally, catalogManager.getTable(Object object, long timestamp) is called to obtain the specified timestamp.

2.  SqlNode To RelNode.

During this stage, schema validation will also be performed. It is necessary to parse the SqlSnapshot and obtain the corresponding long value, which will be passed to FlinkSchema. Finally, catalogManager.getTable(Object, long timestamp) can obtain the table at the corresponding point in time.

3.  Optimizer

Remove some validation logic. The early AS OF syntax was only applicable to temporal joins. Adding timetravel scenarios will remove some of the check items.

4.  Convert RelNode to Transformation (Convert CommonExecTableSourceScan To Transformation)

When constructing a TableSourceScan, the corresponding DynamicTableSource will be created, which can obtain the corresponding CatalogTable and its Snapshot when creating the source.

Code Block
```
@Override  
public DynamicTableSource createDynamicTableSource(Context context) {   
  
Optional<Long> snapshot = context.getCatalogTable().getSnapshot();
```




Compatibility, Deprecation, and Migration Plan

...