...

Motivation

In a data warehouse, it is a common requirement to query historical data, so we need to periodically merge incremental data with historical data to generate the complete data of the current time. For example, Hive 2.2 supports using the MERGE INTO statement to merge a source table's records into a target table by performing update, insert, and delete according to different filters. So a good way is to take the historical table as the target and the incremental table as the source, then use MERGE INTO to perform updates (for existing records) and inserts (for non-existing records). But when there is a large amount of data, the merging may cost a lot of time.

...

  1. Fault recovery (or we can say disaster recovery). Users can roll back to a tag if needed. If a user rolls back to a tag, the table will hold the data in the tag, and the data committed after the tag will be deleted. In this scenario the tag is like a backup, but more lightweight, because the tag only records the metadata of the data files instead of copying them.

  2. Record versions of data at a longer interval (typically daily or weekly). With tags, users can query the old data in batch mode. In this scenario the tag is like a long-lived snapshot.

Proposed Changes

We will introduce new usage for users, as well as internal changes to the code and file storage, to support tags.

Usage with Tag

Currently, we support time traveling to a snapshot. Now we propose to support traveling to a tag.

There are two ways for time travel:

  1. Specifying a version
    Currently, Paimon supports using the snapshot id as the target version of time travel. For example, in Spark, we can travel to snapshot 1 with the SQL `SELECT * FROM t VERSION AS OF 1`. We propose to also support using a tag name as the version string (see the example below).
  2. Specifying a time point
    Currently, Paimon supports traveling to a time point: the Paimon Scanner finds the snapshot whose commit time is closest to the given time point. We propose that if all of the snapshots before the time point have expired, we try to find the closest tag instead.
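For example, assuming a tag named `2023-01-01` exists (the tag name is illustrative), a Spark user could travel to it with the tag name quoted as a string literal:

SELECT * FROM t VERSION AS OF '2023-01-01'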

In addition, we propose to expose tag creation and deletion operations to users:

  1. Creating tags: let the user create a tag based on the current snapshot and name the tag.
  2. Deleting tags: let the user delete a tag by its name.

Storage

Like snapshots, a new directory `/tag` will be created under the table directory for storing tags. The qualified path for a tag file is `/path/to/table/tag/tag-<name>`, where the tag name is specified by the user.
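For example, a table with two user-defined tags might have the following layout (the tag names are illustrative):

/path/to/table/snapshot/snapshot-7
/path/to/table/tag/tag-2023-01-01
/path/to/table/tag/tag-2023-01-08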

New Classes

It's not necessary to introduce a new `Tag` class, because a tag is very similar to a snapshot; we can just reuse `Snapshot`. When we create a tag from a snapshot, we copy the corresponding snapshot file to the tag directory under the tag name; when we read a tag, we deserialize the tag file into a snapshot.
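A minimal sketch of this copy-based approach follows. The UTF-8 read/write helpers and `Snapshot.fromPath` are assumptions for illustration and may differ from the exact Paimon API:

Code Block
languagejava
titleTagFileSketch
// Sketch only: helper names are assumptions, not necessarily the exact Paimon API.
public void commitTag(FileIO fileIO, Path snapshotPath, Path tagPath) throws IOException {
	// Creating a tag: copy the snapshot file into the tag directory.
	String json = fileIO.readFileUtf8(snapshotPath);
	fileIO.writeFileUtf8(tagPath, json);
}

public Snapshot readTag(FileIO fileIO, Path tagPath) {
	// Reading a tag: deserialize the tag file as a Snapshot.
	return Snapshot.fromPath(fileIO, tagPath);
}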

We need a `TagManager` to manage the tags (similar to `SnapshotManager`):

Code Block
languagejava
firstline1
titleTagManager
linenumberstrue
public class TagManager {
	/** Return the root directory of tags. */
	public Path tagDirectory();

	/** Return the path of a tag. */
	public Path tagPath(String tagName);

	/** Create a tag from the given snapshot and save it in the storage. */
	public void commitTag(Snapshot snapshot, String tagName);

	/** Expire a tag and clean unused files in the storage. */
	public void expireTag(String tagName);

	/** Check if a tag exists. */
	public boolean tagExists(String tagName);

	/** Get the tagged snapshot. */
	public Snapshot snapshot(String tagName);

	/** Get all tagged snapshots in an iterator. */
	public Iterator<Snapshot> taggedSnapshots();

	/** Get the previous tag, of which the commit time is earlier. */
	public @Nullable String previous(String tagName);

	/** Get the next tag, of which the commit time is later. */
	public @Nullable String next(String tagName);
}
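A hypothetical usage of this API (how the instances are obtained is an assumption of this sketch):

Code Block
languagejava
titleTagManagerUsage
// Illustrative only: how a caller might drive the proposed TagManager.
void tagLatestSnapshot(SnapshotManager snapshotManager, TagManager tagManager) {
	Snapshot latest = snapshotManager.latestSnapshot();  // snapshot to mark
	tagManager.commitTag(latest, "2023-01-01");          // create the tag
	Snapshot tagged = tagManager.snapshot("2023-01-01"); // read it back
}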


We need a `TagsTable`, which can provide the information of tags as the system table `<table>$tags`.

The schema of TagsTable is in section `Public Interfaces`.

Time Travel to Tags Support

Time Travel to Tag (Batch Reading)

Currently, we support time traveling to a snapshot. Now we can support traveling to a tag.

For Spark, we can use SQL `SELECT * FROM t VERSION AS OF <version-string>`. Currently, the version string only supports a long value which represents a snapshot id. We propose to also accept a tag name as the version string; a numeric version string will still be treated as a snapshot id.

For Flink, we can use the dynamic option `scan.snapshot-id=<snapshot-id>`. We propose to introduce a new core option `scan.tag-name` to support traveling to a tag.

Time Travel to Timestamp with Tag

Currently, we support time traveling to a time point: the Paimon Scanner finds the snapshot whose commit time is closest to the specified time. The problem is that all of the snapshots before the time point may have expired. So we propose to try to find the closest tag in this scenario.


DataFileMeta Modification and Compatibility

For the convenience of deleting unused data files when expiring snapshots (see `Data Files Handling → Expiring Snapshot`), we propose to add a new field `long commitSnapshot` to `DataFileMeta`.

Compatibility

DataFileMeta Ser/De: We will upgrade `ManifestEntrySerializer` to version 3. In version 3, if `ManifestEntrySerializer` receives a version 2 `InternalRow`, the `commitSnapshot` field will be set to -1.

Expiring snapshots: If we find that `commitSnapshot` is -1, we fall back to the trivial method (walking through all data files of all tags to check whether the data file is still used).
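A minimal sketch of this fallback logic, with a simplified stand-in for the real serializer (names and field handling are assumptions, not the actual Paimon code):

Code Block
languagejava
titleCommitSnapshotCompat
// Simplified stand-in for the version handling in ManifestEntrySerializer.
final class CommitSnapshotCompat {
	static final long UNKNOWN = -1L;

	/** Version 3 rows carry commitSnapshot; version 2 rows default to -1. */
	static long commitSnapshot(int rowVersion, long valueFromRow) {
		return rowVersion >= 3 ? valueFromRow : UNKNOWN;
	}

	/** Expiration falls back to the trivial full scan when the id is unknown. */
	static boolean needsTrivialTagScan(long commitSnapshot) {
		return commitSnapshot == UNKNOWN;
	}
}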

Public Interfaces

SQL Syntax of Time Travel (only for batch read)

Spark

SELECT * FROM t VERSION AS OF tag-name

Note: The tag name cannot be a numeric string, in order to distinguish it from a snapshot id. This limitation will be checked when creating a tag and will be documented.

Flink

SELECT * FROM t /*+ OPTIONS('scan.tag-name'='<name>') */

Flink Actions

We propose to provide two Flink actions for users:

  1. create-tag: the argument `--name <tag-name>` specifies the name of the tag. This action creates a tag based on the latest snapshot.
  2. delete-tag: the argument `--name <tag-name>` specifies which tag will be deleted. This action deletes the given tag.

System Table

We propose to introduce a system table `$tags`. The schema is:

Code Block
languagesql
firstline1
tag_name STRING,
tagged_snapshot_id BIGINT,
schema_id BIGINT,
commit_time BIGINT,
record_count BIGINT
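For example, following the suffix convention of other Paimon system tables, the tags of a table `t` could then be listed with (exact identifier quoting depends on the engine):

SELECT * FROM t$tags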

Data Files Handling

When we create a tag, Paimon stores the metadata of the data files used by the tag instead of copying the data files. So it's important to handle the data files properly when we make changes to snapshots or tags.

Creating Tag

When creating a tag, the tagged snapshot file will be copied to the tag directory; it contains the manifest list pointing to the data files.

Deleting Tag

...

When we delete a tag, all data files used by this tag become deletion candidates. How do we determine whether a data file can be deleted? We should take both snapshots and tags into consideration.

For snapshots, we consider two scenarios:

  1. Earliest snapshotId <= taggedSnapshotId: the snapshots in [earliest, taggedSnapshotId] may still use data files in the deletion candidates. So we should check:
    Full data files of the earliest snapshot should be removed from the candidates;
    Delta data files of snapshots in (earliest, taggedSnapshotId] should be removed from the candidates because they may still be consumed by streaming reads.
  2. Earliest snapshotId > taggedSnapshotId: since every snapshot contains data files based on the previous snapshot, we only need to check the full data files of the earliest snapshot (and remove them from the candidates).

For tags, because each tag contains data files based on the previous tag (just like snapshots), we only need to check the full data files of the neighboring tags, if they exist (and remove them from the candidates).

After removing all still-used data files from the candidates, we can delete the rest. A sketch of this filtering follows.
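The sketch below is a hypothetical rendering of the filtering described above; `fullFilesOf` and `deltaFilesOf` stand in for reading a snapshot's full and delta manifests and are assumptions, not the actual Paimon API:

Code Block
languagejava
titleTagDeletionSketch
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.LongFunction;

final class TagDeletionSketch {
	static Set<String> filterCandidates(
			Set<String> candidates,              // data files used by the deleted tag
			long earliestSnapshotId,
			long taggedSnapshotId,               // snapshot the deleted tag points to
			List<Long> neighborTaggedSnapshots,  // previous/next tags, if any
			LongFunction<Set<String>> fullFilesOf,
			LongFunction<Set<String>> deltaFilesOf) {
		Set<String> result = new HashSet<>(candidates);
		// In both scenarios, the earliest snapshot's full data files must be kept.
		result.removeAll(fullFilesOf.apply(earliestSnapshotId));
		// Scenario 1: delta files of snapshots in (earliest, tagged] may still be
		// consumed by streaming reads. (The loop is empty in scenario 2.)
		for (long id = earliestSnapshotId + 1; id <= taggedSnapshotId; id++) {
			result.removeAll(deltaFilesOf.apply(id));
		}
		// Neighboring tags may still reference candidate files.
		for (long neighbor : neighborTaggedSnapshots) {
			result.removeAll(fullFilesOf.apply(neighbor));
		}
		return result; // whatever remains can be deleted safely
	}
}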

Expiring Snapshot

We already have a mechanism to find deletion candidates when expiring snapshots; see `SnapshotDeletion#doDeleteExpiredDataFiles`. Assuming we have obtained the deletion candidates via the existing method, let's discuss how to filter out the data files used by tags.

We need 3 parameters to determine whether a data file should be removed from the candidates:

  1. The snapshot id at which the data file was deleted (`deleteId`). This id can be obtained while iterating over the expiring snapshots.
  2. The snapshot id at which the data file was committed (`commitId`). To get this id, we record it in `DataFileMeta` (see section `Proposed Changes → DataFileMeta Modification and Compatibility`).
  3. The list of tagged snapshot ids (`taggedSnapshots`). This can be obtained from the tag files in storage.

The core idea is that if a data file is committed and deleted within the interval between two adjacent tags, it can be deleted. For example, assume `taggedSnapshots` is `[100, 200, 300]`. If a data file `A` is committed at snapshot 105 and deleted at snapshot 120, then `A` is not saved by any tag: when we expire snapshot 119, `A` is put into the candidates, no tag uses it, and `A` is finally deleted. If instead `A` is deleted at snapshot 201, then `A` is saved by the second tag: when we expire snapshot 200, `A` is put into the candidates but is not deleted, because the second tag still uses it. The sketch below implements this check.
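A hypothetical helper for this rule (names are illustrative, not the actual Paimon code): a data file committed at `commitId` and deleted at `deleteId` is protected if and only if some tagged snapshot lies in [commitId, deleteId).

Code Block
languagejava
titleTagProtectionSketch
import java.util.List;

final class TagProtectionSketch {
	static boolean usedByAnyTag(List<Long> taggedSnapshots, long commitId, long deleteId) {
		for (long tagged : taggedSnapshots) {
			if (tagged >= commitId && tagged < deleteId) {
				return true; // the file was alive when this tag was taken
			}
		}
		return false; // safe to delete with the expiring snapshot
	}

	public static void main(String[] args) {
		List<Long> tags = List.of(100L, 200L, 300L);
		System.out.println(usedByAnyTag(tags, 105, 120)); // false: not saved by any tag
		System.out.println(usedByAnyTag(tags, 105, 201)); // true: saved by the second tag
	}
}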

Future Work

Periodic Creation & Expiration

We can support periodically creating and expiring tags by providing these options:

  1. `tag.create-time` (String, not required): the time point to create tags periodically. Must be in the format 'hh:mm'.
  2. `tag.create-interval` (Duration, not required): the interval between two tags. At least 1d, and must be an integer number of days.

Tag Management Call Procedures 


  1. Spark supports extensions to extend its SQL syntax. We can provide tag management procedures after we support the CALL statement for Spark.
  2. Supporting CALL is on the roadmap of Flink. We can also provide tag management procedures for Flink when possible.

Compatibility, Deprecation, and Migration Plan

None.

Test Plan

UT tests: verify creating and reading tags

...

  1. time travel to a tag
  2. expiration of snapshots won't delete data files used by tags
  3. deleting tags deletes unused data files correctly

Compatibility tests:

  1. version 3 ManifestEntry can read old-style Paimon tables
  2. create a tag in an old-style Paimon table and test expiring snapshots

Rejected Alternatives

Use name `Savepoint`

Savepoint is an existing concept in Flink and in databases. We think using this word may confuse users. The concept of a tag is used in Iceberg, where it references a snapshot. We think our scenario is similar, so we adopt the name `tag`.

Support starting from a tag in streaming reading

The current design of tags only stores full data manifests, so it is not able to support streaming reading for now.

Record Data file's Commit Snapshot in ManifestEntry

We choose to record it in `DataFileMeta` instead. This is because every time Paimon generates a snapshot, it creates new manifest entries for the data files. If we recorded it in `ManifestEntry`, consider this scenario: we commit data file A at snapshot #1 and get manifest entry Entry#1 as [ADD, A, committed at #1]. Then we commit -A at snapshot #2 and get manifest entry Entry#2 as [DELETE, A, ?], which cannot know the commit snapshot of A. So we have to record it in the data file metadata directly.