...

  1. Fault recovery (or we can say disaster recovery). Users can roll back to a tag if needed. If a user rolls back to a tag, the table will keep the data in the tag, and the data committed after the tag will be deleted. In this scenario the tag acts like a backup, but more lightweight, because the tag only records the metadata of data files instead of copying them.

  2. Record versions of data at a longer interval (typically daily or weekly). With tags, users can query the old data in batch mode. In this scenario the tag acts like a long-lived snapshot.
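The rollback behavior in point 1 can be sketched as follows. `TagRollback` and its method are hypothetical, for illustration only, and not Paimon's actual API; snapshots are represented simply by their ids.

```java
import java.util.List;
import java.util.stream.Collectors;

// Illustrative sketch (not Paimon's real API): rolling back to a tag keeps
// only the snapshots up to and including the tagged snapshot id; snapshots
// committed after the tag are deleted.
public class TagRollback {
    public static List<Long> rollback(List<Long> snapshotIds, long taggedSnapshotId) {
        return snapshotIds.stream()
                .filter(id -> id <= taggedSnapshotId)
                .collect(Collectors.toList());
    }
}
```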

Proposed Changes

We will introduce new usage for users, plus internal changes to the code and file storage, to support tags.

Usage with Tag

Currently, we support time traveling to a snapshot. Now we propose to support traveling to a tag. 

There are two ways for time travel:

  1. Specifying a version
    Currently, Paimon supports using the snapshot id as the target version of time travel. For example, in Spark, we can travel to snapshot 1 by the SQL `SELECT * FROM t VERSION AS OF 1`. We propose to also support using the tag name as the version string.
  2. Specifying a time point
    Currently, Paimon supports traveling to a time point: the Paimon scanner finds the snapshot whose commit time is closest to the given time point. We propose that, if all of the snapshots before the time point have expired, the scanner should try to find the closest tag instead.

In addition, we propose to expose tag creation and deletion operations to users:

  1. Creating a tag: let the user create a tag based on the current snapshot and give it a name.
  2. Deleting a tag: let the user delete a tag by its name.

Storage

Like snapshots, a new directory `/tag` will be created under the table directory for storing tags. The qualified path for a tag file is `/path/to/table/tag/tag-<id>`, where the id increases from 1.
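The layout above can be sketched as simple path arithmetic. `TagPaths` is a hypothetical helper for illustration, not part of Paimon:

```java
// Illustrative sketch of the storage layout described above: tag files live
// under "<table>/tag" and are named "tag-<id>", with ids starting from 1.
public class TagPaths {
    public static String tagDirectory(String tablePath) {
        return tablePath + "/tag";
    }

    public static String tagPath(String tablePath, long tagId) {
        return tagDirectory(tablePath) + "/tag-" + tagId;
    }
}
```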

...

Code Block
languagejava
firstline1
titleTag
linenumberstrue
public class Tag {
    /** Used to identify a tag. */
    private final long id;

    /** Name of the tag. */
    private final String name;

    /** Used to identify which snapshot this tag marks. */
    private final long taggedSnapshotId;

    /** Id of the current table schema of the records in this tag. */
    private final long schemaId;

    /** The manifest list recording all data of this tag. */
    private final String fullManifestList;

    /** How many records are in this tag. */
    private final long recordCount;

    /** Getters. */

    /** Some util methods. */

    /** Return all {@link ManifestFileMeta} instances for full data manifests in this tag. */
    public List<ManifestFileMeta> dataManifests(ManifestList manifestList);

    /** Serialize to json. */
    public String toJson();

    /** Deserialize from json. */
    public static Tag fromJson(String json);

    /** Get a tag from path. */
    public static Tag fromPath(FileIO fileIO, Path path);
}
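As a rough illustration of the `toJson`/`fromJson` pair above, here is a minimal round-trip sketch using plain string formatting. `TagJson` and its field handling are assumptions for illustration only; a real implementation would use a JSON library such as Jackson.

```java
// Illustrative sketch of Tag's JSON round trip using plain string handling;
// a real implementation would use a proper JSON library.
public class TagJson {
    public static String toJson(long id, String name, long taggedSnapshotId,
                                long schemaId, long recordCount) {
        return String.format(
            "{\"id\":%d,\"name\":\"%s\",\"taggedSnapshotId\":%d,\"schemaId\":%d,\"recordCount\":%d}",
            id, name, taggedSnapshotId, schemaId, recordCount);
    }

    public static long idFromJson(String json) {
        // Naive field extraction, enough for this sketch.
        int start = json.indexOf("\"id\":") + 5;
        int end = json.indexOf(',', start);
        return Long.parseLong(json.substring(start, end));
    }
}
```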

...

Code Block
languagejava
firstline1
titleTagManager
linenumberstrue
public class TagManager {
    /** Return the root directory of tags. */
    public Path tagDirectory();

    /** Return the path of a tag. */
    public Path tagPath(long tagId);

    /** Return the tag id by name. */
    public @Nullable Long findIdByName(String tagName);

    /** Get a tag instance by id. */
    public Tag tag(long tagId);

    /** Check if a tag exists. */
    public boolean tagExists(long tagId);

    /** Return id of the earliest tag. */
    public @Nullable Long earliestTagId();

    /** Return id of the latest tag. */
    public @Nullable Long latestTagId();
}
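The `earliestTagId`/`latestTagId` methods could, for example, be derived from the `tag-<id>` file names under the tag directory. `TagIds` below is a hypothetical sketch of that idea, operating on a plain list of file names rather than a real file system listing:

```java
import java.util.List;
import java.util.OptionalLong;

// Illustrative sketch: derive earliest/latest tag ids from the "tag-<id>"
// file names found under the tag directory.
public class TagIds {
    private static java.util.stream.LongStream ids(List<String> tagFileNames) {
        return tagFileNames.stream()
                .filter(n -> n.startsWith("tag-"))
                .mapToLong(n -> Long.parseLong(n.substring("tag-".length())));
    }

    public static OptionalLong earliestTagId(List<String> tagFileNames) {
        return ids(tagFileNames).min();
    }

    public static OptionalLong latestTagId(List<String> tagFileNames) {
        return ids(tagFileNames).max();
    }
}
```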

...

We need a `TagsTable`, which provides the information of tags as the system table `<table>$tags`.

The schema of TagsTable is in section `Public Interfaces`.

Time Travel to Tags Support

Public Interfaces

SQL Syntax of Time Travel

...

(only for batch read)

Spark

SELECT

Currently, we support time traveling to a snapshot. Now we propose to support traveling to a tag.

For Spark, we can use the SQL `SELECT * FROM t VERSION AS OF <version-string>`. Currently, the version string only supports a long value, which represents a snapshot id. We propose to support `snapshot.<id>` to specify traveling to a snapshot, or `tag.<id>` to specify traveling to a tag. The old style `<id>` will still be treated as a snapshot id.

For Flink, we can use the dynamic option `scan.snapshot-id=<snapshot-id>`. We propose to introduce a new core option `scan.tag-id` to support traveling to a tag.

Time Travel to Timestamp with Tag

Currently, we support time traveling to a time point: the Paimon scanner finds the snapshot whose commit time is closest to the specified time. The problem is that all of the snapshots before the time point may have expired. So we propose to try to find the closest tag in this scenario.
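The proposed fallback can be sketched as follows. `TimeTravel`, its method names, and the millisecond commit-time lists are assumptions for illustration; the returned value is the index of the chosen snapshot or tag.

```java
import java.util.List;
import java.util.OptionalLong;

// Illustrative sketch of the proposed fallback: pick the snapshot whose commit
// time is closest to (and not after) the query time; if every such snapshot
// has expired, fall back to the closest tag instead.
public class TimeTravel {
    /** Each entry is a commit time in milliseconds; returns the chosen index. */
    public static OptionalLong closestBefore(List<Long> commitTimes, long timePoint) {
        OptionalLong best = OptionalLong.empty();
        for (int i = 0; i < commitTimes.size(); i++) {
            long t = commitTimes.get(i);
            if (t <= timePoint
                    && (!best.isPresent() || t > commitTimes.get((int) best.getAsLong()))) {
                best = OptionalLong.of(i);
            }
        }
        return best;
    }

    public static OptionalLong travelTo(
            List<Long> snapshotTimes, List<Long> tagTimes, long timePoint) {
        OptionalLong snapshot = closestBefore(snapshotTimes, timePoint);
        // If all snapshots before the time point have expired, try the tags.
        return snapshot.isPresent() ? snapshot : closestBefore(tagTimes, timePoint);
    }
}
```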

Public Interfaces

Table Options 

...

tag.create-time

...

name.<name>

SELECT * FROM t VERSION AS OF tag-id.<id>

SELECT * FROM t /*+ OPTIONS('scan.tag-name'='<name>') */

SELECT * FROM t /*+ OPTIONS('scan.tag-id'='<id>') */

Flink Actions

We propose to provide two Flink actions for users:

SQL Syntax

Flink Actions

Currently, we can provide two ways for users to control the creation and deletion of tags.

...

action | argument | note
create-tag | --name <tag-name>: specify the name of the tag. | create a tag based on the current snapshot
delete-tag | --name <tag-name>: specify the name of the tag. | delete the specified tag

`create-tag` creates a tag at the current time point; `delete-tag --name <tag-name>` deletes the specified tag.

In the future, after Flink supports stored procedures, we can provide procedures for managing tags.

System Table

We propose to introduce a system table `TagsTable`. The schema is:

...

  1. When creating a tag, we will trigger a full compaction and pick the compacted snapshot to save.
  2. When expiring snapshots, Paimon checks whether the data files are still used by living snapshots; if not, they are deleted. After we introduce tags, we should also check whether the data files are used by tags.
  3. When deleting a tag, we will check for and delete unused data files (as we do when expiring snapshots).
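The retention check in point 2 can be sketched with plain sets. `FileExpiration` is a hypothetical illustration, not Paimon's actual expiration code:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative sketch of the expiration rule above: a data file may be deleted
// only if it is referenced by neither a living snapshot nor a tag.
public class FileExpiration {
    public static Set<String> deletableFiles(
            Set<String> candidates,
            List<Set<String>> livingSnapshotFiles,
            List<Set<String>> tagFiles) {
        Set<String> deletable = new HashSet<>(candidates);
        livingSnapshotFiles.forEach(deletable::removeAll);
        tagFiles.forEach(deletable::removeAll);
        return deletable;
    }
}
```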

Future Work

Table options

key | Required | Type | note
tag.create-time | false | String | The time point at which to create tags periodically. Must be in the format 'hh:mm'.
tag.create-interval | false | Duration | Interval between two tags. At least 1d and must be an integer number of days.

We can periodically create tags according to the user configuration. A new core option will be introduced: `tag.create-interval` (Duration type).
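The "at least 1d and must be an integer number of days" constraint on the create-interval option could be validated as sketched below; `TagIntervalOption` is a hypothetical helper for illustration.

```java
import java.time.Duration;

// Illustrative sketch validating the proposed create-interval table option:
// the interval must be at least one day and a whole number of days.
public class TagIntervalOption {
    public static boolean isValidCreateInterval(Duration interval) {
        return interval.compareTo(Duration.ofDays(1)) >= 0
                && interval.toMillis() % Duration.ofDays(1).toMillis() == 0;
    }
}
```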

Call Procedures (Future work)

...