
Discussion thread: https://lists.apache.org/thread/d8toll8042k3m4z3pvn1pwg0n8twm0wt
Vote thread: TBD
Issue: TBD
Release: Paimon-0.5

Motivation

In a data warehouse, querying historical data is a common requirement, so we need to periodically merge incremental data with historical data to generate the complete data as of the current time. For example, Hive 2.2 supports the MERGE INTO statement, which merges a source table's records into a target table by performing updates, inserts, and deletes according to different filters. A good approach is to take the historical table as the target and the incremental table as the source, then use MERGE INTO to update existing records and insert non-existing ones. But when there is a large amount of data, the merge may take a long time.

Paimon's snapshots provide another way to query historical data without the cost of merging. Paimon generates a snapshot for each commit, so we can read the historical data at any snapshot (usually called a time travel query in SQL).

But in most scenarios a table generates too many snapshots, so Paimon lets users configure a snapshot expiration time to clean up old, unused snapshots. Thus the snapshot of a specified time point may have expired by the time a user wants to query it.

To solve this problem, we propose to introduce a new mechanism, `Tag`. A tag is created from a snapshot and can be kept longer. The purposes of tags are:

  1. Fault recovery (or disaster recovery). Users can ROLL BACK to a tag if needed. If a user rolls back to a tag, the table will hold the data in the tag, and the data committed after the tag will be deleted. In this scenario the tag is like a backup, but more lightweight, because the tag only records the metadata of data files instead of copying them.

  2. Recording versions of data at a longer interval (typically daily or weekly). With tags, users can query the old data in batch mode. In this scenario the tag is like a long-lived snapshot.

Proposed Changes

We will introduce new usage for users, along with internal changes to the code and file storage, to support tags.

Usage with Tag

Currently, we support time traveling to a snapshot. Now we propose to support traveling to a tag. 

There are two ways for time travel:

  1. Specifying a version
    Currently, Paimon supports using the snapshot id as the target version of time travel. For example, in Spark, we can travel to snapshot 1 with the SQL `SELECT * FROM t VERSION AS OF 1`. We propose to support using `tag-name` as the version string.
  2. Specifying a time point
    Currently, Paimon supports traveling to a time point. The Paimon scanner finds the snapshot whose commit time is closest to the given time point. We propose that, if all of the snapshots before the time point have expired, the scanner tries to find the closest tag instead.
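
The time-point lookup with a tag fallback can be sketched as follows. This is a minimal illustration, not Paimon code: the class `TimeTravelResolver` and its methods are hypothetical, and it assumes "closest" means the latest commit at or before the given time point.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.TreeMap;

/** Hypothetical sketch: resolve a time point to a snapshot id, falling back to tags. */
class TimeTravelResolver {
    // Commit time (epoch millis) -> snapshot id, for snapshots that have not expired.
    private final TreeMap<Long, Long> liveSnapshots = new TreeMap<>();
    // Commit time of the tagged snapshot -> tagged snapshot id.
    private final TreeMap<Long, Long> tags = new TreeMap<>();

    void addSnapshot(long commitTime, long snapshotId) {
        liveSnapshots.put(commitTime, snapshotId);
    }

    void addTag(long commitTime, long snapshotId) {
        tags.put(commitTime, snapshotId);
    }

    /** Assumption: "closest" is read as the latest commit at or before the time point. */
    OptionalLong resolve(long timeMillis) {
        Map.Entry<Long, Long> snapshot = liveSnapshots.floorEntry(timeMillis);
        if (snapshot != null) {
            return OptionalLong.of(snapshot.getValue());
        }
        // All snapshots before the time point have expired: try the closest tag.
        Map.Entry<Long, Long> tag = tags.floorEntry(timeMillis);
        return tag == null ? OptionalLong.empty() : OptionalLong.of(tag.getValue());
    }
}
```

With live snapshots committed at times 1000 and 2000 and a tag committed at time 500, a query for time 1500 resolves to the first snapshot, while a query for time 700 falls back to the tag.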

In addition, we propose to expose the creation and deletion of tags to users:

  1. Creating a tag: lets the user create a tag based on the current snapshot and name the tag.
  2. Deleting a tag: lets the user delete a tag by its name.

Storage

As with snapshots, a new directory `/tag` will be created under the table directory for storing tags. The qualified path of a tag file is `/path/to/table/tag/<tag-name>`, where the tag name is specified by the user.

New Classes

We need a `TagManager` to manage the tags (similar to `SnapshotManager`). 

TagManager
public class TagManager {
    /** Return the root directory of tags. */
    public Path tagDirectory();

    /** Return the path of a tag. */
    public Path tagPath(String tagName);

    /** Create a tag from the given snapshot and save it in the storage. */
    public void commitTag(Snapshot snapshot, String tagName);

    /** Expire a tag and clean unused files in the storage. */
    public void expireTag(String tagName);

    /** Get a tag instance. */
    public Tag tag(String tagName);

    /** Check if a tag exists. */
    public boolean tagExists(String tagName);

    /** Get the snapshot id which this tag points to. */
    public long snapshotId(String tagName);

    /** Get all tags in an iterator. */
    public Iterator<Tag> tags();

    /** Get the previous tag, whose commit time is earlier. */
    public @Nullable String previous(String tagName);

    /** Get the next tag, whose commit time is later. */
    public @Nullable String next(String tagName);
}

We need a `TagsTable`, which provides information about tags as the system table `<table>$tags`.

The schema of TagsTable is in section `Public Interfaces`.

Public Interfaces

SQL Syntax of Time Travel (only for batch read)

Spark

SELECT * FROM t VERSION AS OF tag-name.<name>

SELECT * FROM t VERSION AS OF tag-id.<id>

SELECT * FROM t /*+ OPTIONS('scan.tag-name'='<name>') */

SELECT * FROM t /*+ OPTIONS('scan.tag-id'='<id>') */

Flink Actions

We propose to provide two Flink actions for users to control the creation and deletion of tags.

| action | argument | note |
|---|---|---|
| create-tag | --name <tag-name>: specify name of the tag. | create a tag based on the latest snapshot. |
| delete-tag | --name <tag-name>: specify which tag will be deleted. | delete a tag. |

System Table

We propose to introduce a system table `$tags`. The schema is:

tag_name STRING,
tag_id BIGINT,
creation_time BIGINT,
tagged_snapshot_id BIGINT,
schema_id BIGINT,
record_count BIGINT 

Data Files Handling

When we create a tag, Paimon stores the metadata of the data files used by the tag instead of copying the data files. So it is important to handle the data files correctly whenever snapshots or tags change.

Creating Tag

When creating a tag, we merge the `baseManifestList` and `deltaManifestList` into the full data and create a manifest list for it. The manifest list will be stored in the tag.
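
The merge of base and delta into full data can be pictured with the simplified sketch below. It is an illustration under stated assumptions, not Paimon's manifest code: `ManifestMergeSketch`, `Entry`, and `Kind` are hypothetical stand-ins, and each manifest list is flattened to a sequence of ADD and DELETE entries keyed by file name.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch: apply base entries, then delta entries, to get the full file set. */
class ManifestMergeSketch {
    enum Kind { ADD, DELETE }

    /** Simplified stand-in for one manifest entry. */
    static final class Entry {
        final Kind kind;
        final String fileName;

        Entry(Kind kind, String fileName) {
            this.kind = kind;
            this.fileName = fileName;
        }
    }

    static Set<String> mergeToFull(List<Entry> base, List<Entry> delta) {
        Set<String> live = new LinkedHashSet<>();
        apply(live, base);
        apply(live, delta);
        return live;
    }

    private static void apply(Set<String> live, List<Entry> entries) {
        for (Entry e : entries) {
            if (e.kind == Kind.ADD) {
                live.add(e.fileName);
            } else {
                live.remove(e.fileName);
            }
        }
    }
}
```

For a base of `ADD f1, ADD f2` and a delta of `DELETE f1, ADD f3`, the full file set is `f2, f3`; a tag would then store a manifest list describing exactly these files.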

Deleting Tag

When we delete a tag, all data files used by this tag are deletion candidates. To determine whether a data file can be deleted, we should take both snapshots and tags into consideration.

For snapshots, we consider 3 scenarios:

  1. No snapshots. Do nothing to the candidates.
  2. Earliest snapshotId <= taggedSnapshotId: the snapshots in [earliest, taggedSnapshotId] may still use data files in the deletion candidates. So we should check:
    Full data files of the earliest snapshot should be removed from the candidates;
    Delta data files of snapshots in (earliest, tagged snapshot] should be removed from the candidates because they may still be read by streaming reads.
  3. Earliest snapshotId > taggedSnapshotId: every snapshot contains data files based on the previous snapshot, so we only need to check the full data files of the earliest snapshot (remove them from the candidates).

For tags, because each tag contains data files based on the previous tag (just like snapshots), we only need to check the full data files of the neighboring tags, if they exist (remove them from the candidates).

After removing the data files that are still in use, we can delete the remaining candidates.
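
The filtering rules for deleting a tag can be sketched as below. This is a simplified model rather than the actual implementation: `TagDeletionSketch` and `SnapshotFiles` are hypothetical, data files are represented by their names, and the caller is assumed to have already loaded the file sets of the live snapshots and of the neighboring tags.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.SortedMap;

/** Hypothetical sketch of filtering deletion candidates when a tag is deleted. */
class TagDeletionSketch {

    /** Simplified stand-in for a live snapshot: its full files and the files added by its delta. */
    static final class SnapshotFiles {
        final Set<String> full;
        final Set<String> delta;

        SnapshotFiles(Set<String> full, Set<String> delta) {
            this.full = full;
            this.delta = delta;
        }
    }

    static Set<String> deletableFiles(
            Set<String> tagFiles,
            long taggedSnapshotId,
            SortedMap<Long, SnapshotFiles> liveSnapshots,
            List<Set<String>> neighborTagFiles) {
        Set<String> candidates = new HashSet<>(tagFiles);

        // Scenario 1 (no snapshots) skips this block entirely.
        if (!liveSnapshots.isEmpty()) {
            long earliest = liveSnapshots.firstKey();
            // Scenarios 2 and 3: full data files of the earliest snapshot stay.
            candidates.removeAll(liveSnapshots.get(earliest).full);
            // Scenario 2 only: delta files of snapshots in (earliest, tagged] stay.
            if (earliest < taggedSnapshotId) {
                for (SnapshotFiles s :
                        liveSnapshots.subMap(earliest + 1, taggedSnapshotId + 1).values()) {
                    candidates.removeAll(s.delta);
                }
            }
        }

        // Files still referenced by the neighboring tags stay as well.
        for (Set<String> files : neighborTagFiles) {
            candidates.removeAll(files);
        }
        return candidates;
    }
}
```

Note that scenario 3 needs no special branch: when the earliest snapshot is later than the tagged one, the range (earliest, tagged] is empty and only the full-file check applies.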

Expiring Snapshot

We already have a mechanism to find deletion candidates when expiring snapshots; please see `SnapshotDeletion#doDeleteExpiredDataFiles`. Assuming we have obtained the deletion candidates via the existing method, let's discuss how to filter out the data files used by tags.

We need 3 parameters to determine whether a data file should be removed from the candidates:

  1. The snapshot id at which the data file is deleted (`deleteId`). This id can be obtained when we iterate over the expiring snapshots.
  2. The snapshot id at which the data file is committed (`commitId`). To get this id, we should record it in `DataFileMeta`.
  3. The list of tagged snapshot ids (`taggedSnapshots`). This can be obtained from the tag files in storage.

The core idea is that a data file can be deleted if it is both committed and deleted between two adjacent tagged snapshots, because then no tag references it. For example, assume `taggedSnapshots` is `[100, 200, 300]`. If a data file `A` is committed at snapshot 105 and deleted at snapshot 120, `A` is not saved by any tag. When we expire snapshot 119, `A` is put into the candidates, we find that no tag uses it, and `A` is finally deleted. If instead `A` is deleted at snapshot 201, `A` is saved by the second tag. When we expire snapshot 200, `A` is put into the candidates, but it will not be deleted because the second tag still uses it.
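
The check in the example above reduces to a range test on the three parameters. The sketch below is one possible formulation, with a hypothetical class and method name. It assumes a file is visible in every snapshot `t` with `commitId <= t < deleteId`, so a tag uses the file exactly when its tagged snapshot id falls in that range.

```java
import java.util.TreeSet;

/** Hypothetical sketch of the tag check applied to a deletion candidate. */
class TagAwareExpiration {

    /**
     * Returns true if some tagged snapshot t satisfies commitId <= t < deleteId.
     * Assumption: the file exists from snapshot commitId up to, but not including, deleteId.
     */
    static boolean usedByTag(long commitId, long deleteId, TreeSet<Long> taggedSnapshots) {
        // The first tag at or after the commit is the only one that needs checking.
        Long firstTagAtOrAfterCommit = taggedSnapshots.ceiling(commitId);
        return firstTagAtOrAfterCommit != null && firstTagAtOrAfterCommit < deleteId;
    }
}
```

With `taggedSnapshots` as `[100, 200, 300]`, `usedByTag(105, 120, ...)` is false, so `A` can be deleted, while `usedByTag(105, 201, ...)` is true because tagged snapshot 200 still contains `A`.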

Future Work

Periodic Creation & Expiration

We can support periodically creating and expiring tags by providing these options:

| key | Required | Type | note |
|---|---|---|---|
| tag.create-time | false | String | The time point at which tags are created periodically. Must be in the format 'hh:mm'. |
| tag.create-interval | false | Duration | Interval between two tags. At least 1d and must be a whole number of days. |
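
The constraints on these two options could be validated roughly as follows. This is only a sketch: `TagOptionsSketch` and its methods are hypothetical, and it assumes that 'hh:mm' means a zero-padded 24-hour time.

```java
import java.time.Duration;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;

/** Hypothetical validation helpers for the proposed tag options. */
class TagOptionsSketch {
    // Assumption: 'hh:mm' is interpreted as a zero-padded 24-hour time.
    private static final DateTimeFormatter HH_MM = DateTimeFormatter.ofPattern("HH:mm");

    /** Parses tag.create-time; throws DateTimeParseException on malformed input. */
    static LocalTime parseCreateTime(String value) {
        return LocalTime.parse(value, HH_MM);
    }

    /** Checks that tag.create-interval is at least one day and a whole number of days. */
    static boolean isValidInterval(Duration interval) {
        return interval.compareTo(Duration.ofDays(1)) >= 0
                && interval.equals(Duration.ofDays(interval.toDays()));
    }
}
```

Under these assumptions, an interval of 2 days is accepted, while 12 hours (too short) and 36 hours (not a whole number of days) are rejected.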

Tag Management Call Procedures 

  1. Spark supports extensions to its SQL syntax. We can provide tag management procedures after we support the CALL statement for Spark.
  2. Supporting CALL is in the road map of Flink. We can also provide tag management procedures for Flink when possible. 

Compatibility, Deprecation, and Migration Plan

None.

Test Plan

UT tests: verify creating and reading tags

IT tests: verify tag related logic, including:

  1. time travel to tag
  2. expiration of snapshots won't delete data files used by tags
  3. deleting tags deletes unused data files correctly

Rejected Alternatives

Use name `Savepoint`

Savepoint is an existing concept in Flink and in databases, so we think using this word may confuse users. The concept of a tag is used in Iceberg, where it references a snapshot. We think our scenario is similar, so we adopt the name `tag`.

Support starting from a tag in streaming reading

The current design of tags stores only the full data manifests, so it cannot support streaming reads for now.

