Motivation
Public Interfaces
Syntax
We propose adding the following syntax for time travel statements.
```sql
-- Specify a specific time
SELECT * FROM paimon_tb FOR SYSTEM_TIME AS OF TIMESTAMP '2023-04-27 00:00:00';

-- Specify a constant expression
SELECT * FROM paimon_tb FOR SYSTEM_TIME AS OF TIMESTAMP '2023-04-27 00:00:00' - INTERVAL '1' DAY;
```
Catalog interfaces
Catalog
```java
@PublicEvolving
public interface Catalog {

    // ... existing methods ...

    /**
     * Returns a {@link CatalogTable} or {@link CatalogView} identified by the given {@link
     * ObjectPath} at the given timestamp. The framework will resolve the metadata objects when
     * necessary.
     *
     * @param tablePath Path of the table or view
     * @param timestamp Timestamp of the table snapshot
     * @return The requested table or view
     * @throws TableNotExistException if the target does not exist
     * @throws CatalogException in case of any runtime exception
     */
    default CatalogBaseTable getTable(ObjectPath tablePath, long timestamp)
            throws TableNotExistException, CatalogException {
        throw new UnsupportedOperationException(
                String.format("Table %s does not support time travel.", tablePath));
    }
}
```
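The sketch below illustrates, under stated assumptions, what a catalog that overrides the new overload might do: return the newest table version committed at or before the requested timestamp. It deliberately avoids Flink dependencies, so VersionedCatalog and TableVersion are hypothetical stand-ins, a String path replaces ObjectPath, IllegalArgumentException replaces TableNotExistException, and the timestamp is assumed to be in epoch milliseconds.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical stand-in for one committed version of a table's metadata; not a Flink class.
record TableVersion(String path, long commitTimeMillis, String schema) {}

// Hypothetical in-memory catalog that keeps every committed version of each table.
class VersionedCatalog {
    // table path -> (commit time in epoch millis -> version)
    private final Map<String, NavigableMap<Long, TableVersion>> versions = new HashMap<>();

    void commit(String path, long commitTimeMillis, String schema) {
        versions.computeIfAbsent(path, p -> new TreeMap<>())
                .put(commitTimeMillis, new TableVersion(path, commitTimeMillis, schema));
    }

    // Mirrors the idea of getTable(ObjectPath, long): return the latest version
    // committed at or before the requested timestamp.
    TableVersion getTable(String path, long timestamp) {
        NavigableMap<Long, TableVersion> history = versions.get(path);
        if (history == null) {
            // Stands in for TableNotExistException.
            throw new IllegalArgumentException("Table " + path + " does not exist.");
        }
        Map.Entry<Long, TableVersion> entry = history.floorEntry(timestamp);
        if (entry == null) {
            throw new IllegalArgumentException(
                    "Table " + path + " has no version at or before " + timestamp + ".");
        }
        return entry.getValue();
    }
}
```

TreeMap.floorEntry keeps each lookup logarithmic in the number of versions; whether a timestamp earlier than the first commit should fail, as here, or fall back to the oldest version is left to each catalog.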
CatalogTable
```java
@PublicEvolving
public interface CatalogTable extends CatalogBaseTable {

    // ... existing methods ...

    /** Returns the snapshot specified for the table, or {@link Optional#empty()} if not specified. */
    default Optional<Long> getSnapshot() {
        return Optional.empty();
    }
}
```
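Because getSnapshot() has a default implementation, existing CatalogTable implementations keep compiling and report no snapshot; only tables returned from the time travel lookup need to override it. A minimal sketch with hypothetical stand-in types (not Flink classes), assuming the snapshot value is the requested epoch-millisecond timestamp:

```java
import java.util.Optional;

// Hypothetical, heavily simplified stand-in for CatalogTable.
interface SnapshotAwareTable {
    String name();

    // Same default as the proposed CatalogTable#getSnapshot: no snapshot unless one is set.
    default Optional<Long> getSnapshot() {
        return Optional.empty();
    }
}

// A table resolved without time travel keeps the default.
record LatestTable(String name) implements SnapshotAwareTable {}

// A table resolved through a time travel lookup carries the requested timestamp.
record TimeTravelTable(String name, long snapshot) implements SnapshotAwareTable {
    @Override
    public Optional<Long> getSnapshot() {
        return Optional.of(snapshot);
    }
}
```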
Proposed Changes
I will describe the necessary changes at each stage of SQL parsing and compilation.
1. Convert SQL String to SqlNode
Schema validation is performed during this stage. Therefore, when CalciteSchema#getTable is called, the time in the SqlSnapshot can be parsed and converted to a long, which is then passed to FlinkSchema. Finally, catalogManager.getTable(Object object, long timestamp) is called to obtain the table at the specified timestamp.
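As a rough illustration of the conversion step, the helper below turns the literal from the Syntax examples, optionally shifted by an interval, into a long. It is a sketch under two assumptions not stated in this proposal: the long is epoch milliseconds, and the TIMESTAMP literal (which carries no time zone) is interpreted in the session time zone. SnapshotTimeConverter is a hypothetical name; the planner would evaluate the SqlSnapshot expression rather than parse strings.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: converts the period of FOR SYSTEM_TIME AS OF into a long.
// Assumes epoch milliseconds and a literal interpreted in the session time zone.
final class SnapshotTimeConverter {
    private static final DateTimeFormatter LITERAL_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private SnapshotTimeConverter() {}

    // Parses a literal such as "2023-04-27 00:00:00" and shifts it by an interval
    // ("- INTERVAL '1' DAY" corresponds to Duration.ofDays(-1)).
    static long toEpochMillis(String literal, Duration offset, ZoneId sessionZone) {
        LocalDateTime local = LocalDateTime.parse(literal, LITERAL_FORMAT).plus(offset);
        return local.atZone(sessionZone).toInstant().toEpochMilli();
    }
}
```

For example, TIMESTAMP '2023-04-27 00:00:00' - INTERVAL '1' DAY with a UTC session zone corresponds to toEpochMillis("2023-04-27 00:00:00", Duration.ofDays(-1), ZoneId.of("UTC")), which returns 1682467200000.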
2. Convert SqlNode to RelNode
Schema validation is also performed during this stage. The SqlSnapshot must be parsed to obtain the corresponding long value, which is passed to FlinkSchema. Finally, catalogManager.getTable(Object, long timestamp) obtains the table at the corresponding point in time.
3. Optimizer
Some validation logic must be relaxed. Previously, the AS OF syntax was only allowed in temporal joins, so supporting time travel requires removing the checks that enforce this restriction.
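Since the same FOR SYSTEM_TIME AS OF clause now serves two purposes, the relaxed checks still need to tell a temporal join apart from time travel. One plausible criterion, sketched below with hypothetical types rather than Calcite's SqlNode or RexNode trees, is whether the period expression references a column: a column reference such as o.order_time varies per row (temporal join), while an expression built only from literals folds to one timestamp (time travel). This is an illustration, not the planner's actual check.

```java
// Hypothetical, simplified expression tree for the period after FOR SYSTEM_TIME AS OF.
interface PeriodExpr {}

record TimestampLiteral(String value) implements PeriodExpr {}

record IntervalLiteral(String value, String unit) implements PeriodExpr {}

record ColumnRef(String table, String column) implements PeriodExpr {}

// e.g. TIMESTAMP '2023-04-27 00:00:00' - INTERVAL '1' DAY
record Arithmetic(String op, PeriodExpr left, PeriodExpr right) implements PeriodExpr {}

final class SnapshotKind {
    private SnapshotKind() {}

    // Returns true when the period contains no column reference, i.e. it is constant
    // for the whole query and can be treated as time travel.
    static boolean isTimeTravel(PeriodExpr expr) {
        if (expr instanceof ColumnRef) {
            return false; // per-row value: temporal join
        }
        if (expr instanceof Arithmetic a) {
            return isTimeTravel(a.left()) && isTimeTravel(a.right());
        }
        return true; // timestamp and interval literals
    }
}
```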
4. Convert RelNode to Transformation (Convert CommonExecTableSourceScan to Transformation)
When a TableSourceScan is translated, the corresponding DynamicTableSource is created. At that point, the source factory can obtain the CatalogTable and its snapshot from the factory context:
```java
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
    // Empty unless the query specified FOR SYSTEM_TIME AS OF for this table.
    Optional<Long> snapshot = context.getCatalogTable().getSnapshot();
    // ... create the connector's source, reading the specified snapshot if present ...
}
```
Compatibility, Deprecation, and Migration Plan
Test Plan
Unit tests and integration tests.
Other time travel syntax
Due to limitations of Calcite's current grammar, only a single point in time can be specified; time ranges are not supported. The relevant Calcite parser rule is:
```
SqlSnapshot Snapshot(SqlNode tableRef) :
{
    final Span s;
    final SqlNode e;
}
{
    { s = span(); } <FOR> <SYSTEM_TIME> <AS> <OF>
    // Syntax for temporal table in
    // standard SQL 2011 IWD 9075-2:201?(E) 7.6 <table reference>
    // supports grammar as following:
    // 1. datetime literal
    // 2. datetime value function, i.e. CURRENT_TIMESTAMP
    // 3. datetime term in 1 or 2 +(or -) interval term

    // We extend to support column reference, use Expression
    // to simplify the parsing code.
    e = Expression(ExprContext.ACCEPT_NON_QUERY) {
        return new SqlSnapshot(s.end(this), tableRef, e);
    }
}
```
Therefore, the following syntax is not currently supported:
```sql
SELECT ENo, EName, Sys_Start, Sys_End
FROM Emp FOR SYSTEM_TIME FROM TIMESTAMP '2011-01-02 00:00:00' TO TIMESTAMP '2011-12-31 00:00:00'
```
or
```sql
SELECT ENo, EName, Sys_Start, Sys_End
FROM Emp FOR SYSTEM_TIME BETWEEN TIMESTAMP '2011-01-02 00:00:00' AND TIMESTAMP '2011-12-31 00:00:00'
```
Rejected Alternatives
Add SupportsTimeTravel source ability
Add a new source ability interface that connectors can implement to support time travel.
1. Add a source ability interface that connectors backed by systems with time travel support can implement to expose that capability.
2. Add corresponding optimization rules so that, during logical plan optimization, the snapshot timestamp can be pushed down into the ScanTableSource.
3. To avoid conflicts with temporal joins, apply the time travel transformation rules before the temporal join transformation rules.
```java
/**
 * Enables pushing down the timestamp of a snapshot into a {@link ScanTableSource}.
 *
 * <p>When a source connector implements the SupportsTimeTravel interface, users can leverage its
 * time travel functionality to analyze historical data, perform backfilling operations, and more.
 * This can be particularly useful for debugging and data recovery.
 */
@PublicEvolving
public interface SupportsTimeTravel {

    /**
     * Applies the snapshot timestamp to the source.
     *
     * @param timestamp The timestamp of the snapshot to apply.
     */
    void applySnapshot(long timestamp);
}
```
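For comparison with the chosen design, here is a sketch of how this rejected ability might have been wired. It uses hypothetical stand-ins rather than Flink's ScanTableSource and planner rules: the push-down step checks whether the source implements SupportsTimeTravel and, if so, hands it the constant timestamp; otherwise the query is rejected. VersionedScanSource, PlainScanSource, and TimeTravelPushDown are illustrative names only.

```java
// Stand-in for the rejected ability, with the same single method as proposed.
interface SupportsTimeTravel {
    void applySnapshot(long timestamp);
}

// Hypothetical stand-in for a ScanTableSource.
interface ScanSource {
    String describe();
}

// Hypothetical source that supports time travel.
final class VersionedScanSource implements ScanSource, SupportsTimeTravel {
    private Long snapshotTimestamp; // null means "read the latest data"

    @Override
    public void applySnapshot(long timestamp) {
        this.snapshotTimestamp = timestamp;
    }

    @Override
    public String describe() {
        return snapshotTimestamp == null ? "latest" : "as of " + snapshotTimestamp;
    }
}

// Hypothetical source without the ability.
final class PlainScanSource implements ScanSource {
    @Override
    public String describe() {
        return "latest";
    }
}

final class TimeTravelPushDown {
    private TimeTravelPushDown() {}

    // What a push-down rule would roughly do: pass the timestamp to a capable source,
    // or fail if the source cannot travel in time.
    static void pushDown(ScanSource source, long timestamp) {
        if (!(source instanceof SupportsTimeTravel ability)) {
            throw new UnsupportedOperationException("Source does not support time travel.");
        }
        ability.applySnapshot(timestamp);
    }
}
```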