...

Page properties

Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Discussion thread: here (<- link to https://lists.apache.org/list.html?dev@flink.apache.org)
Vote thread: here (<- link to https://lists.apache.org/list.html?dev@flink.apache.org)
JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)
Release: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Time travel is a technique for querying historical versions of data: it allows users to specify a point in time and retrieve the data and schema of a table as they appeared at that time. With time travel, users can easily analyze and compare historical versions of data.

Public Interfaces

Syntax

We propose adding the following syntax for time travel statements.

...

1.  Convert SQL String to SqlNode and Validate

During this stage, the schema of the table is validated. Therefore, before getTable is called, the `period` of the SqlSnapshot node (which may be a time constant or an expression) must be parsed and converted into a long timestamp. This timestamp is used to construct a LongSchemaVersion, which generates a new Calcite schema (a snapshot of the current schema) for schema validation. Finally, catalogManager.getTable(Object object, long timestamp) is called to obtain the table as it was at that point in time.
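The conversion of a constant `period` into a long timestamp can be sketched as follows. This is a minimal sketch, not the planner's implementation: it assumes the period is a TIMESTAMP literal such as '2023-04-27 00:00:00' and that it is interpreted in the session time zone, passed in here as a plain zone ID string. The class and method names are hypothetical, and non-constant expressions, which the planner would have to evaluate first, are not covered.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper: converts a constant AS OF period into epoch milliseconds. */
final class PeriodToTimestamp {
    // Layout of a SQL TIMESTAMP literal, e.g. '2023-04-27 00:00:00'.
    private static final DateTimeFormatter SQL_TIMESTAMP =
            DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss");

    private PeriodToTimestamp() {}

    /**
     * Interprets the literal as a local date-time in the given zone (assumed to be the
     * session time zone) and returns the corresponding instant in epoch milliseconds.
     */
    static long toEpochMillis(String timestampLiteral, String sessionTimeZone) {
        LocalDateTime local = LocalDateTime.parse(timestampLiteral, SQL_TIMESTAMP);
        return local.atZone(ZoneId.of(sessionTimeZone)).toInstant().toEpochMilli();
    }
}
```

For example, `toEpochMillis("1970-01-01 00:00:00", "+08:00")` yields -28800000, because midnight at UTC+8 is eight hours before the epoch. The time zone matters: the same literal maps to different timestamps, and therefore to different schema versions, depending on the zone it is interpreted in.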

2.  SqlNode To RelNode.

During this stage, the schema is also validated. When converting the SqlSnapshot node, its `period` is parsed into a long timestamp, and a LongSchemaVersion is constructed from it to generate a new Calcite schema (a schema snapshot) for table validation. Finally, catalogManager.getTable(Object, long timestamp) obtains the table at the corresponding point in time.
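The lookup that catalogManager.getTable(Object, long timestamp) performs can be modeled with a sorted map of schema versions. This is a hypothetical, in-memory sketch that uses no Calcite or Flink classes: it assumes each schema version is tagged with the epoch-millis time at which it became effective, that the version visible at timestamp T is the latest one that became effective at or before T, and it represents a schema simply as a list of column names. `VersionedCatalog`, `putSchema`, and this `getTable` signature are illustrative names only.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

/** Hypothetical model of a catalog that can return a table's schema as of a timestamp. */
final class VersionedCatalog {
    // table name -> (effective-from epoch millis -> column names of that schema version)
    private final Map<String, NavigableMap<Long, List<String>>> schemas = new HashMap<>();

    /** Records that {@code table} has the given columns from {@code effectiveFrom} onward. */
    void putSchema(String table, long effectiveFrom, List<String> columns) {
        schemas.computeIfAbsent(table, t -> new TreeMap<>())
                .put(effectiveFrom, List.copyOf(columns));
    }

    /** Returns the schema in effect at {@code timestamp}, or empty if none existed yet. */
    Optional<List<String>> getTable(String table, long timestamp) {
        NavigableMap<Long, List<String>> versions = schemas.get(table);
        if (versions == null) {
            return Optional.empty();
        }
        // floorEntry picks the latest version whose start time is <= timestamp.
        Map.Entry<Long, List<String>> entry = versions.floorEntry(timestamp);
        return entry == null ? Optional.empty() : Optional.of(entry.getValue());
    }
}
```

Using `TreeMap.floorEntry` keeps each lookup logarithmic in the number of versions; a timestamp earlier than the first version returns empty, which in this model corresponds to querying a table before it existed.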

3.  Optimizer

Remove some validation logic. Previously, the AS OF syntax was only applicable to temporal joins, so Flink SQL added checks that reject it anywhere else to avoid the use of unsupported syntax. Supporting time travel requires removing these restrictions.

4.  Convert RelNode to Transformation (Convert CommonExecTableSourceScan To Transformation) 

If a connector needs to support time travel, it can obtain the snapshot timestamp through the getSnapshot interface of CatalogTable when the DynamicTableSource is constructed. The DynamicTableSource is created when the TableSourceScan is constructed, so the corresponding CatalogTable and its snapshot are available at that point.

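```
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
    // Present only for time-travel queries; holds the snapshot timestamp.
    Optional<Long> snapshot = context.getCatalogTable().getSnapshot();
    // ... build and return the source, reading data as of `snapshot` when present ...
}
```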
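On the connector side, the `Optional<Long>` returned by getSnapshot() decides which data a source reads. The sketch below is hypothetical and runs without Flink: it assumes the connector tracks an epoch-millis commit time per data file and treats an empty snapshot as a regular query over all committed data. Real table formats typically resolve the timestamp to a snapshot ID instead; `SnapshotAwareReader` and `DataFile` are illustrative names.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

/** Hypothetical connector-side helper that applies CatalogTable#getSnapshot() when reading. */
final class SnapshotAwareReader {

    /** A committed data file and the epoch-millis time of its commit (hypothetical model). */
    static final class DataFile {
        final String path;
        final long commitTime;

        DataFile(String path, long commitTime) {
            this.path = path;
            this.commitTime = commitTime;
        }
    }

    private SnapshotAwareReader() {}

    /**
     * Returns the paths to read: files committed at or before the snapshot timestamp, or
     * every committed file when there is no snapshot (a regular, non-time-travel query).
     */
    static List<String> filesToRead(List<DataFile> committed, Optional<Long> snapshot) {
        return committed.stream()
                .filter(file -> snapshot.map(ts -> file.commitTime <= ts).orElse(true))
                .map(file -> file.path)
                .collect(Collectors.toList());
    }
}
```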

...

Compatibility, Deprecation, and Migration Plan

This is a newly added feature, so there are no compatibility issues.


Test Plan

Unit tests (UT) and integration tests (IT).

...


Whether to support other syntax implementations

Due to current limitations of the Calcite syntax, we can only support specifying a single point in time; specifying a time range is not supported.

...

```java
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.connector.source.ScanTableSource;

/**
 * Enables pushing down the timestamp of a snapshot into a {@link ScanTableSource}.
 *
 * <p>When a source connector implements the SupportsTimeTravel interface, users can leverage its
 * time-travel functionality to analyze historical data, perform backfilling operations, and more.
 * This can be particularly useful for debugging and data recovery purposes.
 */
@PublicEvolving
public interface SupportsTimeTravel {

    /**
     * @param timestamp The timestamp of the snapshot to apply.
     */
    void applySnapshot(long timestamp);
}
```
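The ability-interface pattern behind SupportsTimeTravel can be sketched as below: the planner checks whether a source implements the interface and, if so, pushes the snapshot timestamp into it. To compile without Flink on the classpath, the sketch redeclares the interface locally without @PublicEvolving; `TimeTravelPushDown` and `ExampleVersionedSource` are hypothetical names, and what the planner should do when a source does not support time travel (for example, failing the query) is left open.

```java
/** Local stand-in for the proposed interface, so the sketch compiles without Flink. */
interface SupportsTimeTravel {
    void applySnapshot(long timestamp);
}

/** Hypothetical planner-side step that pushes the snapshot timestamp into a source. */
final class TimeTravelPushDown {
    private TimeTravelPushDown() {}

    /** Returns true if the timestamp was pushed down; false if the source cannot time travel. */
    static boolean pushDown(Object source, long timestamp) {
        if (source instanceof SupportsTimeTravel) {
            ((SupportsTimeTravel) source).applySnapshot(timestamp);
            return true;
        }
        return false;
    }
}

/** Hypothetical source that remembers which snapshot it was asked to read. */
final class ExampleVersionedSource implements SupportsTimeTravel {
    // Long.MAX_VALUE stands for "no snapshot applied": read the latest data.
    private long snapshotTimestamp = Long.MAX_VALUE;

    @Override
    public void applySnapshot(long timestamp) {
        this.snapshotTimestamp = timestamp;
    }

    long snapshotTimestamp() {
        return snapshotTimestamp;
    }
}
```

Keeping the capability in a separate interface means only connectors that can actually read historical data opt in, and the planner can detect support with a type check instead of a configuration flag.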


