...
```
@PublicEvolving
public interface CatalogTable extends CatalogBaseTable {

    /** Return the snapshot specified for the table. Return Optional.empty() if not specified. */
    default Optional<Long> getSnapshot() {
        return Optional.empty();
    }
}
```
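To illustrate the contract above, here is a self-contained sketch of how a versioned table could override the new default method. The surrounding interfaces are reduced to the single method relevant here, and `SnapshotCatalogTable` is a hypothetical name, not a Flink class:

```java
import java.util.Optional;

// Simplified stand-ins for Flink's catalog interfaces, reduced to the
// method added by this proposal.
interface CatalogBaseTable {}

interface CatalogTable extends CatalogBaseTable {
    // Tables that do not support time travel simply keep the default.
    default Optional<Long> getSnapshot() {
        return Optional.empty();
    }
}

// Hypothetical table implementation that carries the snapshot timestamp
// requested via "FOR SYSTEM_TIME AS OF".
class SnapshotCatalogTable implements CatalogTable {
    private final long snapshotMillis;

    SnapshotCatalogTable(long snapshotMillis) {
        this.snapshotMillis = snapshotMillis;
    }

    @Override
    public Optional<Long> getSnapshot() {
        return Optional.of(snapshotMillis);
    }
}

public class SnapshotDemo {
    public static void main(String[] args) {
        CatalogTable plain = new CatalogTable() {};
        CatalogTable versioned = new SnapshotCatalogTable(1_700_000_000_000L);
        System.out.println(plain.getSnapshot().isPresent());
        System.out.println(versioned.getSnapshot().get());
    }
}
```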
Proposed Changes
The following describes the necessary modifications at each stage of SQL parsing and compilation.
1. Convert SQL String to SqlNode
Schema validation is performed during this stage. When CalciteSchema getTable is called, the time in the SqlSnapshot clause can be parsed and converted to a long, which is then passed to FlinkSchema. Finally, catalogManager.getTable(Object object, long timestamp) is called to obtain the table as of the specified timestamp.
2. SqlNode to RelNode
Schema validation is performed during this stage as well. The SqlSnapshot must be parsed to obtain the corresponding long value, which is passed to FlinkSchema; catalogManager.getTable(Object object, long timestamp) then returns the table at the corresponding point in time.
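As a concrete illustration of the conversion performed in stages 1 and 2, the sketch below turns a timestamp literal such as `TIMESTAMP '2023-04-27 00:00:00'` into the epoch-millisecond long handed to the catalog. The formatter pattern, the session-time-zone handling, and the helper name `toEpochMillis` are assumptions for the example, not Flink's actual implementation:

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class SnapshotTimestamp {
    // Pattern matching a SQL TIMESTAMP literal without a fractional part.
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Hypothetical helper: interpret the literal in the session time zone
    // and convert it to epoch milliseconds.
    static long toEpochMillis(String timestampLiteral, ZoneId sessionZone) {
        LocalDateTime ldt = LocalDateTime.parse(timestampLiteral, FMT);
        return ldt.atZone(sessionZone).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        long millis = toEpochMillis("2023-04-27 00:00:00", ZoneId.of("UTC"));
        System.out.println(millis); // epoch millis for 2023-04-27T00:00:00Z
    }
}
```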
3. Optimizer
Some validation logic is removed. The AS OF syntax was previously applicable only to temporal joins; supporting time travel scenarios requires relaxing some of those checks.
4. Convert RelNode to Transformation (Convert CommonExecTableSourceScan To Transformation)
When the TableSourceScan is constructed, the corresponding DynamicTableSource is created; at that point the factory can obtain the CatalogTable and its snapshot while creating the source.
```
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
    // Obtain the snapshot, if any, requested via time travel syntax.
    Optional<Long> snapshot = context.getCatalogTable().getSnapshot();
    // ... create the source, honoring the snapshot if present ...
}
```
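A minimal, self-contained sketch of this pattern follows, with Flink's factory and source interfaces replaced by local stand-ins (`VersionedSource` and `createSource` are hypothetical names for illustration only):

```java
import java.util.Optional;

// Hypothetical source that remembers which snapshot it should scan.
class VersionedSource {
    private final Optional<Long> snapshotMillis;

    VersionedSource(Optional<Long> snapshotMillis) {
        this.snapshotMillis = snapshotMillis;
    }

    String describe() {
        return snapshotMillis
                .map(ts -> "scan at snapshot " + ts)
                .orElse("scan latest");
    }
}

public class FactoryDemo {
    // Mirrors context.getCatalogTable().getSnapshot() in the snippet above:
    // the factory threads the snapshot into the source it creates.
    static VersionedSource createSource(Optional<Long> snapshot) {
        return new VersionedSource(snapshot);
    }

    public static void main(String[] args) {
        System.out.println(createSource(Optional.empty()).describe());
        System.out.println(createSource(Optional.of(1682553600000L)).describe());
    }
}
```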
Compatibility, Deprecation, and Migration Plan
...