...
```java
public class Tag {

    /** Used to identify a tag. */
    private final long id;

    /** Name of the tag. */
    private final String name;

    /** Creation time in milliseconds. */
    private final long creationTimeMillis;

    /** Identifies which snapshot this tag marks. */
    private final long taggedSnapshotId;

    /** Id of the current table schema of the records in this tag. */
    private final long schemaId;

    /** The manifest list of all data of this tag. */
    private final String fullManifestList;

    /** How many records are in this tag. */
    private final long recordCount;

    // Getters.

    // Some util methods.

    /** Return all {@link ManifestFileMeta} instances for full data manifests in this tag. */
    public List<ManifestFileMeta> dataManifests(ManifestList manifestList);

    /** Serialize to json. */
    public String toJson();

    /** Deserialize from json. */
    public static Tag fromJson(String json);

    /** Get a tag from path. */
    public static Tag fromPath(FileIO fileIO, Path path);
}
```
...
We propose to provide two Flink actions for users to control the creation and deletion of tags.

action | argument | note
---|---|---
`create-tag` | `--name <tag-name>`: specify the name of the tag. | create a tag based on the latest snapshot.
`delete-tag` | `--name <tag-name>`: specify which tag will be deleted. | delete a tag.
System Table
We propose to introduce a system table `$tags`. The schema is:
```
tag_name STRING,
tag_id BIGINT,
creation_time BIGINT,
tagged_snapshot_id BIGINT,
schema_id BIGINT,
record_count BIGINT
```
Data Files Handling
When we create a tag, Paimon stores the metadata of the data files used by the tag instead of copying the data files. So it is important to handle the data files correctly when we make changes to snapshots or tags.
Creating Tag
When creating a tag, we merge the `baseManifestList` and `deltaManifestList` of the tagged snapshot into the full data and create a manifest list for it. This manifest list will be stored in the tag.
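The merge step above can be sketched as follows. This is a simplified illustration, not Paimon's actual `ManifestList` API: manifest files are modeled as plain strings.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch: when a tag is created, the tagged snapshot's base
// and delta manifest lists are merged into one "full" manifest list,
// which is then stored in the tag.
public class CreateTagSketch {

    // Merge base and delta manifest entries into the tag's full manifest list.
    public static List<String> createFullManifestList(
            List<String> baseManifests, List<String> deltaManifests) {
        List<String> full = new ArrayList<>(baseManifests);
        full.addAll(deltaManifests); // delta entries are appended after base
        return full;
    }

    public static void main(String[] args) {
        List<String> full = createFullManifestList(
                List.of("manifest-0", "manifest-1"), List.of("manifest-2"));
        System.out.println(full); // [manifest-0, manifest-1, manifest-2]
    }
}
```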
Deleting Tag
...
When we delete a tag, all data files used by this tag become deletion candidates. How do we determine whether a data file can be deleted? We should take both snapshots and tags into consideration.
For snapshots, we consider 3 scenarios:
- No snapshot exists: do nothing to the candidates.
- Earliest snapshotId <= taggedSnapshotId: the snapshots in [earliest, taggedSnapshotId] may still use data files in the deletion candidates, so we should check:
  - full data files of the earliest snapshot should be removed from the candidates;
  - delta data files of snapshots in (earliest, taggedSnapshotId] should be removed from the candidates because they may be read by streaming jobs.
- Earliest snapshotId > taggedSnapshotId: all snapshots contain data files based on the previous snapshot, so we only need to check the full data files of the earliest snapshot (remove them from the candidates).

For tags, because each tag contains data files based on the previous tag (just like snapshots), we only need to check the full data files of the neighboring tags, if they exist (remove them from the candidates).

After we have checked the used data files, we can delete the remaining candidates.
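Putting these checks together, the filtering can be sketched as below. All input names here are illustrative placeholders for the file sets described above, not Paimon's actual API.

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch of filtering deletion candidates when a tag is deleted.
// Each input models one of the checks described above.
public class DeleteTagSketch {

    public static Set<String> filesToDelete(
            Set<String> tagDataFiles,         // all data files used by the deleted tag
            Set<String> earliestSnapshotFull, // full data files of the earliest snapshot
            Set<String> deltaFilesInRange,    // delta files of snapshots in (earliest, tagged]
            Set<String> neighborTagFiles) {   // full data files of the neighboring tags
        Set<String> candidates = new HashSet<>(tagDataFiles);
        // Remove every data file that a snapshot or a neighboring tag may still use.
        candidates.removeAll(earliestSnapshotFull);
        candidates.removeAll(deltaFilesInRange);
        candidates.removeAll(neighborTagFiles);
        return candidates; // what remains is safe to delete
    }

    public static void main(String[] args) {
        Set<String> result = filesToDelete(
                new HashSet<>(Set.of("f1", "f2", "f3")),
                Set.of("f1"), Set.of(), Set.of("f2"));
        System.out.println(result); // [f3]
    }
}
```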
Expiring Snapshot
We already have a mechanism to find deletion candidates when expiring snapshots; see `SnapshotDeletion#doDeleteExpiredDataFiles`. Assuming we have obtained the deletion candidates via the existing method, let's discuss how to filter out the data files used by tags.
We need 3 parameters to determine whether a data file should be removed from the candidates:
- The snapshot id at which the data file is deleted (`deleteId`). This id can be obtained while iterating over the expiring snapshots.
- The snapshot id at which the data file is committed (`commitId`). To get this id, we should record it in `DataFileMeta`.
- The list of tagged snapshot ids (`taggedSnapshots`). This can be read from the tag files in storage.
The core idea is that if a data file is committed and deleted within the same tag interval, it can be deleted. For example, assume `taggedSnapshots` is `[100, 200, 300]`. If a data file `A` is committed at snapshot 105 and deleted at snapshot 120, no tag saves `A`: when we expire snapshot 119, `A` is put into the candidates, no tag uses it, and `A` is finally deleted. If instead `A` is deleted at snapshot 201, `A` is saved by the second tag: when we expire snapshot 200, `A` is put into the candidates, but it will not be deleted because the second tag still uses it.
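The interval check above can be sketched with a hypothetical helper (not Paimon's actual code): a file committed at `commitId` and deleted at `deleteId` is visible in snapshots `[commitId, deleteId)`, so it must be kept exactly when some tagged snapshot falls inside that interval.

```java
// Hypothetical sketch of the core check: a data file committed at commitId
// and deleted at deleteId exists in snapshots [commitId, deleteId). It can
// be deleted only if no tagged snapshot lies in that interval.
public class TagFileCheck {

    public static boolean canBeDeleted(long commitId, long deleteId, long[] taggedSnapshots) {
        for (long tagged : taggedSnapshots) {
            if (tagged >= commitId && tagged < deleteId) {
                return false; // the tag made at this snapshot still uses the file
            }
        }
        return true;
    }

    public static void main(String[] args) {
        long[] taggedSnapshots = {100, 200, 300};
        // Committed at 105, deleted at 120: no tag in [105, 120) -> deletable.
        System.out.println(canBeDeleted(105, 120, taggedSnapshots)); // true
        // Committed at 105, deleted at 201: tag 200 is in [105, 201) -> kept.
        System.out.println(canBeDeleted(105, 201, taggedSnapshots)); // false
    }
}
```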
Future Work
Periodic Creation & Expiration
We can support periodically creating and expiring tags by providing these options:
Table options
key | Required | Type | note
---|---|---|---
`tag.create-time` | false | String | The time point to create a tag periodically. Must be in the format 'hh:mm'.
`tag.create-interval` | false | Duration | Interval between two tags. Must be at least 1d and an integer number of days.
...
Tag Management Call Procedures
- Spark supports extensions to extend its SQL syntax. We can provide tag management procedures after we support the CALL statement for Spark.
- Supporting CALL is on the roadmap of Flink. We can also provide tag management procedures for Flink when possible.
Compatibility, Deprecation, and Migration Plan
...
Rejected Alternatives
Use name `Savepoint`
Savepoint is an existing concept in Flink and in databases. We think using this word may confuse users. The concept of a tag is used in Iceberg to reference a snapshot. Our scenario is similar, so we adopt the name `tag`.
Support starting from a tag in streaming reading
...