Discussion thread | https://lists.apache.org/thread/d8toll8042k3m4z3pvn1pwg0n8twm0wt
---|---
Vote thread | TBD
ISSUE | TBD
Release | Paimon-0.5
Motivation
In data warehouses, it is a common requirement to query historical data, so we need to periodically merge incremental data with historical data to generate the complete data as of the current time. For example, Hive 2.2 supports using the MERGE INTO statement to merge a source table's records into a target table by performing updates, inserts, and deletes according to different filters. A good approach is therefore to take the historical table as the target and the incremental table as the source, then use MERGE INTO to perform updates (for existing records) and inserts (for non-existing records). However, when there is a large amount of data, the merging may take a long time.
...
But in most scenarios a table will generate too many snapshots, so Paimon lets users configure a snapshot expiration time to clean up old, unused snapshots. As a result, the snapshot at a specified point in time may already have expired when a user wants to query it. We therefore want to introduce savepoints for Paimon to persist the full data of a snapshot at a specified point in time.
...
Proposed Changes
Storage
Like snapshots, a new directory `/savepoint` will be created under the table directory for storing savepoints. The qualified path for a savepoint file is `/path/to/table/savepoint/savepoint-<id>`.
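To make the layout concrete, here is a minimal sketch of how these paths could be derived. The class and method names (`SavepointPaths`, `savepointDirectory`, `savepointPath`) are illustrative only, not Paimon's actual API; only the directory layout itself comes from the text above.

```java
// Hypothetical helper mirroring the savepoint path layout described above.
// Names are illustrative, not part of Paimon's real API.
public class SavepointPaths {

    private final String tablePath;

    public SavepointPaths(String tablePath) {
        this.tablePath = tablePath;
    }

    /** Root directory of all savepoints: <table>/savepoint. */
    public String savepointDirectory() {
        return tablePath + "/savepoint";
    }

    /** Qualified path of one savepoint file: <table>/savepoint/savepoint-<id>. */
    public String savepointPath(long savepointId) {
        return savepointDirectory() + "/savepoint-" + savepointId;
    }
}
```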
New Classes
We need an entry class `Savepoint` to store the information of a savepoint:
...
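The body of `Savepoint` is elided above; the following is only a hypothetical sketch of what it might hold. The first four fields follow the `#savepoints` system table schema shown later in this document (savepoint_id, schema_id, save_time, record_count); the manifest-list field is an assumption based on how snapshots reference their data files.

```java
// Hypothetical sketch of the Savepoint entry class. Field names follow the
// #savepoints system table schema (savepoint_id, schema_id, save_time,
// record_count); fullManifestList is an assumed pointer to the full data
// manifests, by analogy with snapshots.
public class Savepoint {

    private final long id;                  // savepoint_id
    private final long schemaId;            // table schema when saved
    private final long saveTime;            // creation time, epoch millis
    private final long recordCount;         // total records in the full data
    private final String fullManifestList;  // pointer to full data manifests (assumed)

    public Savepoint(long id, long schemaId, long saveTime,
                     long recordCount, String fullManifestList) {
        this.id = id;
        this.schemaId = schemaId;
        this.saveTime = saveTime;
        this.recordCount = recordCount;
        this.fullManifestList = fullManifestList;
    }

    public long id() { return id; }
    public long schemaId() { return schemaId; }
    public long saveTime() { return saveTime; }
    public long recordCount() { return recordCount; }
    public String fullManifestList() { return fullManifestList; }
}
```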
We also need a `SavepointManager` to manage savepoint files:

```java
public class SavepointManager {

    /** Return the root directory of savepoints. */
    public Path savepointDirectory();

    /** Return the path of a savepoint. */
    public Path savepointPath(long savepointId);

    /** Get a savepoint instance. */
    public Savepoint savepoint(long savepointId);

    /** Check if a savepoint exists. */
    public boolean savepointExists(long savepointId);

    /** Return id of the earliest savepoint. */
    public @Nullable Long earliestSavepointId();

    /** Return id of the latest savepoint. */
    public @Nullable Long latestSavepointId();
}
```
We need a `SavepointsTable`, which can provide information about savepoints as the system table `#savepoints`.
Usage
Periodic savepoints
Public Interfaces
Table Options
Key | Required | Type | Note
---|---|---|---
`savepoint.create-time` | false | String | The time of day at which to create savepoints periodically. Must be in the format 'hh:mm'.
`savepoint.create-interval` | false | Duration | Interval between two savepoints. Must be at least 1 day and a whole number of days.
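As an illustration of how these two options could drive periodic savepoint creation, here is a small sketch. This is not Paimon code; the class name, method signature, and the exact scheduling rule (next savepoint at `create-time` on the day `create-interval` after the last one) are assumptions consistent with the option descriptions above.

```java
import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;

// Illustrative scheduler sketch (not Paimon code): 'savepoint.create-time'
// fixes the time of day, 'savepoint.create-interval' (whole days) spaces
// savepoints apart.
public class SavepointScheduler {

    /**
     * Given the date of the last savepoint (or null if none exists yet),
     * return the next time a savepoint should be created.
     */
    public static LocalDateTime nextSavepointTime(
            LocalDate lastSavepointDate, String createTime, Duration interval) {
        // Enforce the constraint from the option table: >= 1 day, whole days only.
        if (interval.toDays() < 1 || interval.toHours() % 24 != 0) {
            throw new IllegalArgumentException(
                    "savepoint.create-interval must be a whole number of days >= 1");
        }
        LocalTime timeOfDay = LocalTime.parse(createTime); // 'hh:mm'
        LocalDate nextDate = lastSavepointDate == null
                ? LocalDate.now()
                : lastSavepointDate.plusDays(interval.toDays());
        return LocalDateTime.of(nextDate, timeOfDay);
    }
}
```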
Flink Actions
Now,
Call Procedures (Future work)
- Spark supports extensions to extend its SQL syntax. We can provide savepoint procedures after we support the CALL statement for Spark.
- Supporting CALL is on Flink's roadmap. We can provide savepoint procedures after that.
Creation & Deletion
Currently, we can provide two ways for users to control the creation and deletion of savepoints.
...
In the future, after Flink supports stored procedures, we can provide procedures to manage savepoints.
Interaction with Snapshot
- When creating a savepoint, we will do a full compaction and pick the compacted snapshot to save.
- When expiring snapshots, Paimon checks whether the data files are still used by living snapshots; if not, they are deleted. After we introduce savepoints, we should also check whether the data files are used by savepoints.
- When deleting a savepoint, we will check for and delete unused data files (just as we do when expiring snapshots).
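The reachability rule in the bullets above can be sketched as a set computation: a data file may be physically deleted only if neither any living snapshot nor any savepoint still references it. The class and method names below are illustrative, not Paimon's implementation.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch of the cleanup rule described above (not Paimon's implementation):
// a candidate file is deletable only when no living snapshot and no
// savepoint references it.
public class FileCleaner {

    /** Return the files from 'candidates' that are safe to delete. */
    public static Set<String> deletableFiles(
            Set<String> candidates,
            List<Set<String>> filesUsedBySnapshots,
            List<Set<String>> filesUsedBySavepoints) {
        // Union of every file still referenced by a snapshot or savepoint.
        Set<String> live = new HashSet<>();
        filesUsedBySnapshots.forEach(live::addAll);
        filesUsedBySavepoints.forEach(live::addAll);

        Set<String> deletable = new HashSet<>(candidates);
        deletable.removeAll(live); // keep anything still referenced
        return deletable;
    }
}
```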
System Table
We propose to introduce a system table `SavepointsTable`. The schema is:
```sql
savepoint_id BIGINT
schema_id BIGINT
save_time BIGINT
record_count BIGINT
```
Usage
Time Travel to Savepoint (Batch Reading)
Currently, we support time traveling to a snapshot. We can now also support traveling to a savepoint.
...
For Flink, we can use the dynamic option `scan.snapshot-id=<snapshot-id>`. We propose to introduce a new core option `scan.savepoint-id` to support traveling to a savepoint.
Time Travel to Timestamp with Savepoint
Currently, we support time traveling to a point in time: the Paimon scanner finds the snapshot whose commit time is closest to the specified time. The problem is that all of the snapshots before that point in time may have expired, so we propose to fall back to finding the closest savepoint in this scenario.
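The fallback described above can be sketched as follows. This is assumed logic, not Paimon's actual scanner: it looks for the latest snapshot committed at or before the timestamp and, if every such snapshot has expired, falls back to the closest savepoint at or before it.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Sketch of the timestamp resolution fallback (assumed logic, not Paimon's
// scanner): prefer the latest live snapshot at or before the timestamp,
// otherwise fall back to the closest savepoint.
public class TimeTravelResolver {

    /**
     * 'snapshots' and 'savepoints' map commit-time -> id for live snapshots
     * and savepoints. Returns "snapshot-<id>" or "savepoint-<id>", or null
     * if nothing precedes the timestamp.
     */
    public static String resolve(
            NavigableMap<Long, Long> snapshots,
            NavigableMap<Long, Long> savepoints,
            long timestamp) {
        Map.Entry<Long, Long> snapshot = snapshots.floorEntry(timestamp);
        if (snapshot != null) {
            return "snapshot-" + snapshot.getValue();
        }
        // All earlier snapshots have expired: fall back to a savepoint.
        Map.Entry<Long, Long> savepoint = savepoints.floorEntry(timestamp);
        if (savepoint != null) {
            return "savepoint-" + savepoint.getValue();
        }
        return null;
    }
}
```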
Compatibility, Deprecation, and Migration Plan
None.
Test Plan
Unit tests: verify creating and reading savepoints.
...
- time travel to savepoint
- expiration of snapshots won't delete data files pointed by savepoints
Rejected Alternatives
Support starting from a savepoint in streaming reading
The current design of savepoints only stores full data manifests, so it cannot support streaming reading for now.