...

Motivation

...

Like snapshots, a new directory `/tag` will be created under the table directory for storing tags. The qualified path for a tag file is `/path/to/table/tag/tag-<name>`, and the tag name is specified by the user.

New Classes

It's not necessary to introduce a new `Tag` class: a tag is very similar to a snapshot, so we can just reuse `Snapshot`. When we create a tag from a snapshot, we copy the corresponding snapshot file to the tag directory under the tag name; when we read a tag, we deserialize the tag file to a snapshot.
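The copy-and-deserialize idea above can be sketched as follows. This is a minimal illustration on local files, not the proposed implementation: the class `TagFileSketch`, its method names, and the file names `snapshot/snapshot-<id>` and `tag/tag-<name>` are assumptions made for the example, and a real implementation would go through Paimon's `FileIO` and `Snapshot` JSON deserialization instead of plain strings.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper for illustration; works on local files rather than Paimon's FileIO. */
final class TagFileSketch {
    private TagFileSketch() {}

    /** Copies snapshot/snapshot-<id> to tag/tag-<tagName> and returns the tag file path. */
    static Path createTag(Path tableDir, long snapshotId, String tagName) {
        Path snapshotFile = tableDir.resolve("snapshot").resolve("snapshot-" + snapshotId);
        Path tagFile = tableDir.resolve("tag").resolve("tag-" + tagName);
        try {
            Files.createDirectories(tagFile.getParent());
            // Files.copy throws FileAlreadyExistsException if the tag file already exists.
            return Files.copy(snapshotFile, tagFile);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads the tag file; its content is the JSON of the tagged snapshot. */
    static String readTagJson(Path tableDir, String tagName) {
        try {
            byte[] bytes = Files.readAllBytes(tableDir.resolve("tag").resolve("tag-" + tagName));
            return new String(bytes, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Round trip on a temporary directory: write a snapshot file, tag it, read the tag. */
    static String demo() {
        try {
            Path tableDir = Files.createTempDirectory("table");
            Path snapshotDir = Files.createDirectories(tableDir.resolve("snapshot"));
            Files.write(
                    snapshotDir.resolve("snapshot-3"),
                    "{\"id\":3}".getBytes(StandardCharsets.UTF_8));
            createTag(tableDir, 3L, "v1");
            return readTagJson(tableDir, "v1");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the tag file is a byte-for-byte copy, whatever reads a snapshot file can read a tag file unchanged, which is the reason no separate `Tag` class is needed.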

We need a `TagManager` to manage the tags (similar to `SnapshotManager`):

Code Block: TagManager (Java)
public class TagManager {
	/** Return the root directory of tags. */
	public Path tagDirectory();

	/** Return the path of a tag. */
	public Path tagPath(String tagName);

	/** Create a tag from given snapshot and save it in the storage. */
	public void commitTag(Snapshot snapshot, String tagName);

	/** Expire a tag and clean unused files in the storage. */
	public void expireTag(String tagName);

	/** Check if a tag exists. */
	public boolean tagExists(String tagName);

	/** Get the tagged snapshot. */
	public Snapshot snapshot(String tagName);

	/** Get all tagged snapshots in an iterator. */
	public Iterator<Snapshot> taggedSnapshots();

	/** Get previous tag of which commit time is earlier. */
	public @Nullable String previous(String tagName);

	/** Get next tag of which commit time is later. */
	public @Nullable String next(String tagName);
}
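To illustrate the intended semantics of `previous` and `next`, here is a small in-memory sketch. It assumes tags are ordered by the commit time of their tagged snapshot and that ties are broken by tag name; the class `TagOrderSketch` and its simplified `commitTag(String, long)` signature are hypothetical and only stand in for a real `TagManager` backed by tag files.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory stand-in for TagManager, used only to show tag ordering. */
final class TagOrderSketch {
    // tag name -> commit time (millis) of the tagged snapshot
    private final Map<String, Long> commitTimes = new LinkedHashMap<>();

    void commitTag(String tagName, long commitTimeMillis) {
        if (commitTimes.containsKey(tagName)) {
            throw new IllegalArgumentException("Tag already exists: " + tagName);
        }
        commitTimes.put(tagName, commitTimeMillis);
    }

    boolean tagExists(String tagName) {
        return commitTimes.containsKey(tagName);
    }

    /** Returns the neighbouring tag with the earlier commit time, or null if there is none. */
    String previous(String tagName) {
        return neighbor(tagName, -1);
    }

    /** Returns the neighbouring tag with the later commit time, or null if there is none. */
    String next(String tagName) {
        return neighbor(tagName, 1);
    }

    private String neighbor(String tagName, int offset) {
        if (!tagExists(tagName)) {
            throw new IllegalArgumentException("Unknown tag: " + tagName);
        }
        List<String> ordered = new ArrayList<>(commitTimes.keySet());
        // Sort by commit time; break ties by name so the order is deterministic (an assumption).
        ordered.sort((a, b) -> {
            int byTime = Long.compare(commitTimes.get(a), commitTimes.get(b));
            return byTime != 0 ? byTime : a.compareTo(b);
        });
        int i = ordered.indexOf(tagName) + offset;
        return (i >= 0 && i < ordered.size()) ? ordered.get(i) : null;
    }
}
```

For example, after committing tags `a`, `b`, `c` with commit times 100, 200, 300, `previous("b")` yields `a`, `next("b")` yields `c`, and `previous("a")` yields `null`.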


We need a `TagsTable`, which provides information about tags as the system table `<table>$tags`.

The schema of TagsTable is in section `Public Interfaces`.

DataFileMeta Modification and Compatibility

For the convenience of deleting unused data files when expiring snapshots (see `DataFiles Handling → Expiring Snapshot`), we propose to add a new field `long commitSnapshot` to `DataFileMeta`.

Compatibility

DataFileMeta Ser/De: We will upgrade `ManifestEntrySerializer` to version 3. In version 3, if the `ManifestEntrySerializer` receives a version 2 `InternalRow`, `commitSnapshot` will be set to -1.

Expiring snapshots: If we find that `commitSnapshot` is -1, we fall back to the trivial method (walk through all data files of all tags to check whether the data file is used or not).

Public Interfaces

SQL Syntax of Time Travel (only for batch read)

...

SELECT * FROM t VERSION AS OF tag-name

Note: The tag name cannot be a numeric string, so that it can be distinguished from a snapshot id. This limitation will be checked when creating a tag and will be documented.

SELECT * FROM t /*+ OPTIONS('scan.tag-name'='<name>') */
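The rule that a tag name must not be numeric can be sketched as below. This is only an illustration of the check and of how a `VERSION AS OF` value could then be told apart from a snapshot id; the class `TagNameSketch` and its methods are hypothetical, and rejecting empty names is an extra assumption not stated in the proposal.

```java
/** Hypothetical validation helper; names and messages are illustrative only. */
final class TagNameSketch {
    private TagNameSketch() {}

    /** True if the string is non-empty and consists only of ASCII digits. */
    static boolean isNumeric(String s) {
        if (s == null || s.isEmpty()) {
            return false;
        }
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            if (c < '0' || c > '9') {
                return false;
            }
        }
        return true;
    }

    /** Rejects names that could be confused with a snapshot id. */
    static void checkTagName(String tagName) {
        if (tagName == null || tagName.isEmpty()) {
            // Assumption: empty names are rejected as well.
            throw new IllegalArgumentException("Tag name must not be empty.");
        }
        if (isNumeric(tagName)) {
            throw new IllegalArgumentException(
                    "Tag name must not be a numeric string: " + tagName);
        }
    }

    /** Classifies a version string: numeric means snapshot id, anything else a tag name. */
    static String resolveVersion(String version) {
        return isNumeric(version) ? "snapshot" : "tag";
    }
}
```

With this check in place at tag creation, a reader can treat any numeric version as a snapshot id and everything else as a tag name without ambiguity.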


Flink Actions

We propose to provide two Flink actions for users to control the creation and deletion of tags.

...

Code Block (SQL)
tag_name STRING,
tagged_snapshot_id BIGINT,
schema_id BIGINT,
commit_time BIGINT,
record_count BIGINT

...

Creating Tag

When creating a tag, the tagged snapshot file will be copied to the tag directory. The copied file contains the manifest lists that point to the data files.

Deleting Tag

When we delete a tag, all data files used by this tag are deletion candidates. To determine whether a data file can be deleted, we should take both snapshots and tags into consideration.

For snapshots, we consider 2 scenarios:

...

  1. Earliest snapshotId <= taggedSnapshotId: the snapshots in [earliest, taggedSnapshotId] may still use data files in the deletion candidates. So we should check:
     • Full data files of the earliest snapshot should be removed from the candidates;
     • Delta data files of the snapshots in (earliest, taggedSnapshotId] should be removed from the candidates because they may be read in streaming mode.
  2. Earliest snapshotId > taggedSnapshotId: Since every snapshot contains data files based on the previous snapshot, we only need to check the full data files of the earliest snapshot (and remove them from the candidates).
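The two snapshot scenarios above amount to set subtraction on the deletion candidates. The sketch below shows only that snapshot-side filtering, with data files represented by their names; the class `TagDeletionSketch`, its parameters, and the use of plain `Set<String>` are assumptions for illustration, and files still used by other tags would have to be removed from the result separately.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of the snapshot-side filtering when a tag is deleted. */
final class TagDeletionSketch {
    private TagDeletionSketch() {}

    /**
     * @param candidates data files used by the tag being deleted
     * @param earliestSnapshotId id of the earliest existing snapshot
     * @param taggedSnapshotId id of the snapshot the tag was created from
     * @param earliestFullFiles full data files of the earliest snapshot
     * @param deltaFiles delta data files of each snapshot in (earliest, tagged]
     * @return candidates that no existing snapshot still needs
     */
    static Set<String> deletable(
            Set<String> candidates,
            long earliestSnapshotId,
            long taggedSnapshotId,
            Set<String> earliestFullFiles,
            List<Set<String>> deltaFiles) {
        Set<String> result = new HashSet<>(candidates);
        // Both scenarios: the earliest snapshot's full data files must be kept.
        result.removeAll(earliestFullFiles);
        if (earliestSnapshotId <= taggedSnapshotId) {
            // Scenario 1 only: delta files in (earliest, tagged] may be read in streaming mode.
            for (Set<String> delta : deltaFiles) {
                result.removeAll(delta);
            }
        }
        return result;
    }
}
```

For instance, with candidates `{f1, f2, f3}`, earliest full files `{f1}` and one delta set `{f2}`, scenario 1 leaves `{f3}` deletable, while scenario 2 ignores the delta set and leaves `{f2, f3}`.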

...

  1. The snapshot id at which the data file is deleted (`deleteId`). This id can be obtained when we iterate over the expiring snapshots.
  2. The snapshot id at which the data file is committed (`commitId`). To get this id, we should record it in `DataFileMeta` (see section `Proposed Changes → DataFileMeta Modification and Compatibility`).
  3. The list of tagged snapshot ids (`taggedSnapshots`). This can be obtained from the tag files in storage.
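One way to combine these three inputs is sketched below, under the assumption that a data file committed at `commitId` and deleted at `deleteId` is visible to exactly the snapshots in `[commitId, deleteId)`. Under that assumption the file is still needed if any tagged snapshot id falls in that range. The class `ExpireSketch` and the method `usedByTag` are hypothetical names, and a `commitId` of -1 (old-style metadata) is rejected here because it requires the fallback scan described in the compatibility section.

```java
import java.util.TreeSet;

/** Hypothetical check used while expiring snapshots; not the proposal's actual code. */
final class ExpireSketch {
    /** Marker for data files written before commitSnapshot was recorded. */
    static final long UNKNOWN_COMMIT = -1L;

    private ExpireSketch() {}

    /**
     * Returns true if some tagged snapshot id t satisfies commitId <= t < deleteId,
     * meaning a tag still references the data file and it must not be deleted.
     */
    static boolean usedByTag(long commitId, long deleteId, TreeSet<Long> taggedSnapshots) {
        if (commitId == UNKNOWN_COMMIT) {
            // The range check is impossible; the caller must scan the tags' data files instead.
            throw new IllegalArgumentException("Unknown commit snapshot; use the fallback scan.");
        }
        // Smallest tagged snapshot id that is >= commitId, or null if there is none.
        Long firstTagged = taggedSnapshots.ceiling(commitId);
        return firstTagged != null && firstTagged < deleteId;
    }
}
```

Keeping the tagged snapshot ids in a sorted set makes each check a single lookup rather than a scan over all tags.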

...

  1. time travel to tag
  2. expiration of snapshots won't delete data files used by tags
  3. deleting tags deletes unused data files correctly

Compatibility tests:

  1. version 3 ManifestEntry can read old style Paimon table
  2. create tag in old style Paimon table and test expiring snapshots

Rejected Alternatives

Use name `Savepoint`

...

The current design of tags only stores full data manifests, so it cannot support streaming reads for now.

Record Data file's Commit Snapshot in ManifestEntry

We choose to record it in DataFileMeta instead. This is because every time Paimon generates a snapshot, it creates new manifest entries for data files. Assuming we recorded it in ManifestEntry, consider this scenario: we commit data file A to snapshot #1 and get manifest entry Entry#1 as [ADD, A, commit at #1]. Then we commit -A to snapshot #2 and get manifest entry Entry#2 as [DELETE, A, ?], which cannot know the commit snapshot of A. So we have to record it in the data file meta directly.