Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Discussion thread

https://lists.apache.org/thread/d8toll8042k3m4z3pvn1pwg0n8twm0wt
Vote threadTBD
ISSUETBD
ReleasePaimon-0.5

Motivation

In data warehouse, it's a common requirement to query historical data. So we need to periodically merge incremental data with historical data to generate complete data of current time. For example, Hive 2.2 supports using MERGE INTO statement to merge source table's records into target table by performing update, insert and delete according to different filter, so a good way is taking historical table as target and incremental table as source, then using MERGE INTO to perform update (for existing records) and insert (for non-existing records). But when there exists a large amount of data, the merging may cost much time.

...

  1. Fault recovery (or we can say disaster recovery). Users can ROLL BACK to a tag if needed. If user rollbacks to a tag, the table will hold the data in the tag and the data committed  after the tag will be deleted. In this scenario the tag is like backup, but more lightweight because the tag only record the metadata of data files instead of copying them.

  2. Record versions of data at a longer interval (typically daily or weekly). With tag, user can query the old data in batch mode. In this scenario the tag is like long-lived snapshot.

Proposed Changes

We will introduce new usage for users and new internal changes of codes and file storage to support tag.  

Usage with Tag

Currently, we support time traveling to a snapshot. Now we propose to support traveling to a tag. 

...

  1. Creating tag: let user to create a tag based on the current snapshot and name the tag.
  2. Deleting tag: let user to delete a tag by it's name.

Storage

Like snapshot, a new directory `/tag` will be created under table directory for storing tags. The qualified path for a tag file is `/path/to/table/tag/tag-<id>`, and the id is increased from 1.

New Classes

We need an entrance class `Tag` to store the information of a tag:

...

The schema of TagsTable is in section `Public Interfaces`.

Public Interfaces

SQL Syntax of Time Travel (only for batch read)

Spark

SELECT * FROM t VERSION AS OF tag-name.<name>

SELECT * FROM t VERSION AS OF tag-id.<id>

SELECT * FROM t /*+ OPTIONS('scan.tag-name'='<name>') */

SELECT * FROM t /*+ OPTIONS('scan.tag-id'='<id>') */

Flink Actions

We propose to provide two Flink action for users to control the creation and deletion of tag.

actionargumentnote
create-tag--name <tag-name>: specify name of the tag.create a tag based on the latest snapshot. 
delete-tag--name <tag-name>: specify which tag will be deleted.delete a tag.

System Table

We propose to introduce a system table `$tags`. The schema is:

Code Block
languagesql
firstline1
tag_name STRING,
tag_id BIGINT,
creation_time BIGINT,
tagged_snapshot_id BIGINT,
schema_id BIGINT,
record_count BIGINT 

Data Files Handling

When we create a tag, Paimon will store metadata of the data files used by tag instead of copying data files. So it's important to deal with the data files when we make changes of snapshots or tags.

Creating Tag

When creating tag, we merge the `baseManifestList` and `deltaManifestList` to full data and create manifest list for them. The manifest list will be stored in tag.

Deleting Tag

When we delete a tag, all data files used by this tag are deletion candidates. How we determine a data file can or can not be deleted? we should take snapshots and tags into consideration. 

...

After we check the used data files, we can delete the rest in candidates.

Expiring Snapshot

We already had a mechanism to find deletion candidates when expiring snapshots, please see `SnapshotDeletion#doDeleteExpiredDataFiles`. Assumes we have gotten deletion candidates via existing method, let's discuss how to filter out data files used by tags.

...

The core idea is that if a data file is committed and deleted within a tag, it can be deleted. For example, assumes we have `taggedSnapshots` as `[100, 200, 300]`, if a data file `A` is committed at snapshot 105 and deleted at snapshot 120, we can see `A` is not saved by any tag. When we expire snapshot 119, `A` will be put into candidates, and we find no tag use it, so `A` will be deleted finally; if `A` is deleted at snapshot 201, we can see `A` is saved by the second tag. When we expire snapshot 200, `A` will be put into candidates, but `A` will not be deleted because the second tag still uses it.

Future Work

Periodically Creation & Expiration

We can support periodically creating and expiring tags by providing these options:

keyRequiresTypenote

tag.create-time

falseStringThe time point to create tag periodically. Must be the format 'hh:mm'.
tag.create-intervalfalseDurationInterval between two tags. At least 1d and must be integers of days.

Tag Management Call Procedures 

  1. Spark supports extension to extend SQL syntax. We can provide tag management procedures after we support CALL statement for Spark.
  2. Supporting CALL is in the road map of Flink. We can also provide tag management procedures for Flink when possible. 

Compatibility, Deprecation, and Migration Plan

None.

Test Plan

UT tests: verify creating and reading tags

...

  1. time travel to tag
  2. expiration of snapshots won't delete data files used by tags
  3. delete tags can delete unused data files correctly

Rejected Alternatives

Use name `Savepoint`

The savepoint is an existing concept in Flink and database. We think use this word may confuse users. The concept tag is used in iceberg, it is used to reference a snapshot. We think our scenario is similar to it, so we adopt the name `tag`.

Support starting from a tag in streaming reading

Current design of tag just store full data manifests, so it's not able to support streaming reading now. 

...