You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 19 Current »

Discussion thread

https://lists.apache.org/thread/d8toll8042k3m4z3pvn1pwg0n8twm0wt
Vote threadTBD
ISSUETBD
ReleasePaimon-0.5

Motivation

In data warehouse, it's a common requirement to query historical data. So we need to periodically merge incremental data with historical data to generate complete data of current time. For example, Hive 2.2 supports using MERGE INTO statement to merge source table's records into target table by performing update, insert and delete according to different filter, so a good way is taking historical table as target and incremental table as source, then using MERGE INTO to perform update (for existing records) and insert (for non-existing records). But when there exists a large amount of data, the merging may cost much time.

Paimon's snapshots can provide another way to query historical data without cost of merging. Paimon's data storage will generate snapshot for each commit, so we can find historical data at any snapshot (usually called time travel query in SQL). 

But in most scenarios, a table will generate too many snapshots, so Paimon lets users to configure snapshot expiration time to clean old, unused snapshots. Thus the snapshot of a specified time point may have expired when user want to query it.

To solve this problem, we propose to introduce a new mechanism `Tag`. A tag is created from snapshot and can keep longer. The purposes of tag are:

  1. Fault recovery (or we can say disaster recovery). Users can ROLL BACK to a tag if needed. If user rollbacks to a tag, the table will hold the data in the tag and the data committed  after the tag will be deleted. In this scenario the tag is like backup, but more lightweight because the tag only record the metadata of data files instead of copying them.

  2. Record versions of data at a longer interval (typically daily or weekly). With tag, user can query the old data in batch mode. In this scenario the tag is like long-lived snapshot.

Proposed Changes

We will introduce new usage for users and new internal changes of codes and file storage to support tag.  

Usage with Tag

Currently, we support time traveling to a snapshot. Now we propose to support traveling to a tag. 

There are two ways for time travel:

  1. Specifying a version
    Currently, Paimon support using the snapshot id as target version of time travel. For example, in Spark, we can travel to snapshot 1 by SQL `SELECT * FROM t VERSION AS OF 1`. We propose to support using `tag-name` as version string. 
  2. Specifying a time point
    Currently, Paimon support traveling to a time point. Paimon Scanner will find the snapshot of which commit time is closest to given time point. We propose to support that if all of the snapshots before the time point have expired, try to find the closest tag.

Otherwise, we propose to expose creating and deleting operation of tag to users:

  1. Creating tag: let user to create a tag based on the current snapshot and name the tag.
  2. Deleting tag: let user to delete a tag by it's name.

Storage

Like snapshot, a new directory `/tag` will be created under table directory for storing tags. The qualified path for a tag file is `/path/to/table/tag/<tag-name>`, and the tag name is specified by user. 

New Classes

It's not necessary to introduce a new `Tag` class because the tag is very similar to snapshot, we can just reuse the `Snapshot`.  When we create a tag from a snapshot, we can just copy the corresponding snapshot file to the tag directory with tag name; when we read a tag, we can deserialize the tag file to a snapshot.

We need a `TagManager` to manage the tags (similar to `SnapshotManager`). 

TagManager
public class TagManager {
	/** Return the root Directory of tags. */
	public Path tagDirectory(); 
 	
	/** Return the path of a tag. */
	public Path tagPath(String tagName);

	/** Create a tag from given snapshot and save it in the storage. */
	public void commitTag(Snapshot snapshot, String tagName);

	/** Expire a tag and clean unused files in the storage. */
	public void expireTag(String tagName);	

	/** Check if a tag exists. */
	public boolean tagExists(String tagName);    

	/** Get the tagged snapshot. */
	public Snapshot snapshot(String tagName);

	/** Get all tagged snapshots in an iterator. */
	public Iterator<Snapshot> taggedSnapshots();

    /** Get previous tag of which commit time is earlier. */
	public @Nullable String previous(String tagName);

	/** Get next tag of which commit time is later. */
	public @Nullable String next(String tagName);
}


We need a `TagsTable`, which can provide information of tags as system table `<table>$tags`.

The schema of TagsTable is in section `Public Interfaces`.

DataFileMeta Modification and Compatibility

For the convenience of deleting unused data files when expiring snapshots (see `DataFiles Handling → Expiring Snapshot`), we propose to add a new field `long commitSnapshot` to `DataFileMeta`.

Compatibility

DataFileMeta Ser/De: We will upgrade `ManifestEntrySerializer` to version 3.  In version 3, if the ManifestEntrySerializer receives version 2 InternalRow, the commitSnapshot will be set to -1. 

Expiring snapshots: If we find the commitSnapshot is -1, we fall back to trivial method (walk through all data files of all tags to check whether the data file is used or not).

Public Interfaces

SQL Syntax of Time Travel (only for batch read)

Spark

SELECT * FROM t VERSION AS OF tag-name

Note: The tag name can not be an numeric string to make a difference from snapshot. This limitation will be checked when creating tag and documented.

