
Motivation

In a data warehouse, it is a common requirement to query historical data, so we need to periodically merge incremental data with historical data to generate the complete data of the current time. For example, Hive 2.2 supports using the MERGE INTO statement to merge a source table's records into a target table by performing update, insert and delete according to different filters. A good approach is therefore to take the historical table as the target and the incremental table as the source, then use MERGE INTO to perform updates (for existing records) and inserts (for non-existing records). But when there is a large amount of data, the merging may cost much time.

...

But in most scenarios a table will generate too many snapshots, so Paimon lets users configure a snapshot expiration time to clean up old, unused snapshots. Thus the snapshot of a specified time point may have already expired when a user wants to query it. So we want to introduce a mechanism for Paimon to persist the snapshot of a specified time point.

...

To solve this problem, we propose to introduce a new mechanism `Tag`. A tag is created from a snapshot and can be kept for longer. The purposes of tags are:

  1. Fault recovery (or disaster recovery). Users can roll back to a tag if needed. If a user rolls back to a tag, the table will hold the data of the tag, and the data committed after the tag will be deleted. In this scenario the tag is like a backup, but more lightweight, because the tag only records the metadata of data files instead of copying them.

  2. Record versions of data at a longer interval (typically daily or weekly). With tags, users can query the old data in batch mode. In this scenario the tag is like a long-lived snapshot.

Proposed Changes

We will introduce new usage for users, along with internal changes to code and file storage, to support tags.

Usage with Tag

Currently, we support time traveling to a snapshot. Now we propose to support traveling to a tag. 

There are two ways for time travel:

  1. Specifying a version
    Currently, Paimon supports using the snapshot id as the target version of time travel. For example, in Spark, we can travel to snapshot 1 by SQL `SELECT * FROM t VERSION AS OF 1`. We propose to also support using `tag-name` as the version string.
  2. Specifying a time point
    Currently, Paimon supports traveling to a time point: the Paimon Scanner finds the snapshot whose commit time is closest to the given time point. We propose that if all snapshots before the time point have expired, Paimon tries to find the closest tag instead.

Additionally, we propose to expose tag creation and deletion operations to users:

  1. Creating tag: let the user create a tag based on the current snapshot and give it a name.
  2. Deleting tag: let the user delete a tag by its name.

Storage

Like snapshot, a new directory `/tag` will be created under the table directory for storing tags. The qualified path for a tag file is `/path/to/table/tag/tag-<tag-name>`, and the tag name is specified by the user.

New Classes

It's not necessary to introduce a new `Tag` class because a tag is very similar to a snapshot; we can just reuse `Snapshot`. When we create a tag from a snapshot, we copy the corresponding snapshot file to the tag directory under the tag name; when we read a tag, we deserialize the tag file back to a `Snapshot`.
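
To make this concrete, here is a minimal sketch of tag creation and reading under that design. The class itself and the `writeFileUtf8`/`readFileUtf8` helpers are illustrative assumptions, not the final API:

```java
import java.io.IOException;

// Paimon types referenced by this document (FileIO, Path, Snapshot) are assumed
// to be on the classpath; the helper method names below are illustrative.
public class TagStorageSketch {
	private final FileIO fileIO;
	private final Path tagDirectory;

	public TagStorageSketch(FileIO fileIO, Path tagDirectory) {
		this.fileIO = fileIO;
		this.tagDirectory = tagDirectory;
	}

	/** Creating a tag is just copying the snapshot file under the tag name. */
	public void commitTag(Snapshot snapshot, String tagName) throws IOException {
		Path tagPath = new Path(tagDirectory, "tag-" + tagName);
		fileIO.writeFileUtf8(tagPath, snapshot.toJson()); // assumed write helper
	}

	/** Reading a tag deserializes the tag file back into a Snapshot. */
	public Snapshot readTag(String tagName) throws IOException {
		Path tagPath = new Path(tagDirectory, "tag-" + tagName);
		return Snapshot.fromJson(fileIO.readFileUtf8(tagPath)); // assumed read helper
	}
}
```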

We need a `TagManager` to manage the tags (similar to `SnapshotManager`):

```java
public class TagManager {

	/** Return the root directory of tags. */
	public Path tagDirectory();

	/** Return the path of a tag. */
	public Path tagPath(String tagName);

	/** Create a tag from the given snapshot and save it in the storage. */
	public void commitTag(Snapshot snapshot, String tagName);

	/** Expire a tag and clean unused files in the storage. */
	public void expireTag(String tagName);

	/** Check if a tag exists. */
	public boolean tagExists(String tagName);

	/** Get the tagged snapshot. */
	public Snapshot snapshot(String tagName);

	/** Get all tagged snapshots in an iterator. */
	public Iterator<Snapshot> taggedSnapshots();

	/** Get the previous tag, of which the commit time is earlier. */
	public @Nullable String previous(String tagName);

	/** Get the next tag, of which the commit time is later. */
	public @Nullable String next(String tagName);
}
```


We need a `TagsTable`, which can provide information of tags as the system table `<table>$tags`.


The schema of TagsTable is in section `Public Interfaces`.

DataFileMeta Modification and Compatibility

For the convenience of deleting unused data files when expiring snapshots (see `Data Files Handling → Expiring Snapshot`), we propose to add a new field `long commitSnapshot` to `DataFileMeta`.

Compatibility

DataFileMeta Ser/De: we will upgrade `ManifestEntrySerializer` to version 3. In version 3, if the serializer receives a version 2 `InternalRow`, the `commitSnapshot` will be set to -1.

Expiring snapshots: if we find that `commitSnapshot` is -1, we fall back to the trivial method (walking through all data files of all tags to check whether the data file is used or not).
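
A minimal sketch of this fallback rule (the field position and method are our illustration, not the actual `ManifestEntrySerializer` code):

```java
/** Version-2 rows predate the commitSnapshot field, so it defaults to -1. */
static long readCommitSnapshot(int rowVersion, InternalRow row, int commitSnapshotPos) {
	return rowVersion >= 3 ? row.getLong(commitSnapshotPos) : -1L;
}
```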

Public Interfaces

SQL Syntax of Time Travel (only for batch read)

Spark

SELECT * FROM t VERSION AS OF tag-name

Note: the tag name cannot be a numeric string, to distinguish it from a snapshot id. This limitation will be checked when creating a tag and documented.

SELECT * FROM t /*+ OPTIONS('scan.tag-name'='<name>') */

Flink Actions

We propose to provide two Flink actions for users:




| action | argument | note |
| --- | --- | --- |
| create-tag | `--name <tag-name>`: specify the name of the tag. | Create a tag based on the latest snapshot. |
| delete-tag | `--name <tag-name>`: specify which tag will be deleted. | Delete a tag. |

...


System Table

We propose to introduce a system table `$tags`. The schema is:

```sql
tag_name STRING,
tagged_snapshot_id BIGINT,
schema_id BIGINT,
commit_time BIGINT,
record_count BIGINT
```
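
For example, users could then inspect the tags of a table `t` with an ordinary query (a usage sketch following Paimon's existing system-table convention):

```sql
SELECT tag_name, tagged_snapshot_id, commit_time FROM t$tags;
```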


Data Files Handling

When we create a tag, Paimon stores the metadata of the data files used by the tag instead of copying the data files. So it's important to handle data files carefully when we make changes to snapshots or tags.

Creating Tag

When creating a tag, the tagged snapshot file will be copied to the tag directory; it contains the manifest list pointing to the data files.

Deleting Tag

When we delete a tag, all data files used by this tag are deletion candidates. How do we determine whether a data file can be deleted? We should take both snapshots and tags into consideration.

For snapshots, we consider two scenarios:

  1. Earliest snapshotId <= taggedSnapshotId: the snapshots in [earliest, taggedSnapshotId] may still use data files in the deletion candidates. So we should check:
    Full data files of the earliest snapshot should be removed from candidates;
    Delta data files of snapshots in (earliest, taggedSnapshot] should be removed from candidates because they may still be consumed by streaming reads.
  2. Earliest snapshotId > taggedSnapshotId: since every snapshot contains data files based on the previous snapshot, we only need to check the full data files of the earliest snapshot and remove them from candidates.

For tags, because each tag contains data files based on the previous tag (just like snapshots), we only need to check the full data files of the neighboring tags, if they exist, and remove them from candidates.

After removing the used data files, we can delete the remaining candidates, as sketched below.
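
Here is a minimal sketch of that filtering, assuming the candidate and reference sets have already been computed as sets of file paths (all names are hypothetical, for illustration only):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Sketch of deletion-candidate filtering for "delete tag" (scenario 1:
 *  earliest snapshotId <= taggedSnapshotId). Inputs are precomputed sets
 *  of file paths; this is an illustration, not the real implementation. */
class TagDeletionSketch {
	static Set<String> filesToDelete(
			Set<String> candidates,              // all data files used by the deleted tag
			Set<String> earliestFullFiles,       // full data files of the earliest snapshot
			List<Set<String>> deltaFilesInRange, // delta files of snapshots in (earliest, tagged]
			Set<String> neighborTagFullFiles) {  // full data files of previous/next tags
		Set<String> result = new HashSet<>(candidates);
		result.removeAll(earliestFullFiles);           // may still be used by live snapshots
		deltaFilesInRange.forEach(result::removeAll);  // may still be consumed by streaming reads
		result.removeAll(neighborTagFullFiles);        // still referenced by neighboring tags
		return result;                                 // what remains is safe to delete
	}
}
```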

Expiring Snapshot

We already have a mechanism to find deletion candidates when expiring snapshots; see `SnapshotDeletion#doDeleteExpiredDataFiles`. Assuming we have obtained the deletion candidates via the existing method, let's discuss how to filter out the data files used by tags.

We need 3 parameters to determine whether a data file should be removed from candidates:

  1. The snapshot id at which the data file is deleted (`deleteId`). This id can be obtained while iterating over the expiring snapshots.
  2. The snapshot id at which the data file is committed (`commitId`). To get this id, we record it in `DataFileMeta` (see section `Proposed Changes → DataFileMeta Modification and Compatibility`).
  3. The list of tagged snapshot ids (`taggedSnapshots`). This can be read from the tag files in storage.

The core idea is that if a data file is committed and deleted within the interval between two adjacent tagged snapshots, it can be deleted. For example, assume `taggedSnapshots` is `[100, 200, 300]`. If a data file `A` is committed at snapshot 105 and deleted at snapshot 120, no tag saves `A`: when we expire snapshot 119, `A` is put into the candidates, and since no tag uses it, `A` is finally deleted. If instead `A` is deleted at snapshot 201, `A` is saved by the second tag: when we expire snapshot 200, `A` is put into the candidates, but it is not deleted because the second tag still uses it.
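
This rule can be expressed compactly. Below is a minimal sketch (the helper is ours, for illustration); note that if `commitId` is -1 (see the compatibility section above), the caller must fall back to the trivial check instead:

```java
/**
 * Sketch of the tag check during snapshot expiration: a candidate data file
 * may be deleted only if no tagged snapshot falls inside its lifetime
 * [commitId, deleteId). taggedSnapshots holds the sorted tagged snapshot ids.
 */
static boolean safeToDelete(long commitId, long deleteId, long[] taggedSnapshots) {
	for (long tagged : taggedSnapshots) {
		// the file existed at this tagged snapshot, so the tag still uses it
		if (commitId <= tagged && tagged < deleteId) {
			return false;
		}
	}
	// e.g. commit 105, delete 120 with tags [100, 200, 300] -> deletable
	return true;
}
```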

Future Work

Periodically Creation & Expiration

We can support periodically creating and expiring tags by providing these options:

| key | Required | Type | note |
| --- | --- | --- | --- |
| tag.create-time | false | String | The time point to create tags periodically. Must be in the format 'hh:mm'. |
| tag.create-interval | false | Duration | Interval between two tags. At least 1d and must be an integer number of days. |

Tag Management Call Procedures 

  1. Spark supports extensions to extend SQL syntax. We can provide tag management procedures after we support the CALL statement for Spark.
  2. Supporting CALL is on the roadmap of Flink. We can also provide tag management procedures for Flink when possible.

Compatibility, Deprecation, and Migration Plan

None.

Test Plan

UT tests: verify creating and reading tags.

IT tests: verify tag-related logic, including:

  1. time travel to tag
  2. expiration of snapshots won't delete data files used by tags
  3. deleting tags can delete unused data files correctly

Compatibility tests:

  1. version 3 ManifestEntry can read old style Paimon table
  2. create tag in old style Paimon table and test expiring snapshots

Rejected Alternatives

Use name `Savepoint`

Savepoint is an existing concept in Flink and databases, and we think using this word may confuse users. The concept of a tag is used in Iceberg to reference a snapshot. We think our scenario is similar, so we adopt the name `tag`.

Support starting from a tag in streaming reading

The current design of tag only stores full data manifests, so it is not able to support streaming reading for now.

...

Record Data file's Commit Snapshot in ManifestEntry

We choose to record it in `DataFileMeta` instead, because every time Paimon generates a snapshot, it creates new manifest entries for the data files. Assume we recorded it in `ManifestEntry` and consider this scenario: we commit data file A in snapshot #1 and get manifest entry Entry#1 as [ADD, A, committed at #1]. Then we commit -A in snapshot #2 and get manifest entry Entry#2 as [DELETE, A, ?], which cannot know the commit snapshot of A. So we have to record the commit snapshot in the data file meta directly.