Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Discussion thread: here (<- link to https://lists.apache.org/list.html?dev@flink.apache.org)
Vote thread: here (<- link to https://lists.apache.org/list.html?dev@flink.apache.org)
JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)
Release: <Flink Version>


Motivation

Time travel is a technique for querying historical versions of data that allows users to specify a point in time and retrieve the data and schema of a table as it appeared at that time. With time travel, users can easily analyze and compare historical versions of data.

Public Interfaces

Syntax

We propose adding the following syntax for time travel statements. The SQL:2011 standard [1] defines syntax related to time travel.

-- Specify a specific time
SELECT * FROM paimon_tb AS OF TIMESTAMP '2023-04-27 00:00:00'

-- Specify a constant expression
SELECT * FROM paimon_tb AS OF TIMESTAMP '2023-04-27 00:00:00' - INTERVAL '1' DAY
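
The constant expression in the second statement has to be reduced to a single point in time before it can be passed to the catalog as a `long`. The following is a minimal sketch of that reduction, not the planner's actual code. It assumes that the `long` is epoch milliseconds and that the literal is interpreted in a given time zone; the class `SnapshotTimestamps` and its method are hypothetical names used only for illustration.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper, not part of Flink: reduces an AS OF expression to one timestamp. */
final class SnapshotTimestamps {
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /**
     * Parses a timestamp literal, subtracts the interval, and interprets the result
     * in the given zone. Returning epoch milliseconds is an assumption of this sketch.
     */
    static long toEpochMillis(String literal, Duration minus, ZoneId zone) {
        LocalDateTime local = LocalDateTime.parse(literal, FORMAT).minus(minus);
        return local.atZone(zone).toInstant().toEpochMilli();
    }
}
```

For the second statement above, `toEpochMillis("2023-04-27 00:00:00", Duration.ofDays(1), zone)` would yield the instant for 2023-04-26 00:00:00 in the chosen zone.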


Public interfaces

Catalog

@PublicEvolving
public interface Catalog {

    /**
     * Returns a {@link CatalogTable} or {@link CatalogView} identified by the given {@link
     * ObjectPath}. The framework will resolve the metadata objects when necessary.
     *
     * @param tablePath Path of the table or view
     * @param timestamp Timestamp of the table snapshot
     * @return The requested table or view
     * @throws TableNotExistException if the target does not exist
     * @throws CatalogException in case of any runtime exception
     */
    default CatalogBaseTable getTable(ObjectPath tablePath, long timestamp)
            throws TableNotExistException, CatalogException {
        throw new UnsupportedOperationException(
                String.format("Table %s does not support time travel.", tablePath));
    }

}
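
How a catalog maps a timestamp to a table version is left to the implementation. One plausible approach, sketched below under stated assumptions, is to keep the table metadata keyed by commit time and return the newest entry committed at or before the requested timestamp. `SnapshotHistory` is a hypothetical stand-in written against the JDK only; it is not a Flink class, and the "at or before" rule is an assumption rather than something this proposal prescribes.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

/** Hypothetical snapshot history of a single table; not a Flink class. */
final class SnapshotHistory<T> {
    // Commit time (assumed to be epoch milliseconds) -> table metadata at that commit.
    private final NavigableMap<Long, T> snapshots = new TreeMap<>();

    /** Records the table metadata that became visible at the given commit time. */
    void commit(long commitTime, T table) {
        snapshots.put(commitTime, table);
    }

    /** Returns the newest snapshot committed at or before the timestamp, if any. */
    Optional<T> asOf(long timestamp) {
        Map.Entry<Long, T> entry = snapshots.floorEntry(timestamp);
        return entry == null ? Optional.empty() : Optional.of(entry.getValue());
    }
}
```

A catalog built this way could implement `getTable(tablePath, timestamp)` by calling `asOf(timestamp)` and throwing `TableNotExistException` when the result is empty.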


CatalogTable

@PublicEvolving
public interface CatalogTable extends CatalogBaseTable {
    /** Return the snapshot specified for the table. Return Optional.empty() if not specified. */
    default Optional<Long> getSnapshot() {
        return Optional.empty();
    }
}



Proposed Changes

The following describes the modifications needed at each stage of SQL parsing and compilation.

1.  Convert SQL String to SqlNode and Validate

At this stage the table schema is validated. When the SqlSnapshot node is parsed, its `period` (a time constant or an expression) is converted into a timestamp, and a LongSchemaVersion is constructed from it to generate a new Calcite schema for schema validation.

2.  Convert SqlNode to RelNode

The SQL schema is validated again at this stage. When the SqlSnapshot node is converted, its timestamp is resolved, and a LongSchemaVersion is constructed to generate a new Calcite schema for table validation.

3.  Optimizer

At present, Flink SQL supports snapshot syntax only in temporal joins, so the optimizer contains checks that reject any other use of it. The time travel implementation should remove these restrictions.

4.  Convert RelNode to Transformation (Convert CommonExecTableSourceScan to Transformation)

If a connector needs to support time travel, it can obtain the corresponding timestamp through the getSnapshot method of CatalogTable when constructing a DynamicTableSource:

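```
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
    // Empty when the query does not specify a snapshot.
    Optional<Long> snapshot = context.getCatalogTable().getSnapshot();
    // ... construct the source for the requested snapshot, if present
}
```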
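
Because `getSnapshot()` returns an `Optional`, a connector has to decide what to read when no snapshot was requested. The sketch below shows one way to make that decision explicit. `ScanSpec` is a hypothetical class, not part of Flink, and falling back to the latest data is an assumption of this example.

```java
import java.util.Optional;

/** Hypothetical scan description a connector might derive; not a Flink class. */
final class ScanSpec {
    final boolean timeTravel;
    final long asOfTimestamp; // meaningful only when timeTravel is true

    private ScanSpec(boolean timeTravel, long asOfTimestamp) {
        this.timeTravel = timeTravel;
        this.asOfTimestamp = asOfTimestamp;
    }

    /** Reads the requested snapshot, or the latest data when none was requested. */
    static ScanSpec from(Optional<Long> snapshot) {
        return snapshot
                .map(ts -> new ScanSpec(true, ts))
                .orElseGet(() -> new ScanSpec(false, -1L));
    }
}
```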


Compatibility, Deprecation, and Migration Plan

This is a newly added feature, so there will be no compatibility issues.


Test Plan

Unit tests and integration tests.


Whether to support other syntax implementations

Due to the current limitations of Calcite syntax, we can only support specifying a single time and cannot specify a time range.

SqlSnapshot Snapshot(SqlNode tableRef) :
{
    final Span s;
    final SqlNode e;
}
{
    { s = span(); } <FOR> <SYSTEM_TIME> <AS> <OF>
    // Syntax for temporal table in
    // standard SQL 2011 IWD 9075-2:201?(E) 7.6 <table reference>
    // supports grammar as following:
    // 1. datetime literal
    // 2. datetime value function, i.e. CURRENT_TIMESTAMP
    // 3. datetime term in 1 or 2 +(or -) interval term

    // We extend to support column reference, use Expression
    // to simplify the parsing code.
    e = Expression(ExprContext.ACCEPT_NON_QUERY) {
        return new SqlSnapshot(s.end(this), tableRef, e);
    }
}


Currently, we cannot support the following syntax:

SELECT ENo,EName,Sys_Start,Sys_End
FROM Emp FOR SYSTEM_TIME FROM
 TIMESTAMP '2011-01-02 00:00:00' TO
 TIMESTAMP '2011-12-31 00:00:00' 

or

SELECT ENo,EName,Sys_Start,Sys_End
FROM Emp FOR SYSTEM_TIME BETWEEN
 TIMESTAMP '2011-01-02 00:00:00' AND
 TIMESTAMP '2011-12-31 00:00:00' 



Rejected Alternatives

Add SupportsTimeTravel source ability

Add a new source ability interface that connectors can implement to support time travel.

1. Add the source ability interface so that systems supporting time travel can implement it.
2. Add corresponding optimization rules so that, during logical plan optimization, the snapshot timestamp can be pushed down to the ScanTableSource.
3. To avoid conflicts with temporal joins, apply the time travel transformation rules before the temporal join transformation rules.

/**
 * Enables pushing down the timestamp of a snapshot into a {@link ScanTableSource}.
 *
 * <p>When a source connector implements the SupportsTimeTravel interface, users can leverage its
 * time-travel functionality to analyze historical data, perform backfilling operations, and more.
 * This can be particularly useful for debugging and data recovery purposes.
 */

@PublicEvolving
public interface SupportsTimeTravel {

    /**
     * @param timestamp The timestamp of the snapshot to apply.
     */
    void applySnapshot(long timestamp);
}
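
To make the rejected design concrete, the sketch below shows how a planner-side step might have used such an ability: check whether the source implements the interface, push the timestamp down if it does, and reject the query otherwise. The interface is redeclared locally so that the example compiles on its own; `VersionedSource` and `SnapshotPushDown` are hypothetical names, and none of this reflects existing Flink planner code.

```java
/** Local copy of the interface above so that this example is self-contained. */
interface SupportsTimeTravel {
    void applySnapshot(long timestamp);
}

/** Hypothetical source that remembers the pushed-down snapshot timestamp. */
final class VersionedSource implements SupportsTimeTravel {
    private Long snapshot; // null until a snapshot is pushed down

    @Override
    public void applySnapshot(long timestamp) {
        this.snapshot = timestamp;
    }

    Long appliedSnapshot() {
        return snapshot;
    }
}

/** Hypothetical planner-side step: push the timestamp down or reject the query. */
final class SnapshotPushDown {
    static void apply(Object source, long timestamp) {
        if (source instanceof SupportsTimeTravel) {
            ((SupportsTimeTravel) source).applySnapshot(timestamp);
        } else {
            throw new UnsupportedOperationException(
                    source.getClass().getSimpleName() + " does not support time travel.");
        }
    }
}
```

In this design the source learns the timestamp only during optimization, whereas the proposed `getSnapshot()` approach makes it available on the CatalogTable when the DynamicTableSource is constructed.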


[1] https://sigmodrecord.org/publications/sigmodRecord/1209/pdfs/07.industry.kulkarni.pdf