SELECT * FROM t /*+ OPTIONS('scan.tag-name'='<name>') */

Flink Actions

We propose to provide two Flink action for users to control the creation and deletion of tag.

actionargumentnote
create-tag--name <tag-name>: specify name of the tag.create a tag based on the latest snapshot. 
delete-tag--name <tag-name>: specify which tag will be deleted.delete a tag.

System Table

We propose to introduce a system table `$tags`. The schema is:

tag_name STRING,
tagged_snapshot_id BIGINT,
schema_id BIGINT,
commit_time BIGINT,
record_count BIGINT 

Data Files Handling

When we create a tag, Paimon will store metadata of the data files used by tag instead of copying data files. So it's important to deal with the data files when we make changes of snapshots or tags.

Creating Tag

When creating tag, the tagged snapshot file will be copied to the tag directory, which contains the manifest list point to the data files. 

Deleting Tag

When we delete a tag, all data files used by this tag are deletion candidates. How we determine a data file can or can not be deleted? we should take snapshots and tags into consideration. 

For snapshots, We consider 2 scenarios:

  1. Earliest snapshotId <= taggedSnapshotId: the snapshots in [earliest, taggedSnapshotId] may still use data files in deletion candidates. So we should check:
    Full data files of earliest snapshot should be removed from candidates;
    Delta data files of snapshots in (earliest, tagged snapshot] should be removed form candidates because they may be streaming read.
  2. Earliest snapshotId > taggedSnapshotId: Since all the snapshots contains data files based on previous snapshot, we can just only check the full data files of earliest snapshot (remove from candidates).

For tags, because each tag contains data files based on previous tag (just like snapshots), we can only check the full data files of neighbor tags if they exist (remove from candidates).

After we check the used data files, we can delete the rest in candidates.

Expiring Snapshot

We already had a mechanism to find deletion candidates when expiring snapshots, please see `SnapshotDeletion#doDeleteExpiredDataFiles`. Assumes we have gotten deletion candidates via existing method, let's discuss how to filter out data files used by tags.

We need 3 parameters to determine whether a data file should be removed from candidates:

  1. The snapshot id at which the data file is deleted (`deleteId`). This id can be gotten when we iterate the expiring snapshots.
  2. The snapshot id at which the data file is committed (`commitId`). To get this id, we should record it in `DataFileMeta` (see section `Proposed Changes → DataFileMeta Modification and Compatibility`). 
  3. The list of tagged snapshots ID (`taggedSnapshots`). This can be gotten from tag files in storage.

The core idea is that if a data file is committed and deleted within a tag, it can be deleted. For example, assumes we have `taggedSnapshots` as `[100, 200, 300]`, if a data file `A` is committed at snapshot 105 and deleted at snapshot 120, we can see `A` is not saved by any tag. When we expire snapshot 119, `A` will be put into candidates, and we find no tag use it, so `A` will be deleted finally; if `A` is deleted at snapshot 201, we can see `A` is saved by the second tag. When we expire snapshot 200, `A` will be put into candidates, but `A` will not be deleted because the second tag still uses it.

Future Work

Periodically Creation & Expiration

We can support periodically creating and expiring tags by providing these options:

keyRequiresTypenote

tag.create-time

falseStringThe time point to create tag periodically. Must be the format 'hh:mm'.
tag.create-intervalfalseDurationInterval between two tags. At least 1d and must be integers of days.

Tag Management Call Procedures 

  1. Spark supports extension to extend SQL syntax. We can provide tag management procedures after we support CALL statement for Spark.
  2. Supporting CALL is in the road map of Flink. We can also provide tag management procedures for Flink when possible. 

Compatibility, Deprecation, and Migration Plan

None.

Test Plan

UT tests: verify creating and reading tags

IT tests: verify tag related logic, including:

  1. time travel to tag
  2. expiration of snapshots won't delete data files used by tags
  3. delete tags can delete unused data files correctly

Compatibility tests:

  1. version 3 ManifestEntry can read old style Paimon table
  2. create tag in old style Paimon table and test expiring snapshots

Rejected Alternatives

Use name `Savepoint`

The savepoint is an existing concept in Flink and database. We think use this word may confuse users. The concept tag is used in iceberg, it is used to reference a snapshot. We think our scenario is similar to it, so we adopt the name `tag`.

Support starting from a tag in streaming reading

Current design of tag just store full data manifests, so it's not able to support streaming reading now. 

Record Data file's Commit Snapshot in ManifestEntry

We choose to record it in DataFileMeta instead. This is because every time Paimon generate a snapshot, it will create new manifest entries for data files. Assuming we record it in ManifestEntry, consider this scenario: we commit data file A to snapshot #1, we will get manifest entry Entry#1 as [ADD, A, commit at #1]. Then we commit -A to snapshot #2, we will get manifest entry Entry#2 as [DELETE, A, ?], which cannot know the commit snapshot for A. So we have to record it to data file meta directly.


  • No labels