
Discussion thread: https://lists.apache.org/thread/d8toll8042k3m4z3pvn1pwg0n8twm0wt
Vote thread: TBD
ISSUE: TBD
Release: Paimon-0.5

Motivation

In a data warehouse, it is a common requirement to query historical data, so we need to periodically merge incremental data with historical data to produce the complete data as of the current time. For example, Hive 2.2 supports using the MERGE INTO statement to merge a source table's records into a target table by performing update, insert, and delete according to different filters. A good approach is to take the historical table as target and the incremental table as source, then use MERGE INTO to perform updates (for existing records) and inserts (for new records). But when there is a large amount of data, the merge may cost much time.

Paimon's snapshots provide another way to query historical data without the cost of merging. Paimon's data storage generates a snapshot for each commit, so we can read the historical data at any snapshot (usually called time travel query in SQL).

But in most scenarios a table generates too many snapshots, so Paimon lets users configure a snapshot expiration time to clean up old, unused snapshots. Thus the snapshot of a specified time point may have expired by the time a user wants to query it.

To solve this problem, we propose introducing a new mechanism: `Tag`. A tag is created from a snapshot and can be kept longer. The purposes of tags are:

  1. Fault recovery (or disaster recovery). Users can roll back to a tag if needed. If a user rolls back to a tag, the table will hold the data in the tag and the data committed after the tag will be deleted. In this scenario the tag is like a backup, but more lightweight, because a tag only records the metadata of data files instead of copying them.

  2. Recording versions of data at a longer interval (typically daily or weekly). With tags, users can query the old data in batch mode. In this scenario a tag is like a long-lived snapshot.

Proposed Changes

Storage

Like snapshots, a new directory `/tag` will be created under the table directory for storing tags. The qualified path of a tag file is `/path/to/table/tag/tag-<id>`, where the id increases from 1.

New Classes

We need an entrance class `Tag` to store the information of a tag:

Tag
public class Tag {
	/** Used to identify a tag. */
	private final long id;

	/** Used to identify which snapshot this tag marks. */
	private final long taggedSnapshotId;

	/** Id of the current table schema of the records in this tag. */
	private final long schemaId;

	/** The manifest list of all data of this tag. */
	private final String fullManifestList;

	/** How many records are in this tag. */
	private final long recordCount;

	/** Getters. */

	/** Some util methods. */

	/** Return all {@link ManifestFileMeta} instances for full data manifests in this tag. */
	public List<ManifestFileMeta> dataManifests(ManifestList manifestList);

	/** Serialize to json. */
	public String toJson();

	/** Deserialize from json. */
	public static Tag fromJson(String json);

	/** Get a tag from path. */
	public static Tag fromPath(FileIO fileIO, Path path);
}
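To make the serde contract concrete, here is a standalone, hypothetical sketch of the JSON round trip. It is a simplification for illustration only: real Paimon code would use its own JSON utilities, and the exact field layout is an assumption.

```java
// Standalone sketch of the Tag entity's JSON serialization (hypothetical,
// simplified; not actual Paimon code).
public class TagSketch {
    private final long id;
    private final long taggedSnapshotId;
    private final long schemaId;
    private final String fullManifestList; // file name of the manifest list
    private final long recordCount;

    public TagSketch(long id, long taggedSnapshotId, long schemaId,
                     String fullManifestList, long recordCount) {
        this.id = id;
        this.taggedSnapshotId = taggedSnapshotId;
        this.schemaId = schemaId;
        this.fullManifestList = fullManifestList;
        this.recordCount = recordCount;
    }

    public long taggedSnapshotId() {
        return taggedSnapshotId;
    }

    /** Serialize to json. */
    public String toJson() {
        return String.format(
                "{\"id\":%d,\"taggedSnapshotId\":%d,\"schemaId\":%d,"
                        + "\"fullManifestList\":\"%s\",\"recordCount\":%d}",
                id, taggedSnapshotId, schemaId, fullManifestList, recordCount);
    }
}
```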


We need a `TagManager` to manage the tags (similar to `SnapshotManager`).

TagManager
public class TagManager {
	/** Return the root directory of tags. */
	public Path tagDirectory();

	/** Return the path of a tag. */
	public Path tagPath(long tagId);

	/** Get a tag instance. */
	public Tag tag(long tagId);

	/** Check if a tag exists. */
	public boolean tagExists(long tagId);

	/** Return id of the earliest tag. */
	public @Nullable Long earliestTagId();

	/** Return id of the latest tag. */
	public @Nullable Long latestTagId();
}
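One plausible way to implement the id lookups is to list the tag directory and parse the `tag-<id>` file names. The sketch below follows the storage layout from this proposal, but the listing logic itself is an illustrative assumption, not actual Paimon code.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

// Hypothetical sketch of TagManager deriving tag ids from the /tag directory.
public class TagManagerSketch {
    private static final String TAG_PREFIX = "tag-";
    private final Path tagDirectory;

    public TagManagerSketch(Path tableRoot) {
        this.tagDirectory = tableRoot.resolve("tag");
    }

    /** Return the path of a tag: /path/to/table/tag/tag-<id>. */
    public Path tagPath(long tagId) {
        return tagDirectory.resolve(TAG_PREFIX + tagId);
    }

    /** Check if a tag exists. */
    public boolean tagExists(long tagId) {
        return Files.exists(tagPath(tagId));
    }

    /** Return id of the latest tag, or null if there are no tags. */
    public Long latestTagId() throws IOException {
        try (Stream<Path> files = Files.list(tagDirectory)) {
            return files.map(p -> p.getFileName().toString())
                    .filter(n -> n.startsWith(TAG_PREFIX))
                    .map(n -> Long.parseLong(n.substring(TAG_PREFIX.length())))
                    .max(Long::compare)
                    .orElse(null);
        }
    }
}
```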


We need a `TagsTable`, which provides information about tags as the system table `#tags`.

The schema of `TagsTable` is described in section `System Table`.

Time Travel to Tags Support

Time Travel to Tag (Batch Reading)

Currently, we support time traveling to a snapshot. Now we can also support traveling to a tag.

For Spark, we can use SQL `SELECT * FROM t VERSION AS OF <version-string>`. Currently, the version string only supports a long value which represents a snapshot. Now we propose to use `snapshot.<id>` to specify traveling to a snapshot, or `tag.<id>` to specify traveling to a tag. The old style `<id>` will be treated as a snapshot id.
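The parsing rule above can be sketched as follows. The accepted forms come from this proposal; the class and method names are made up for illustration.

```java
// Sketch of parsing the proposed Spark version string:
//   "snapshot.<id>" -> snapshot, "tag.<id>" -> tag, bare "<id>" -> snapshot.
public class VersionString {
    public enum Kind { SNAPSHOT, TAG }

    public final Kind kind;
    public final long id;

    private VersionString(Kind kind, long id) {
        this.kind = kind;
        this.id = id;
    }

    public static VersionString parse(String version) {
        if (version.startsWith("snapshot.")) {
            return new VersionString(Kind.SNAPSHOT,
                    Long.parseLong(version.substring("snapshot.".length())));
        }
        if (version.startsWith("tag.")) {
            return new VersionString(Kind.TAG,
                    Long.parseLong(version.substring("tag.".length())));
        }
        // Old style: a bare long is treated as a snapshot id.
        return new VersionString(Kind.SNAPSHOT, Long.parseLong(version));
    }
}
```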

For Flink, we can use the dynamic option `scan.snapshot-id=<snapshot-id>`. We propose to introduce a new core option `scan.tag-id` to support traveling to a tag.

Time Travel to Timestamp with Tag

Currently, we support time traveling to a time point: the Paimon scanner finds the snapshot whose commit time is closest to the specified time. The problem is that all of the snapshots before the time point may have expired. So we propose to fall back to finding the closest tag in this scenario.
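The fallback lookup can be sketched as picking the latest tag whose commit time is not later than the requested timestamp. The in-memory `TreeMap` index here is an illustrative assumption; real code would read tag metadata from storage.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: resolve a timestamp to a tag when all earlier
// snapshots have expired.
public class TagTimeTravel {
    /** commitTimeMillis -> tagId; returns null when no tag is early enough. */
    public static Long tagForTimestamp(TreeMap<Long, Long> commitTimeToTagId,
                                       long timestampMillis) {
        Map.Entry<Long, Long> entry = commitTimeToTagId.floorEntry(timestampMillis);
        return entry == null ? null : entry.getValue();
    }
}
```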

Usage

Periodically creating tags

Public Interfaces

Table Options 

key | Required | Type | note
`tag.create-time` | false | String | The time point to create tags periodically. Must be in the format 'hh:mm'.
`tag.create-interval` | false | Duration | Interval between two tags. Must be at least 1d and an integer number of days.
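The two constraints above ('hh:mm' format; an interval of at least 1d in whole days) could be validated as sketched below. The helper class is hypothetical, for illustration only.

```java
import java.time.Duration;
import java.util.regex.Pattern;

// Illustrative validation of the two proposed table options.
public class TagOptionValidation {
    private static final Pattern HH_MM = Pattern.compile("([01][0-9]|2[0-3]):[0-5][0-9]");

    /** The create-time option must be in the format 'hh:mm'. */
    public static boolean isValidCreateTime(String value) {
        return HH_MM.matcher(value).matches();
    }

    /** The create-interval option must be at least 1d and an integer number of days. */
    public static boolean isValidCreateInterval(Duration value) {
        return value.compareTo(Duration.ofDays(1)) >= 0
                && value.toMillis() % Duration.ofDays(1).toMillis() == 0;
    }
}
```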

Flink Actions

Now, we propose to introduce two Flink actions: `create-tag` to create a tag at the current time point, and `delete-tag --id <id>` to delete a specified tag.

Call Procedures (Future work)

  1. Spark supports extensions to extend SQL syntax. We can provide tag procedures after we support the CALL statement for Spark.
  2. Supporting CALL is on the roadmap of Flink. We can provide tag procedures after that.


Creation & Deletion

Currently, we provide two ways for users to control the creation and deletion of tags.

  1. Table options
    We can periodically create tags according to user configuration. A new core option will be introduced: `tag.create-interval` (Duration type).
  2. Flink actions
    In case users want to manually create or delete a tag, we propose to introduce two actions:
    `create-tag` to create a tag at the current time point;
    `delete-tag --id <id>` to delete a specified tag.

In the future, after Flink supports stored procedures, we can provide procedures to manage tags.

Interaction with Snapshots

  1. When creating a tag, we will trigger a full compaction and pick the compacted snapshot to tag.
  2. When expiring snapshots, Paimon checks whether the data files are still used by living snapshots; if not, they will be deleted. After we introduce tags, we should also check whether the data files are used by tags.
  3. When deleting a tag, we will check for and delete unused data files (as we do when expiring snapshots).
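The reachability check in steps 2 and 3 can be sketched as a set difference: a candidate data file may only be deleted when no living snapshot and no tag still references it. The set-based representation below is an illustrative simplification of the actual manifest scans.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of deciding which candidate data files are safe to delete.
public class ExpireCheck {
    /** liveReferences: one set of referenced file names per living snapshot or tag. */
    public static Set<String> deletableFiles(Set<String> candidates,
                                             List<Set<String>> liveReferences) {
        Set<String> deletable = new HashSet<>(candidates);
        for (Set<String> referenced : liveReferences) {
            deletable.removeAll(referenced);
        }
        return deletable;
    }
}
```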

System Table

We propose to introduce the system table `TagsTable`. The schema is:

tag_id BIGINT
schema_id BIGINT
create_time BIGINT
record_count BIGINT


Compatibility, Deprecation, and Migration Plan

None.

Test Plan

UT tests: verify creating and reading tags.

IT tests: verify tag-related logic, including:

  1. time travel to a tag
  2. expiration of snapshots does not delete data files referenced by tags

Rejected Alternatives

Support starting from a tag in streaming reading

The current design of tags only stores full data manifests, so it cannot support streaming reading for now.

 






