...

Motivation

In a data warehouse, querying historical data is a common requirement, so we need to periodically merge incremental data with historical data to produce the complete data as of the current time. For example, Hive 2.2 supports the MERGE INTO statement, which merges a source table's records into a target table by performing updates, inserts and deletes according to different filters. A good approach is therefore to take the historical table as the target and the incremental table as the source, then use MERGE INTO to update existing records and insert new ones. However, when the data volume is large, the merge can take a long time.

...

  1. Fault recovery (or disaster recovery). Users can ROLL BACK to a tag if needed. If a user rolls back to a tag, the table will hold the data in the tag, and the data committed after the tag will be deleted. In this scenario the tag is like a backup, but more lightweight, because the tag only records the metadata of data files instead of copying them.

  2. Recording versions of data at a longer interval (typically daily or weekly). With a tag, users can query the old data in batch mode. In this scenario the tag is like a long-lived snapshot.

Proposed Changes

We will introduce new usage for users, together with internal changes to the code and file storage, to support tags.

Usage with Tag

Currently, we support time traveling to a snapshot. Now we propose to support traveling to a tag. 

There are two ways for time travel:

  1. Specifying a version
    Currently, Paimon supports using the snapshot id as the target version of time travel. For example, in Spark, we can travel to snapshot 1 with the SQL `SELECT * FROM t VERSION AS OF 1`. We propose to support using `tag-name` as the version string.
  2. Specifying a time point
    Currently, Paimon supports traveling to a time point. The Paimon scanner finds the snapshot whose commit time is closest to the given time point. We propose that, if all of the snapshots before the time point have expired, the scanner tries to find the closest tag instead.
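
The fallback for time-point travel can be sketched as follows. This is a minimal illustration, not Paimon code: the class `TimeTravelSketch` and its methods are hypothetical, and "closest" is assumed here to mean the latest commit at or before the requested time.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

/** Hypothetical helper for illustration only; not part of Paimon's API. */
final class TimeTravelSketch {
    /** Commit time (epoch millis) -> snapshot id, for snapshots that have not expired. */
    private final TreeMap<Long, Long> liveSnapshots = new TreeMap<>();
    /** Commit time (epoch millis) of the tagged snapshot -> tag name. */
    private final TreeMap<Long, String> tags = new TreeMap<>();

    void addSnapshot(long commitTimeMillis, long snapshotId) {
        liveSnapshots.put(commitTimeMillis, snapshotId);
    }

    void addTag(long commitTimeMillis, String tagName) {
        tags.put(commitTimeMillis, tagName);
    }

    /** Prefer a live snapshot at or before the time point; otherwise fall back to a tag. */
    Optional<String> resolve(long timeMillis) {
        Map.Entry<Long, Long> snapshot = liveSnapshots.floorEntry(timeMillis);
        if (snapshot != null) {
            return Optional.of("snapshot-" + snapshot.getValue());
        }
        // Assumption: all snapshots before the time point have expired.
        Map.Entry<Long, String> tag = tags.floorEntry(timeMillis);
        if (tag != null) {
            return Optional.of("tag-" + tag.getValue());
        }
        return Optional.empty();
    }
}
```

With a live snapshot committed at time 1000 and a tag committed at time 500, a request for time 700 resolves to the tag, because no live snapshot exists at or before that time.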

In addition, we propose to expose tag creation and deletion to users:

  1. Creating tag: let the user create a tag based on the current snapshot and name the tag.
  2. Deleting tag: let the user delete a tag by its name.

Storage

Like snapshots, tags are stored in a new directory `/tag` created under the table directory. The qualified path for a tag file is `/path/to/table/tag/tag-<tag-name>`, where the tag name is specified by the user.

New Classes

It's not necessary to introduce a new `Tag` class: because a tag is very similar to a snapshot, we can reuse `Snapshot`. When we create a tag from a snapshot, we copy the corresponding snapshot file to the tag directory under the tag name; when we read a tag, we deserialize the tag file to a snapshot.
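
The copy-based approach can be sketched as follows. This is only an illustration under stated assumptions: `TagFileSketch` is a hypothetical class, `java.nio.file` stands in for Paimon's own file abstraction, and the snapshot file layout `snapshot/snapshot-<id>` is assumed rather than taken from this proposal.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical sketch: a tag is a copy of a snapshot file stored under the tag name. */
final class TagFileSketch {
    private final Path tableDir;

    TagFileSketch(Path tableDir) {
        this.tableDir = tableDir;
    }

    /** Assumed snapshot layout: <table>/snapshot/snapshot-<id>. */
    Path snapshotPath(long snapshotId) {
        return tableDir.resolve("snapshot").resolve("snapshot-" + snapshotId);
    }

    /** Tag layout from the proposal: <table>/tag/tag-<tag-name>. */
    Path tagPath(String tagName) {
        return tableDir.resolve("tag").resolve("tag-" + tagName);
    }

    /** Copy the snapshot file into the tag directory; fails if the tag already exists. */
    void createTag(long snapshotId, String tagName) throws IOException {
        Path target = tagPath(tagName);
        Files.createDirectories(target.getParent());
        Files.copy(snapshotPath(snapshotId), target);
    }

    /** Read the tag file back; its content is the serialized snapshot. */
    String readTag(String tagName) throws IOException {
        return new String(Files.readAllBytes(tagPath(tagName)), StandardCharsets.UTF_8);
    }
}
```

Because only the small snapshot metadata file is copied, creating a tag costs far less than copying the data files it references.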

We need a `TagManager` to manage the tags (similar to `SnapshotManager`). The outline below lists its methods; bodies are omitted:

    public class TagManager {
        /** Return the root directory of tags. */
        public Path tagDirectory();

        /** Return the path of a tag. */
        public Path tagPath(String tagName);

        /** Create a tag from the given snapshot and save it in the storage. */
        public void commitTag(Snapshot snapshot, String tagName);

        /** Expire a tag and clean unused files in the storage. */
        public void expireTag(String tagName);

        /** Check if a tag exists. */
        public boolean tagExists(String tagName);

        /** Get the tagged snapshot. */
        public Snapshot snapshot(String tagName);

        /** Get all tagged snapshots in an iterator. */
        public Iterator<Snapshot> taggedSnapshots();

        /** Get the previous tag, whose commit time is earlier. */
        public @Nullable String previous(String tagName);

        /** Get the next tag, whose commit time is later. */
        public @Nullable String next(String tagName);
    }


We need a `TagsTable`, which provides information about tags as the system table `<table>$tags`.

The schema of TagsTable is in section `Public Interfaces`.

DataFileMeta Modification and Compatibility

To make it easier to delete unused data files when expiring snapshots (see `Data Files Handling → Expiring Snapshot`), we propose to add a new field `long commitSnapshot` to `DataFileMeta`.

Compatibility

DataFileMeta Ser/De: We will upgrade `ManifestEntrySerializer` to version 3. In version 3, if the `ManifestEntrySerializer` receives a version 2 `InternalRow`, `commitSnapshot` will be set to -1.

Expiring snapshots: If we find that `commitSnapshot` is -1, we fall back to the trivial method (walk through all data files of all tags to check whether the data file is used or not).

Public Interfaces

SQL Syntax of Time Travel (only for batch read)

Spark

For Spark, we can use the SQL `SELECT * FROM t VERSION AS OF <version-string>`. Currently, the version string only supports a long value, which represents a snapshot id. We propose to also accept a tag name as the version string: `SELECT * FROM t VERSION AS OF <tag-name>`.

Note: The tag name cannot be a numeric string, so that it can be distinguished from a snapshot id. This limitation will be checked when creating a tag and will be documented.

Flink

For Flink, we can use the dynamic option `scan.snapshot-id=<snapshot-id>` to travel to a snapshot. We propose to introduce a new core option `scan.tag-name` to travel to a tag:

    SELECT * FROM t /*+ OPTIONS('scan.tag-name'='<name>') */
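
The rule that a numeric version string denotes a snapshot id, while any other string is a tag name, can be sketched as below. The class `VersionStringSketch` and its methods are hypothetical names used for illustration; the proposal does not define them.

```java
/** Hypothetical sketch of how a version string could be told apart from a tag name. */
final class VersionStringSketch {
    /** A version string made only of ASCII digits is read as a snapshot id. */
    static boolean isSnapshotId(String version) {
        return !version.isEmpty() && version.chars().allMatch(c -> c >= '0' && c <= '9');
    }

    /** Check applied when creating a tag: numeric names would be ambiguous. */
    static void validateTagName(String tagName) {
        if (tagName.isEmpty() || isSnapshotId(tagName)) {
            throw new IllegalArgumentException(
                    "Tag name must be non-empty and must not be a numeric string: '" + tagName + "'");
        }
    }

    /** Describe the travel target chosen for a version string. */
    static String describeTarget(String version) {
        return isSnapshotId(version) ? "snapshot " + Long.parseLong(version) : "tag " + version;
    }
}
```

Rejecting numeric names at creation time keeps the old style `VERSION AS OF <id>` unambiguous for existing queries.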

Flink Actions

We propose to provide two Flink actions for users to control the creation and deletion of tags.

| action | argument | note |
| --- | --- | --- |
| create-tag | `--name <tag-name>`: specify the name of the tag. | Create a tag based on the latest snapshot. |
| delete-tag | `--name <tag-name>`: specify which tag will be deleted. | Delete a tag. |

System Table

We propose to introduce a system table `$tags`. The schema is:

    tag_name STRING,
    tagged_snapshot_id BIGINT,
    schema_id BIGINT,
    commit_time BIGINT,
    record_count BIGINT

Data Files Handling

When we create a tag, Paimon stores the metadata of the data files used by the tag instead of copying the data files. So it is important to handle the data files correctly whenever snapshots or tags change.

Creating Tag

When creating a tag, the tagged snapshot file is copied to the tag directory. The file contains the manifest list, which points to the data files.

Deleting Tag

When we delete a tag, all data files used by this tag are deletion candidates. To determine whether a data file can be deleted, we must take both snapshots and tags into consideration.

For snapshots, we consider 2 scenarios:

  1. Earliest snapshotId <= taggedSnapshotId: the snapshots in [earliest, taggedSnapshotId] may still use data files in the deletion candidates. So we should check:
    Full data files of the earliest snapshot should be removed from the candidates;
    Delta data files of snapshots in (earliest, tagged snapshot] should be removed from the candidates, because they may be read by streaming jobs.
  2. Earliest snapshotId > taggedSnapshotId: since every snapshot contains data files based on the previous snapshot, we only need to check the full data files of the earliest snapshot (remove them from the candidates).

For tags, because each tag contains data files based on the previous tag (just like snapshots), we only need to check the full data files of the neighboring tags, if they exist (remove them from the candidates).

After removing the data files that are still in use, we can delete the rest of the candidates.
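
The filtering steps above can be sketched with plain sets of file names. This is a simplified illustration: `TagDeletionSketch` and its parameters are hypothetical, and real code would read the file lists from manifests rather than receive them as arguments.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of filtering deletion candidates when a tag is deleted. */
final class TagDeletionSketch {
    static Set<String> deletableFiles(
            Set<String> tagFiles,                         // all data files used by the deleted tag
            long taggedSnapshotId,
            long earliestSnapshotId,
            Set<String> earliestSnapshotFullFiles,        // full data files of the earliest snapshot
            Map<Long, Set<String>> deltaFilesBySnapshot,  // delta data files per living snapshot
            List<Set<String>> neighborTagFullFiles) {     // full data files of previous/next tag, if any
        Set<String> candidates = new HashSet<>(tagFiles);

        // Both scenarios: files still used by the earliest snapshot must be kept.
        candidates.removeAll(earliestSnapshotFullFiles);

        // Scenario 1 only: keep delta files of snapshots in (earliest, tagged].
        if (earliestSnapshotId <= taggedSnapshotId) {
            for (Map.Entry<Long, Set<String>> entry : deltaFilesBySnapshot.entrySet()) {
                long id = entry.getKey();
                if (id > earliestSnapshotId && id <= taggedSnapshotId) {
                    candidates.removeAll(entry.getValue());
                }
            }
        }

        // Keep files still used by neighboring tags.
        for (Set<String> files : neighborTagFullFiles) {
            candidates.removeAll(files);
        }
        return candidates;
    }
}
```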

Expiring Snapshot

We already have a mechanism to find deletion candidates when expiring snapshots; please see `SnapshotDeletion#doDeleteExpiredDataFiles`. Assuming we have obtained the deletion candidates via the existing method, let's discuss how to filter out data files used by tags.

We need 3 parameters to determine whether a data file should be removed from the candidates:

  1. The snapshot id at which the data file is deleted (`deleteId`). This id can be obtained when we iterate over the expiring snapshots.
  2. The snapshot id at which the data file is committed (`commitId`). To get this id, we should record it in `DataFileMeta` (see section `Proposed Changes → DataFileMeta Modification and Compatibility`).
  3. The list of tagged snapshot ids (`taggedSnapshots`). This can be obtained from the tag files in storage.

The core idea is that a data file can be deleted if it was both committed and deleted between two adjacent tagged snapshots, so that no tag contains it. For example, assume `taggedSnapshots` is `[100, 200, 300]`. If a data file `A` is committed at snapshot 105 and deleted at snapshot 120, `A` is not saved by any tag: when we expire snapshot 119, `A` is put into the candidates, we find that no tag uses it, and `A` is finally deleted. If `A` is instead deleted at snapshot 201, `A` is saved by the second tag: when we expire snapshot 200, `A` is put into the candidates, but it is not deleted because the second tag still uses it.
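
The check described above can be sketched as an interval test. This is an illustrative sketch, not the actual implementation: `TagRetentionSketch` and `usedByTag` are hypothetical names, and the sketch assumes a data file is visible in snapshots from `commitId` up to, but not including, `deleteId`.

```java
import java.util.TreeSet;

/** Hypothetical sketch: is a deletion candidate still referenced by some tag? */
final class TagRetentionSketch {
    /** Marker used when commitSnapshot is unknown (data written before the new field existed). */
    static final long UNKNOWN_COMMIT = -1L;

    /**
     * Returns true if some tagged snapshot id t satisfies commitId <= t < deleteId,
     * meaning the file must be kept.
     */
    static boolean usedByTag(long commitId, long deleteId, TreeSet<Long> taggedSnapshots) {
        if (commitId == UNKNOWN_COMMIT) {
            // The caller should fall back to walking all data files of all tags.
            throw new IllegalArgumentException("commitId unknown; use the fallback check");
        }
        // Smallest tagged snapshot id that is >= commitId, or null if there is none.
        Long firstTag = taggedSnapshots.ceiling(commitId);
        return firstTag != null && firstTag < deleteId;
    }
}
```

For `taggedSnapshots = [100, 200, 300]`, a file committed at 105 and deleted at 120 is not used by any tag, while the same file deleted at 201 is still used by the tag at snapshot 200.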

Future Work

Periodic Creation & Expiration

We can support periodically creating and expiring tags by providing these options:

| key | Requires | Type | note |
| --- | --- | --- | --- |
| tag.create-time | false | String | The time point to create a tag periodically. Must be in the format 'hh:mm'. |
| tag.create-interval | false | Duration | Interval between two tags. At least 1d and must be an integer number of days. |

Flink Actions

Now, 

...

Tag Management Call Procedures 

  1. Spark supports extensions that extend the SQL syntax. We can provide tag management procedures after we support the CALL statement for Spark.
  2. Supporting CALL is on the road map of Flink. We can also provide tag management procedures for Flink when possible.

Compatibility, Deprecation, and Migration Plan

None.

Test Plan

UT tests: verify creating and reading tags

IT tests: verify tag related logic, including:

  1. time travel to a tag
  2. expiration of snapshots won't delete data files used by tags
  3. deleting tags deletes unused data files correctly

Compatibility tests:

  1. version 3 ManifestEntry can read old style Paimon table
  2. create tag in old style Paimon table and test expiring snapshots

Rejected Alternatives

Use name `Savepoint`

Savepoint is an existing concept in Flink and in databases, and we think using this word may confuse users. The concept of a tag is used in Iceberg, where it references a snapshot. We think our scenario is similar, so we adopt the name `tag`.

Support starting from a tag in streaming reading

The current design of tags only stores full data manifests, so it cannot support streaming reading for now.

...

Record Data File's Commit Snapshot in ManifestEntry

We choose to record it in DataFileMeta instead. This is because every time Paimon generates a snapshot, it creates new manifest entries for data files. Suppose we recorded it in ManifestEntry and consider this scenario: we commit data file A to snapshot #1 and get manifest entry Entry#1 as [ADD, A, commit at #1]. Then we commit -A to snapshot #2 and get manifest entry Entry#2 as [DELETE, A, ?], which cannot know the commit snapshot of A. So we have to record it in the data file meta directly.