...

Code Block
languagejava
firstline1
titleTag
linenumberstrue
public class Tag {
	/** Used to identify a tag. */
	private final long id;

	/** Name of the tag. */
	private final String name;

	/** Creation time. */
	private final long creationTimeMillis;

	/** Used to identify which snapshot this tag marks. */
	private final long taggedSnapshotId;

	/** Id of the current table schema of the records in this tag. */
	private final long schemaId;

	/** The manifest list of all data of this tag. */
	private final String fullManifestList;

	/** Number of records in this tag. */
	private final long recordCount;

	/** Getters. */
	
	/** Some util methods. */

	/** Return all {@link ManifestFileMeta} instances for full data manifests in this tag. */
	public List<ManifestFileMeta> dataManifests(ManifestList manifestList);
	
	/** Serialize to json. */
	public String toJson();

	/** Deserialize from json. */
	public static Tag fromJson(String json);

	/** Get a tag from path. */
	public static Tag fromPath(FileIO fileIO, Path path);
}
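
For illustration, a minimal sketch of how a caller might load a tag file and list its full data manifests; the `table.fileIO()` accessor, the tag file path layout and `meta.fileName()` are assumptions for this sketch, not part of the proposal:

Code Block
languagejava
// Illustrative usage sketch only; path layout and accessors are assumptions.
FileIO fileIO = table.fileIO();                        // assumed accessor on the table
Path tagPath = new Path(tableLocation, "tag/my-tag");  // assumed tag file location
Tag tag = Tag.fromPath(fileIO, tagPath);

// List the manifest files that make up the full data of this tag.
for (ManifestFileMeta meta : tag.dataManifests(manifestList)) {
    System.out.println(meta.fileName());               // assumed getter
}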

...

We propose to provide two Flink actions for users to control the creation and deletion of tags.

  • create-tag: --name <tag-name> specifies the name of the tag. Creates a tag based on the latest snapshot.
  • delete-tag: --name <tag-name> specifies which tag will be deleted. Deletes a tag.

System Table

We propose to introduce a system table `$tags`. The schema is:

Code Block
languagesql
firstline1
tag_name STRING,
tag_id BIGINT,
creation_time BIGINT,
tagged_snapshot_id BIGINT,
schema_id BIGINT,
record_count BIGINT
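
Assuming a table named `my_table` (the name is only for illustration), the proposed system table could then be queried with ordinary SQL:

Code Block
languagesql
-- Illustrative query against the proposed $tags system table.
SELECT tag_name, tagged_snapshot_id, record_count FROM my_table$tags;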

Data Files Handling

When we create a tag, Paimon will store the metadata of the data files used by the tag instead of copying the data files. So it is important to handle these data files correctly when we change snapshots or tags.

Creating Tag

When creating a tag, we merge the `baseManifestList` and `deltaManifestList` of the tagged snapshot into the full data manifests and create a new manifest list for them. The name of this manifest list will be stored in the tag.
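
A minimal sketch of this step, assuming a `ManifestList` that can read and write manifest list files and a plain `Tag` constructor; the helper names and signatures here are illustrative, not the final API:

Code Block
languagejava
// Illustrative sketch only; helper methods and the Tag constructor are assumptions.
static Tag createTag(long id, String name, Snapshot snapshot, ManifestList manifestList) {
    // The full data of a snapshot is its base manifests plus its delta manifests.
    List<ManifestFileMeta> full = new ArrayList<>();
    full.addAll(manifestList.read(snapshot.baseManifestList()));
    full.addAll(manifestList.read(snapshot.deltaManifestList()));

    // Write the merged manifests as a new manifest list file; only the file name
    // is stored in the tag, no data files are copied.
    String fullManifestList = manifestList.write(full);

    return new Tag(
            id,
            name,
            System.currentTimeMillis(),
            snapshot.id(),
            snapshot.schemaId(),
            fullManifestList,
            snapshot.totalRecordCount());
}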

Deleting Tag

When we delete a tag, all data files used by this tag become deletion candidates. How do we determine whether a data file can be deleted? We should take both snapshots and tags into consideration.

For snapshots, we consider 3 scenarios:

  1. No snapshots. Do nothing to the candidates.
  2. Earliest snapshotId <= taggedSnapshotId: the snapshots in [earliest, taggedSnapshotId] may still use data files in the deletion candidates, so we should check:
    Full data files of the earliest snapshot should be removed from the candidates;
    Delta data files of snapshots in (earliest, taggedSnapshotId] should be removed from the candidates because they may still be consumed by streaming reads.
  3. Earliest snapshotId > taggedSnapshotId: every snapshot contains data files based on its previous snapshot, so we only need to check the full data files of the earliest snapshot (remove them from the candidates).

For tags, because each tag contains data files based on the previous tag (just like snapshots), we only need to check the full data files of the neighboring tags, if they exist (remove them from the candidates).

After removing the data files that are still in use, we can delete the remaining candidates. A sketch of this filtering is shown below.
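
A minimal sketch of the filtering above, in which `readDataFiles`, `snapshotOf`, `deleteFiles` and the accessors on `Snapshot`/`Tag` are illustrative helpers, not part of this proposal:

Code Block
languagejava
// Illustrative sketch only; helper methods and accessors are assumptions.
static void deleteDataFilesOfTag(
        Tag deletedTag,
        Snapshot earliestSnapshot, // may be null if there are no snapshots
        List<Tag> neighborTags,
        ManifestList manifestList) {
    // All data files referenced by the deleted tag are deletion candidates.
    Set<DataFileMeta> candidates = readDataFiles(deletedTag.dataManifests(manifestList));

    if (earliestSnapshot != null) {
        // Full data files of the earliest snapshot are always kept.
        candidates.removeAll(readDataFiles(earliestSnapshot.dataManifests(manifestList)));
        // If earliest <= taggedSnapshotId, delta data files of snapshots in
        // (earliest, taggedSnapshotId] may still be consumed by streaming reads.
        for (long id = earliestSnapshot.id() + 1; id <= deletedTag.taggedSnapshotId(); id++) {
            candidates.removeAll(readDataFiles(snapshotOf(id).deltaManifests(manifestList)));
        }
    }

    // Full data files of the neighboring tags (if any) are still in use.
    for (Tag neighbor : neighborTags) {
        candidates.removeAll(readDataFiles(neighbor.dataManifests(manifestList)));
    }

    // The remaining candidates are no longer referenced and can be deleted.
    deleteFiles(candidates);
}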

Expiring Snapshot

We already have a mechanism to find deletion candidates when expiring snapshots; see `SnapshotDeletion#doDeleteExpiredDataFiles`. Assuming we have obtained the deletion candidates via the existing method, let's discuss how to filter out the data files that are still used by tags.

We need 3 parameters to determine whether a data file should be removed from candidates:

  1. The snapshot id at which the data file is deleted (`deleteId`). This id can be obtained while we iterate over the expiring snapshots.
  2. The snapshot id at which the data file is committed (`commitId`). To get this id, we should record it in `DataFileMeta`.
  3. The list of tagged snapshot IDs (`taggedSnapshots`). This can be obtained from the tag files in storage.

The core idea is that if a data file is both committed and deleted between two adjacent tagged snapshots (i.e., its lifetime does not span any tag), it can be deleted. For example, assume `taggedSnapshots` is `[100, 200, 300]`. If a data file `A` is committed at snapshot 105 and deleted at snapshot 120, `A` is not saved by any tag: when we expire snapshot 119, `A` will be put into the candidates, no tag uses it, and `A` will finally be deleted. If instead `A` is deleted at snapshot 201, `A` is saved by the second tag: when we expire snapshot 200, `A` will be put into the candidates, but it will not be deleted because the second tag still uses it. This check can be written as a small predicate, as sketched below.
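
A sketch of that predicate (not the final implementation):

Code Block
languagejava
// A data file can be deleted iff no tagged snapshot falls into [commitId, deleteId),
// i.e. its lifetime does not span any tag. Sketch only.
static boolean canBeDeleted(long commitId, long deleteId, List<Long> taggedSnapshots) {
    for (long tagged : taggedSnapshots) {
        if (commitId <= tagged && tagged < deleteId) {
            // Some tag still references this file; keep it.
            return false;
        }
    }
    return true;
}

// Example from the text: taggedSnapshots = [100, 200, 300]
// canBeDeleted(105, 120, tags) -> true  (not saved by any tag)
// canBeDeleted(105, 201, tags) -> false (saved by the tag at snapshot 200)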

Future Work

Periodic Creation & Expiration

We can support periodically creating and expiring tags by providing these options:


Table options

  • tag.create-time (Required: false, Type: String): The time point to create tags periodically. Must be in the format 'hh:mm'.
  • tag.create-interval (Required: false, Type: Duration): Interval between two tags. At least 1d and must be an integer number of days.

With these options, Paimon can periodically create tags according to the user's configuration, as in the example below.
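
For example, with Flink SQL's `ALTER TABLE ... SET` syntax the proposed options could be configured like this (table name and values are illustrative):

Code Block
languagesql
-- Illustrative: create a tag every day at 02:00.
ALTER TABLE my_table SET (
    'tag.create-time' = '02:00',
    'tag.create-interval' = '1 d'
);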

...

Tag Management Call Procedures 

  1. Spark supports extensions to extend SQL syntax. We can provide tag management procedures after we support the CALL statement for Spark.
  2. Supporting CALL is on the roadmap of Flink. We can also provide tag management procedures for Flink when possible. An illustrative sketch of such a procedure call is shown below.
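
Purely as an illustration of the intended user experience; the procedure names and signatures below are hypothetical and not part of this proposal:

Code Block
languagesql
-- Hypothetical procedure calls; names and signatures are not part of this proposal.
CALL sys.create_tag('default.my_table', 'my-tag');
CALL sys.delete_tag('default.my_table', 'my-tag');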

Compatibility, Deprecation, and Migration Plan

...

Rejected Alternatives

Use name `Savepoint`

Savepoint is an existing concept in Flink and in databases. We think using this word may confuse users. The concept of a tag is used in Iceberg to reference a snapshot. We think our scenario is similar, so we adopt the name `tag`.

Support starting from a tag in streaming reading

...