
Motivation

In data warehouses, it is a common requirement to query historical data, so we need to periodically merge incremental data with historical data to produce the complete data as of the current time. For example, Hive 2.2 supports the MERGE INTO statement, which merges a source table's records into a target table by performing updates, inserts and deletes according to different filters. A good way is therefore to take the historical table as the target and the incremental table as the source, then use MERGE INTO to perform updates (for existing records) and inserts (for non-existing records). But when there is a large amount of data, the merging may cost much time.

Paimon's snapshots provide another way to query historical data without the cost of merging. Paimon's data storage generates a snapshot for each commit, so we can find the historical data at any snapshot (usually called a time travel query in SQL).

But in most scenarios a table generates too many snapshots, so Paimon lets users configure a snapshot expiration time to clean up old, unused snapshots. Thus the snapshot at a specified time point may have already expired when a user wants to query it. Therefore, we want to introduce savepoints to Paimon to persist the full data at a time point.

Public Interfaces

We need an entrance class `Savepoint` to store the information of a savepoint:

Savepoint
public class Savepoint {
	/** Used to identify a savepoint. */
	private final long id;
	
	/** Which snapshot does this savepoint save. */
	private final long savedSnapshotId;

	/** Id of the current table schema of the records in this savepoint. */
	private final long schemaId;

	/** The manifest list of all data of this savepoint. */
	private final String fullManifestList;

	/** How many records in this savepoint. */
	private final long recordCount;

	/** Getters. */
	
	/** Some util methods. */

	/** Return all {@link ManifestFileMeta} instances for full data manifests in this savepoint. */
	public List<ManifestFileMeta> dataManifests(ManifestList manifestList);
	
	/** Serialize to json. */
	public String toJson();

	/** Deserialize from json. */
	public static Savepoint fromJson(String json);

	/** Get a savepoint from path. */
	public static Savepoint fromPath(FileIO fileIO, Path path);
}

We need a `SavepointManager` to manage savepoints (similar to `SnapshotManager`).

SavepointManager
public class SavepointManager {
	/** Return the root directory of savepoints. */
	public Path savepointDirectory();

	/** Return the path of a savepoint. */
	public Path savepointPath(long savepointId);

	/** Get a savepoint instance. */
	public Savepoint savepoint(long savepointId);

	/** Check if a savepoint exists. */
	public boolean savepointExists(long savepointId);

	/** Return id of the earliest savepoint. */
	public @Nullable Long earliestSavepointId();

	/** Return id of the latest savepoint. */
	public @Nullable Long latestSavepointId();
}
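
As a usage sketch (assuming, like `SnapshotManager`, a constructor taking the table's `FileIO` and table `Path`, and a `savedSnapshotId()` getter on `Savepoint`; the wiring here is illustrative, not the final API):

SavepointManager usage (sketch)
public class SavepointExample {
	public static void printLatestSavepoint(FileIO fileIO, Path tablePath) {
		SavepointManager manager = new SavepointManager(fileIO, tablePath);
		Long latestId = manager.latestSavepointId();
		if (latestId != null) {
			Savepoint savepoint = manager.savepoint(latestId);
			// savedSnapshotId() is the getter for the field defined in Savepoint above
			System.out.println("Savepoint " + latestId + " saves snapshot " + savepoint.savedSnapshotId());
		}
	}
}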


Proposed Changes

Storage

Like snapshots, a new directory `savepoint` will be created under the table directory for storing savepoints. The qualified path for a savepoint file is `/path/to/table/savepoint/savepoint-<id>`.
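
For illustration, the path methods of `SavepointManager` could be implemented along these lines (a sketch; `tablePath` is assumed to be the table's root path held by the manager):

Path layout (sketch)
private static final String SAVEPOINT_PREFIX = "savepoint-";

public Path savepointDirectory() {
	return new Path(tablePath + "/savepoint");
}

public Path savepointPath(long savepointId) {
	return new Path(tablePath + "/savepoint/" + SAVEPOINT_PREFIX + savepointId);
}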

Creation & Deletion

We can provide two ways for users to control the creation and deletion of savepoints.

  1. Table options
    We can periodically create savepoints according to the user's configuration. A new core option will be introduced: `savepoint.create-interval` (Duration type); see the triggering sketch after this list.
  2. Flink actions
    In case users want to manually create or delete a savepoint, we propose to introduce two actions:
    `create-savepoint` to create a savepoint at the current time point;
    `delete-savepoint --id <id>` to delete the specified savepoint.
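
A minimal sketch of how interval-based creation could be triggered after each commit (class and field names here are illustrative, not the final implementation; `Duration` is `java.time.Duration`):

Interval-based triggering (sketch)
public class SavepointTrigger {
	/** Value of the proposed option `savepoint.create-interval`. */
	private final Duration createInterval;

	/** Commit time of the latest savepoint, in epoch milliseconds. */
	private long lastSavepointMillis;

	public SavepointTrigger(Duration createInterval, long lastSavepointMillis) {
		this.createInterval = createInterval;
		this.lastSavepointMillis = lastSavepointMillis;
	}

	/** Called after each successful commit with the commit time. */
	public boolean shouldCreateSavepoint(long commitMillis) {
		if (commitMillis - lastSavepointMillis >= createInterval.toMillis()) {
			lastSavepointMillis = commitMillis;
			return true;
		}
		return false;
	}
}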

In the future, after Flink supports stored procedures, we can provide procedures for managing savepoints.

System Table

We propose to introduce a system table `SavepointsTable`. The schema is:

savepoint_id BIGINT
saved_snapshot_id BIGINT
schema_id BIGINT
save_time BIGINT
record_count BIGINT 
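
Following the convention of existing system tables such as `t$snapshots`, the savepoints of a table `t` would presumably be queried with `SELECT * FROM t$savepoints`.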

Usage

We'd like to explain more details about where savepoints take effect.

Spark Time Travel to Version

The Spark SQL is `SELECT * FROM t VERSION AS OF <version-string>`.

Currently, the version string only supports a long value which represents a snapshot id. We now propose to use `snapshot.<id>` to specify traveling to a snapshot, or `savepoint.<id>` to specify traveling to a savepoint.
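
Parsing the extended version string could look like the following sketch (the `Target` record and class name are illustrative):

Version string parsing (sketch)
public class VersionStrings {
	enum Kind { SNAPSHOT, SAVEPOINT }

	record Target(Kind kind, long id) {}

	static Target parse(String version) {
		if (version.startsWith("snapshot.")) {
			return new Target(Kind.SNAPSHOT, Long.parseLong(version.substring("snapshot.".length())));
		}
		if (version.startsWith("savepoint.")) {
			return new Target(Kind.SAVEPOINT, Long.parseLong(version.substring("savepoint.".length())));
		}
		// A bare long value keeps the current behavior: travel to the snapshot with that id.
		return new Target(Kind.SNAPSHOT, Long.parseLong(version));
	}
}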

Spark Time Travel to Timestamp

The Spark SQL is `SELECT * FROM t TIMESTAMP AS OF <timestamp-string>`.

Currently, we find the snapshot whose commit time is closest to the specified time point. The problem is that the snapshots before the time point may have expired. We now propose to try to find the closest savepoint after the Paimon scanner finds no snapshot.
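
The lookup order could be sketched as follows (assuming a timestamp-based lookup on `SavepointManager` analogous to `SnapshotManager#earlierOrEqualTimeMills`; the savepoint-side method is an assumption of this sketch):

Timestamp fallback (sketch)
/** Returns the snapshot or savepoint to read for TIMESTAMP AS OF; names are illustrative. */
public Object travelToTimestamp(long timestampMillis) {
	Snapshot snapshot = snapshotManager.earlierOrEqualTimeMills(timestampMillis);
	if (snapshot != null) {
		return snapshot;
	}
	// All snapshots before the time point have expired; fall back to the closest savepoint.
	return savepointManager.earlierOrEqualTimeMills(timestampMillis); // hypothetical method
}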

Flink Streaming Reading from Snapshot/Savepoint

Currently, we support specifying the start snapshot in streaming reading by dynamic option `scan.snapshot-id=<snapshot-id>`. In case that the specified snapshot has expired, we propose to fall back to the savepoint that saved it (or the closest one after it): read the savepoint's full data first, then continue reading incremental changes from subsequent snapshots.
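
A sketch of the fallback, using only the `SavepointManager` API proposed above plus the `savedSnapshotId()` getter (the `savepointManager` field and method name are illustrative):

Streaming start-up fallback (sketch)
/** Find the closest savepoint that covers the requested (expired) snapshot, if any. */
public OptionalLong fallbackSavepoint(long requestedSnapshotId) {
	Long earliest = savepointManager.earliestSavepointId();
	Long latest = savepointManager.latestSavepointId();
	if (earliest == null || latest == null) {
		return OptionalLong.empty();
	}
	for (long id = earliest; id <= latest; id++) {
		if (savepointManager.savepointExists(id)
				&& savepointManager.savepoint(id).savedSnapshotId() >= requestedSnapshotId) {
			return OptionalLong.of(id); // closest savepoint at or after the requested snapshot
		}
	}
	return OptionalLong.empty();
}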

Flink Streaming Reading from Timestamp

Currently, we support specifying a start timestamp in streaming reading by dynamic option `scan.timestamp-millis=<timestamp>`. Similar to Spark time travel to timestamp, all snapshots before the specified time point may have expired. In that case we propose to fall back to the closest savepoint: read its full data first, then continue reading incremental changes from subsequent snapshots.







