...
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
public class SavepointManager {

    /** Return the root directory of savepoints. */
    public Path savepointDirectory();

    /** Return the path of a savepoint. */
    public Path savepointPath(long savepointId);

    /** Get a savepoint instance. */
    public Savepoint savepoint(long savepointId);

    /** Check if a savepoint exists. */
    public boolean savepointExists(long savepointId);

    /** Return the id of the earliest savepoint. */
    public @Nullable Long earliestSavepointId();

    /** Return the id of the latest savepoint. */
    public @Nullable Long latestSavepointId();
} |
Proposed Changes
Storage
As with snapshots, a new directory `/savepoint` will be created under the table directory to store savepoints. The qualified path of a savepoint file is `/path/to/table/savepoint/savepoint-<id>`.
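The path layout above can be sketched as follows. This is a minimal illustration only: the class `SavepointPaths` and its constants are hypothetical, and it uses `java.nio.file.Path` as a stand-in, whereas the proposed `SavepointManager` would presumably use the project's own file system `Path` type.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper, not part of the proposal: it only illustrates the
// directory layout /path/to/table/savepoint/savepoint-<id>.
final class SavepointPaths {

    // Assumed names, taken from the layout described in the text.
    static final String SAVEPOINT_DIR = "savepoint";
    static final String SAVEPOINT_PREFIX = "savepoint-";

    /** Root directory of savepoints under the given table directory. */
    static Path savepointDirectory(Path tablePath) {
        return tablePath.resolve(SAVEPOINT_DIR);
    }

    /** Path of the savepoint file with the given id. */
    static Path savepointPath(Path tablePath, long savepointId) {
        return savepointDirectory(tablePath).resolve(SAVEPOINT_PREFIX + savepointId);
    }

    public static void main(String[] args) {
        Path table = Paths.get("/path/to/table");
        System.out.println(savepointPath(table, 5L));
    }
}
```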
Creation & Deletion
We propose two ways for users to control the creation and deletion of savepoints.
- Table options
We can periodically create savepoints according to user configuration. A new core option will be introduced: `savepoint.create-interval` (Duration type).
- Flink action
In case a user wants to manually create or delete a savepoint, we propose to introduce two actions:
`create-savepoint` to create a savepoint at the current point in time;
`delete-savepoint --id <id>` to delete a specified savepoint.
In the future, once Flink supports stored procedures, we can provide procedures for managing savepoints.
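To make the periodic creation driven by `savepoint.create-interval` more concrete, here is a minimal sketch of how the "is a new savepoint due?" decision might look. The class `SavepointSchedule` and the method `isSavepointDue` are hypothetical, and the rule that a table without any savepoint gets one immediately is an assumption, not something this proposal specifies.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical sketch, not part of the proposal.
final class SavepointSchedule {

    /**
     * Returns true when a new savepoint should be created.
     *
     * @param lastSavepointTime creation time of the latest savepoint, or null if none exists
     * @param now               current time
     * @param createInterval    value of the `savepoint.create-interval` option
     */
    static boolean isSavepointDue(Instant lastSavepointTime, Instant now, Duration createInterval) {
        if (lastSavepointTime == null) {
            // Assumption: with no savepoint yet, create the first one right away.
            return true;
        }
        // Due once at least one full interval has elapsed since the last savepoint.
        return !now.isBefore(lastSavepointTime.plus(createInterval));
    }
}
```

A committer could call such a check after each successful commit, which would keep savepoint creation on the same code path as snapshot creation.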
System Table
We propose to introduce a system table `SavepointsTable`. The schema is:
Code Block | ||||
---|---|---|---|---|
| ||||
savepoint_id BIGINT
saved_snapshot_id BIGINT
schema_id BIGINT
save_time BIGINT
record_count BIGINT |
Usage
This section describes in more detail where savepoints take effect.
Spark Time Travel to Version
The Spark SQL statement is `SELECT * FROM t VERSION AS OF <version-string>`.
Currently, the version string only supports a long value, which represents a snapshot id. We propose to use `snapshot.<id>` to travel to a snapshot and `savepoint.<id>` to travel to a savepoint.
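The version-string convention above could be parsed roughly as follows. This is a sketch under assumptions: the class `TravelVersion` is hypothetical, and treating a bare long value as a snapshot id (to stay compatible with the current behavior) is an assumption rather than something the proposal states.

```java
// Hypothetical sketch, not part of the proposal.
final class TravelVersion {

    enum Kind { SNAPSHOT, SAVEPOINT }

    private static final String SNAPSHOT_PREFIX = "snapshot.";
    private static final String SAVEPOINT_PREFIX = "savepoint.";

    final Kind kind;
    final long id;

    private TravelVersion(Kind kind, long id) {
        this.kind = kind;
        this.id = id;
    }

    /**
     * Parses `snapshot.<id>`, `savepoint.<id>`, or a bare long value.
     * Throws NumberFormatException if the id part is not a valid long.
     */
    static TravelVersion parse(String version) {
        String v = version.trim();
        if (v.startsWith(SNAPSHOT_PREFIX)) {
            return new TravelVersion(Kind.SNAPSHOT, Long.parseLong(v.substring(SNAPSHOT_PREFIX.length())));
        }
        if (v.startsWith(SAVEPOINT_PREFIX)) {
            return new TravelVersion(Kind.SAVEPOINT, Long.parseLong(v.substring(SAVEPOINT_PREFIX.length())));
        }
        // Assumption: a bare long keeps its current meaning, a snapshot id.
        return new TravelVersion(Kind.SNAPSHOT, Long.parseLong(v));
    }
}
```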
Spark Time Travel to Timestamp
The Spark SQL statement is `SELECT * FROM t TIMESTAMP AS OF <timestamp-string>`.
Currently, we find the snapshot whose commit time is closest to the specified point in time. The problem is that the snapshots before that point may have expired. We propose that, when the Paimon scanner finds no snapshot, it tries to find the closest savepoint instead.
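The fallback order described above can be illustrated with a small sketch. It assumes that "closest" means the latest entry at or before the requested timestamp, and it models the available snapshots and savepoints as in-memory maps from time (epoch milliseconds) to id. The class `TimeTravelResolver` and this representation are hypothetical and only serve to show the lookup order.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;

// Hypothetical sketch, not part of the proposal.
final class TimeTravelResolver {

    /**
     * Resolves a timestamp to `snapshot.<id>` or `savepoint.<id>`.
     *
     * @param snapshotsByCommitTime unexpired snapshots, keyed by commit time (epoch millis)
     * @param savepointsBySaveTime  savepoints, keyed by save time (epoch millis)
     * @param timestampMillis       requested point in time
     * @return the resolved version string, or empty if nothing exists at or before the timestamp
     */
    static Optional<String> resolve(
            NavigableMap<Long, Long> snapshotsByCommitTime,
            NavigableMap<Long, Long> savepointsBySaveTime,
            long timestampMillis) {
        // First try the snapshots, as today.
        Map.Entry<Long, Long> snapshot = snapshotsByCommitTime.floorEntry(timestampMillis);
        if (snapshot != null) {
            return Optional.of("snapshot." + snapshot.getValue());
        }
        // No snapshot found (for example, all earlier ones expired): fall back to savepoints.
        Map.Entry<Long, Long> savepoint = savepointsBySaveTime.floorEntry(timestampMillis);
        if (savepoint != null) {
            return Optional.of("savepoint." + savepoint.getValue());
        }
        return Optional.empty();
    }
}
```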
Flink Streaming Reading from Snapshot/Savepoint
Currently, the start snapshot for streaming reads can be specified with the dynamic option `scan.snapshot-id=<snapshot-id>`. In case that
Flink Streaming Reading from timestamp
- Spark time travel to version
- Spark time travel to timestamp
- Flink streaming read from