
IDIEP-43
Author
Sponsor
Created

  

Status

ACTIVE


Table of Contents

Motivation

Most open-source distributed systems provide `cluster snapshots` functionality, but Apache Ignite doesn't have one yet. Cluster snapshots will allow users to copy their data from an active cluster and load it later on another one, for example, copying data from a production system into a smaller QA or development system.

Management

Create snapshot

...

Configuration

The snapshot storage path can be configured via IgniteConfiguration; by default, the IGNITE_HOME/work/snapshots directory is used.

Code Block
languagejava
themeConfluence
titleIgniteConfiguration#snapshotPath
collapsetrue
public class IgniteConfiguration {
    /**
     * Directory where all results of snapshot operations will be stored. If {@code null} then
     * relative {@link #DFLT_SNAPSHOT_DIRECTORY} will be used.
     */
    private String snapshotPath;
}
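
For example, a node could be configured as follows; this is a sketch, and the setSnapshotPath setter is assumed to accompany the snapshotPath property above rather than being a finalized API:

Code Block
languagejava
themeConfluence
titleSnapshot path configuration (sketch)
collapsetrue
import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.IgniteConfiguration;

IgniteConfiguration cfg = new IgniteConfiguration();

// Keep snapshot results on a dedicated volume instead of IGNITE_HOME/work/snapshots.
cfg.setSnapshotPath("/mnt/backups/ignite-snapshots");

Ignite ignite = Ignition.start(cfg);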

Create snapshot

[public] Java API

Code Block
languagejava
themeConfluence
titleIgniteSnapshot
collapsetrue
public interface IgniteSnapshot {
    /**
     * Create a consistent copy of all persistence cache groups from the whole cluster.
     *
     * @param name Snapshot name.
     * @return Future which will be completed when a process ends.
     */
    public IgniteFuture<Void> createSnapshot(String name);
}
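
A usage sketch, assuming the facade above is exposed via Ignite#snapshot():

Code Block
languagejava
themeConfluence
titleCluster snapshot from Java (sketch)
collapsetrue
import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;

Ignite ignite = Ignition.start("examples/config/example-ignite.xml");

// Trigger a cluster-wide snapshot and block until the operation completes.
ignite.snapshot().createSnapshot("ERIB_23012020").get();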

[public] JMX MBean

Code Block
languagejava
themeConfluence
titleSnapshotMXBean
collapsetrue
package org.apache.ignite.mxbean;

/**
 * Snapshot features MBean.
 */
@MXBeanDescription("MBean that provides access for snapshot features.")
public interface SnapshotMXBean {
    /**
     * Gets all created snapshots on the cluster.
     *
     * @return List of all known snapshots.
     */
    @MXBeanDescription("List of all known snapshots.")
    public List<String> getSnapshots();

    /**
     * Create the cluster-wide snapshot with given name.
     *
     * @param snpName Snapshot name to be created.
     * @see IgniteSnapshot#createSnapshot(String)
     */
    @MXBeanDescription("Create cluster-wide snapshot.")
    public void createSnapshot(@MXBeanParameter(name = "snpName", description = "Snapshot name.") String snpName);
}
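
For completeness, the MBean can be invoked from a plain JMX client. The sketch below uses only standard javax.management APIs; the service URL and ObjectName are illustrative and depend on how the node registers the bean:

Code Block
languagejava
themeConfluence
titleInvoking SnapshotMXBean via JMX (sketch)
collapsetrue
import javax.management.JMX;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

import org.apache.ignite.mxbean.SnapshotMXBean;

public class SnapshotJmxClient {
    public static void main(String[] args) throws Exception {
        // JMX endpoint and MBean ObjectName are illustrative; they depend on the node JVM/JMX settings.
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:49112/jmxrmi");

        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection conn = connector.getMBeanServerConnection();

            ObjectName name = new ObjectName("org.apache:group=Snapshot,name=SnapshotMXBeanImpl");

            SnapshotMXBean mxBean = JMX.newMXBeanProxy(conn, name, SnapshotMXBean.class);

            // Start a cluster-wide snapshot with the given name.
            mxBean.createSnapshot("ERIB_23012020");
        }
    }
}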

[public] Command Line

Code Block
languagebash
themeConfluence
titlecontrol.sh --snapshot
collapsetrue
# Starts cluster snapshot operation.
control.sh --snapshot ERIB_23012020

# Display all known cluster snapshots.
control.sh --snapshot -list

[internal] File Transmission

Internal API that allows requesting and receiving the required snapshot of cache groups from a remote node. Used as a part of IEP-28: Rebalance peer-2-peer to send the created local snapshot to the remote (demander) node.

Code Block
languagejava
themeConfluence
titleIgniteSnapshotManager#createRemoteSnapshot
collapsetrue
/**
 * @param rmtNodeId The remote node to connect to.
 * @param parts Collection of pairs of cache group and the cache partitions to be snapshotted.
 * @param partConsumer Received partition handler.
 * @return Future which will be completed when the requested snapshot is fully received.
 */
public IgniteInternalFuture<Void> createRemoteSnapshot(
    UUID rmtNodeId,
    Map<Integer, Set<Integer>> parts,
    BiConsumer<File, GroupPartitionId> partConsumer);
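
A call-shape sketch for the API above; snapshotMgr (the IgniteSnapshotManager instance), rmtNodeId, grpId and log are assumed to be obtained from internal node components:

Code Block
languagejava
themeConfluence
titleRequesting a remote snapshot (sketch)
collapsetrue
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Request partitions 0 and 1 of a single cache group from the remote (supplier) node.
Map<Integer, Set<Integer>> parts =
    Collections.singletonMap(grpId, new HashSet<>(Arrays.asList(0, 1)));

snapshotMgr.createRemoteSnapshot(rmtNodeId, parts,
    // Invoked for each fully received cache partition file.
    (file, grpPartId) -> log.info("Received partition " + grpPartId + " -> " + file))
    .get(); // Wait until all requested partitions are received.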

Snapshot Events

The Ignite distributed events functionality allows user applications to receive notifications when certain events occur in the distributed cluster environment. Users must be able to get notified about snapshot operation execution within the cluster (a listener sketch follows the list below):

  • EVT_CLUSTER_SNAPSHOT_STARTED – the snapshot operation started on a server node.
  • EVT_CLUSTER_SNAPSHOT_FINISHED – the snapshot operation finished successfully.
  • EVT_CLUSTER_SNAPSHOT_FAILED – the snapshot operation was interrupted; the reason is provided in the additional event message.
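
A minimal listener sketch is shown below; it assumes the proposed EVT_CLUSTER_SNAPSHOT_* constants end up in EventType and are enabled via IgniteConfiguration#setIncludeEventTypes:

Code Block
languagejava
themeConfluence
titleListening to snapshot events (sketch)
collapsetrue
import org.apache.ignite.Ignite;
import org.apache.ignite.events.EventType;

// `ignite` is a started node; the EVT_CLUSTER_SNAPSHOT_* constants are the ones proposed above.
ignite.events().localListen(evt -> {
    System.out.println("Snapshot event [name=" + evt.name() + ", msg=" + evt.message() + ']');

    return true; // Keep listening.
},
    EventType.EVT_CLUSTER_SNAPSHOT_STARTED,
    EventType.EVT_CLUSTER_SNAPSHOT_FINISHED,
    EventType.EVT_CLUSTER_SNAPSHOT_FAILED);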

Snapshot Security

Ignite must have the capability to specify permissions that allow/disallow execution of the cluster snapshot operation. The following permission must be supported:

  • ADMIN_SNAPSHOT_OPS – permission to control creation and cancellation of cluster snapshot operations.

Restore snapshot (manually)

The snapshot procedure stores all internal files (binary meta, marshaller meta, cache group data files, and cache group configurations) using the same directory structure as Apache Ignite does, preserving the configured consistent node IDs.

To restore a cluster from a snapshot, the user must manually do the following:

  1. Remove data from the checkpoint, wal, binary_meta, marshaller directories.
  2. Copy all snapshot data files to the IGNITE_HOME/work directory, paying attention to consistent node IDs.
Code Block
languagebash
themeConfluence
titleSnapshot Directory Structure
collapsetrue
maxmuzaf@TYE-SNE-0009931 ignite % tree work
work
└── snapshots
    └── backup23012020
        ├── binary_meta
        │   ├── snapshot_IgniteClusterSnapshotSelfTest0
        │   ├── snapshot_IgniteClusterSnapshotSelfTest1
        │   └── snapshot_IgniteClusterSnapshotSelfTest2
        ├── db
        │   ├── snapshot_IgniteClusterSnapshotSelfTest0
        │   │   ├── cache-default
        │   │   │   ├── cache_data.dat
        │   │   │   ├── part-0.bin
        │   │   │   ├── part-2.bin
        │   │   │   ├── part-3.bin
        │   │   │   ├── part-4.bin
        │   │   │   ├── part-5.bin
        │   │   │   └── part-6.bin
        │   │   └── cache-txCache
        │   │       ├── cache_data.dat
        │   │       ├── part-3.bin
        │   │       ├── part-4.bin
        │   │       └── part-6.bin
        │   ├── snapshot_IgniteClusterSnapshotSelfTest1
        │   │   ├── cache-default
        │   │   │   ├── cache_data.dat
        │   │   │   ├── part-1.bin
        │   │   │   ├── part-3.bin
        │   │   │   ├── part-5.bin
        │   │   │   ├── part-6.bin
        │   │   │   └── part-7.bin
        │   │   └── cache-txCache
        │   │       ├── cache_data.dat
        │   │       ├── part-1.bin
        │   │       ├── part-5.bin
        │   │       └── part-7.bin
        │   └── snapshot_IgniteClusterSnapshotSelfTest2
        │       ├── cache-default
        │       │   ├── cache_data.dat
        │       │   ├── part-0.bin
        │       │   ├── part-1.bin
        │       │   ├── part-2.bin
        │       │   ├── part-4.bin
        │       │   └── part-7.bin
        │       └── cache-txCache
        │           ├── cache_data.dat
        │           ├── part-0.bin
        │           └── part-2.bin
        └── marshaller

17 directories, 30 files

Restore snapshot (automatic)

Restore the cache groups on the active cluster

We need to provide the ability to restore individual cache groups from an existing snapshot on an active cluster.

Process overview

The overall process includes the following sequential steps:

  1. Check the integrity of the cache group snapshot and make sure that the target cache group doesn't exist.
  2. Copy partitions locally on all nodes where the snapshot was taken. Check and merge binary metadata on one of the snapshot nodes.
  3. Dynamically start the restored cache group.

If errors occur (I/O errors, node failure, etc.), the changes made to the cluster must be fully or partially reverted (depending on the type of error).
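
For reference, the invocation could look like the sketch below. restoreSnapshot(name, cacheGroupNames) is assumed to be exposed on the IgniteSnapshot facade (its final shape is not fixed by this section); the snapshot and cache group names are taken from the directory structure example above:

Code Block
languagejava
themeConfluence
titleRestoring cache groups from a snapshot (sketch)
collapsetrue
import java.util.Arrays;

// `ignite` is a started node instance.
// Restore only the "default" and "txCache" cache groups from the "backup23012020" snapshot
// and wait for the cluster-wide operation to complete.
ignite.snapshot().restoreSnapshot("backup23012020", Arrays.asList("default", "txCache")).get();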

Requirements

The cluster should be active

Current limitations

  • Restoring is possible only if all parts of the snapshot are present in the cluster. Each node looks for local snapshot data in the configured snapshot path by the given snapshot name and consistent node ID.
  • The restore procedure can be applied only to cache groups created by the user.
  • Cache groups to be restored from the snapshot must not be present in the cluster. If they are present, they must be destroyed by the user before starting this operation.
  • Concurrent restore operations are not allowed. Thus, if one operation has been started, another can only be started after the first one completes.

Failover

To avoid the possibility of starting a node with inconsistent data, the partition files are first copied to a temporary directory, and then this directory is moved using an atomic move operation. When the node starts, the temporary folder is deleted (if it exists).
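
A minimal sketch of this copy-then-atomic-move approach using standard NIO; directory names and the helper itself are illustrative:

Code Block
languagejava
themeConfluence
titleAtomic publish of a restored cache directory (sketch)
collapsetrue
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Publishes a fully prepared temporary directory with a single atomic rename. */
static void publishRestoredCacheDir(Path tmpDir, Path targetDir) throws IOException {
    // 1. Partition files have already been copied into tmpDir (omitted here).
    // 2. The atomic move guarantees that a restarting node never observes a half-copied
    //    cache directory: it either sees the old state or the fully restored one, and a
    //    leftover temporary directory is simply deleted on node startup.
    Files.move(tmpDir, targetDir, StandardCopyOption.ATOMIC_MOVE);
}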

If any of the nodes on which the snapshot was taken leaves the cluster during the restore process, the process is aborted and all changes are rolled back (to achieve this, a special cache launch mode was introduced: if a node exits during the exchange, the process is rolled back).

Whole cluster restore

// TBD

Snapshot lifecycle

Sometimes the user is faced with the task of adding post-processing to the snapshot operation, for example, calculating file checksums, checking consistency, etc. The same applies to the automatic restore procedure – the user should be able to check the consistency of files, checksums, etc. before restoring them.

It is impossible to implement such pre/post-processing using standard Ignite events: if a custom check detects a problem, the entire snapshot operation must be aborted.

It is recommended to add the ability to load SnapshotHandler user extensions that will implement custom checks.

Code Block
languagejava
themeMidnight
titleSnapshot handlers API
collapsetrue
/** Snapshot operation handler. */
public interface SnapshotHandler<T> extends Extension {
    /** Snapshot handler type. */
    public SnapshotHandlerType type();

    /** Local processing of a snapshot operation. */
    public @Nullable T invoke(SnapshotHandlerContext ctx) throws Exception;

    /** Processing of results from all nodes. */
    public default void complete(String name, Collection<SnapshotHandlerResult<T>> results) throws Exception {
        for (SnapshotHandlerResult<T> res : results) {
            if (res.error() == null)
                continue;

            throw new IgniteCheckedException("Snapshot handler has failed " +
                "[snapshot=" + name +
                ", handler=" + getClass().getName() +
                ", nodeId=" + res.node().id() + "].", res.error());
        }
    }
}

/** Snapshot handler type. */
public enum SnapshotHandlerType {
    /** Handler is called immediately after the snapshot is taken. */
    CREATE,

    /** Handler is called just before restore operation is started. */
    RESTORE
}

/** Snapshot handler context. */
public class SnapshotHandlerContext {
    /** Snapshot metadata. */
    SnapshotMetadata metadata;

    /** Cache group names. */
    Collection<String> grps;

    /** Local node. */
    ClusterNode locNode;
}

/** Result of local processing on the node. In addition to the result received from the handler, it also includes information about the error (if any) and the node on which this result was received. */
public class SnapshotHandlerResult<T> implements Serializable {
    /** Result data returned by the handler. */
    T data;

    /** Error occurred during local processing (if any). */
    Exception err;

    /** Node on which this result was received. */
    ClusterNode node;
}
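
For illustration, a handler plugged into the CREATE stage could look like the sketch below. The class itself and its registration through the plugin extension mechanism are assumptions; only the SnapshotHandler contract above is taken as given:

Code Block
languagejava
themeMidnight
titleCustom snapshot handler (sketch)
collapsetrue
/**
 * Example handler that could verify partition file checksums right after a local snapshot is taken.
 * The verification itself is omitted; only the extension contract from the API above is shown.
 */
public class SnapshotChecksumHandler implements SnapshotHandler<Void> {
    /** {@inheritDoc} */
    @Override public SnapshotHandlerType type() {
        return SnapshotHandlerType.CREATE;
    }

    /** {@inheritDoc} */
    @Override public Void invoke(SnapshotHandlerContext ctx) throws Exception {
        // Verify the local part of the snapshot here; throwing an exception from any node
        // aborts the whole snapshot operation via the default complete() logic above.
        return null;
    }
}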

Snapshot requirements

  1. Users must have the ability to create a snapshot of persisted user data (in-memory is out of the scope).
  2. Users must have the ability to create a snapshot from the cluster under the load without cluster deactivation.
  3. The snapshot process must not block for a long time any of the user transactions (short-time blocks are acceptable).
  4. The snapshot process must allow creating a data snapshot on each node and transfer it to any of the remote nodes for internal cluster needs.
  5. The created cluster-level snapshot must be fully consistent in cluster-wide terms: there must not be any incomplete transactions inside.
  6. The snapshot of each node must be consistent – cache partitions, binary meta, etc. must not have unnecessary changes.

Snapshot overview

With respect to the cluster-wide snapshot operation, the process of creating a copy of user data can be split into the following high-level steps:

  1. Start a cluster-wide snapshot operation using any of the available public API.
  2. Each node will receive such an event and start a local snapshot task which must create a consistent copy of available user data on a local node.
  3. Collect the completion results of local snapshot tasks from each node and send the results back to the user.

The Distributed Process is used to complete steps [1, 3]. To achieve step [2], a new SnapshotFutureTask must be developed.

Data to copy to snapshot

The following must be copied to snapshot:

  • cache partition files (primary, backup, SQL index)
  • cache configuration
  • binary meta-information
  • marshaller meta information

Copy binary, marshaller metas

Binary meta and marshaller meta are still stored on-heap, so it is easy to collect them and keep this persistent user information consistent under the checkpoint write lock (no changes are allowed at that point).

Copy cache configurations

Cache configuration files may be changed by, for instance, concurrent SQL ADD [DROP] COLUMN or SQL CREATE [DROP] INDEX operations. These changes always happen through the exchange thread.

Partitions copy strategy

Another strategy must be used for cache partition files. The checkpoint process writes dirty pages from PageMemory to the cache partition files simultaneously with another process copying them to the target directory. Each cache partition file is consistent only at checkpoint end. So, to avoid long-lasting transaction blocks during the cluster-wide snapshot process, it must not wait for the checkpoint to end on each node. The process of copying cache partition files must do the following:

  1. Start copying each partition file to the target directory as it is. These files will contain dirty data due to concurrent checkpoint thread writes.
  2. Collect all dirty pages related to the ongoing checkpoint process and the corresponding partition files and apply them to the copied file right after the copy process ends. These dirty pages are written to special new .delta files, one per partition (e.g. part-1.bin together with part-1.bin.delta gives a fully consistent cache partition).

There are two possible cases while cache partition files are copied simultaneously with the checkpoint thread:

  1. The cache partition file is already copied, but the checkpoint has not ended yet – wait until the checkpoint ends and start merging the cache partition file with its delta (a simplified merge sketch follows this list).
  2. The current checkpoint process has ended, but the cache partition file is still being copied – the next checkpoint process must read and copy the old version of a page to the delta file prior to writing its dirty page.
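
To make the merge step more concrete, below is a simplified sketch. It assumes each delta record is an 8-byte partition offset followed by a full page image; this illustrates the idea rather than the actual on-disk delta format:

Code Block
languagejava
themeConfluence
titleMerging a partition file with its delta (sketch)
collapsetrue
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/**
 * Simplified merge of a copied cache partition file with its delta file. The delta file is assumed
 * to be a plain sequence of records [8-byte partition offset][page image].
 */
static void applyDelta(Path partFile, Path deltaFile, int pageSize) throws IOException {
    try (FileChannel partCh = FileChannel.open(partFile, StandardOpenOption.WRITE);
         FileChannel deltaCh = FileChannel.open(deltaFile, StandardOpenOption.READ)) {
        ByteBuffer offBuf = ByteBuffer.allocate(8);
        ByteBuffer pageBuf = ByteBuffer.allocate(pageSize);

        while (deltaCh.read(offBuf) == 8) {
            offBuf.flip();
            long off = offBuf.getLong();

            pageBuf.clear();
            while (pageBuf.hasRemaining() && deltaCh.read(pageBuf) > 0)
                ; // Read the full page image.
            pageBuf.flip();

            // Overwrite the stale page copied during the checkpoint with its consistent version.
            partCh.write(pageBuf, off);

            offBuf.clear();
        }
    }
}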

Local snapshot task

...

The local snapshot task is an operation that executes on each node independently. It copies all the persistent user files from the Ignite work directory to the target snapshot directory with additional machinery to achieve consistency. This task is closely connected with the node checkpointing process because, for instance, cache partition files are only eventually consistent on disk during an ongoing checkpoint and become fully consistent only when the checkpoint ends.

The local snapshot operation on each cluster node is represented by SnapshotFutureTask.

Snapshot task process

The local snapshot task (SnapshotFutureTask) proceeds as follows:

  1. A new checkpoint starts (forced by node or a regular one).
  2. Under the checkpoint write lock – fix the cache partition length for each partition (copy from 0 to length).
  3. The task creates new on-heap collections with marshaller meta, binary meta to copy.
  4. The task starts copying partition files.
  5. The checkpoint thread:
    1. If the checkpoint associated with the task is not finished – write the dirty page both to the original partition file and to the delta file.
    2. If the checkpoint associated with the task has finished and the partition file is still being copied – read the original page from the original partition file and copy it to the delta file prior to writing the dirty page (see the sketch after this list).
  6. Once a partition file is copied – start merging the copied partition with its delta file.
  7. The task ends when all data has been successfully copied to the target directory and all cache partition files have been merged with their deltas.
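
To make step 5 more concrete, a simplified sketch of the checkpoint-side page-write interception is given below; all helper names are illustrative, not the real internals:

Code Block
languagejava
themeConfluence
titleCheckpoint page-write interception (sketch)
collapsetrue
import java.nio.ByteBuffer;

/**
 * Sketch of the page-write interception performed by the checkpoint thread while a partition is
 * being copied by the snapshot task. All helpers (snapshotTaskContains, snapshotCheckpointFinished,
 * partitionStillCopying, readPageFromPartitionFile, writeToDelta) are illustrative names, not the
 * real internal methods.
 */
void beforeCheckpointPageWrite(GroupPartitionId partId, long pageId, ByteBuffer dirtyPage) {
    if (!snapshotTaskContains(partId))
        return; // The partition is not a part of the running snapshot task.

    if (!snapshotCheckpointFinished()) {
        // Step 5.1: the checkpoint associated with the snapshot task is still running, so the new
        // page version goes to the delta file (the original partition file is written as usual).
        writeToDelta(partId, pageId, dirtyPage);
    }
    else if (partitionStillCopying(partId)) {
        // Step 5.2: the snapshot checkpoint has finished, but the partition file is still being
        // copied, so the old page version is preserved in the delta file before being overwritten.
        ByteBuffer oldPage = readPageFromPartitionFile(partId, pageId);

        writeToDelta(partId, pageId, oldPage);
    }
}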

Cluster-wide snapshot

The cluster-wide snapshot is an operation that executes the local snapshot task on each node. To achieve cluster-wide snapshot consistency, the Partition-Map-Exchange will be reused to block all user transactions for a short while.

In the short period of time while user transactions are blocked, the local snapshot task is started by forcing the checkpoint process. These actions give the following guarantees: all current transactions are finished, all new transactions are blocked, and all data from the PageMemory will be flushed to disk at checkpoint end. This is a short time window in which all cluster data eventually becomes fully consistent on disk.

An overview of the cluster-wide snapshot task steps in terms of the distributed process:

  1. The snapshot distributed process starts a new snapshot operation by sending an initial discovery message.
  2. The distributed process configured action initiates a new local snapshot task on each cluster node.
  3. The discovery event from the distributed process pushes a new exchange task to the exchange worker to start PME.
  4. While transactions are blocked (see onDoneBeforeTopologyUnlock), each local node forces the checkpoint thread and waits until the appropriate local snapshot task starts.
  5. The distributed process collects completion results from each cluster node:
    1. If there are no execution errors and no baseline nodes have left the cluster – the snapshot is created successfully.
    2. If there are errors, or some of the cluster nodes fail prior to completing their local results – all local snapshots will be reverted on each node and the snapshot fails.

Remote snapshot

The internal components must have the ability to request a consistent (not cluster-wide, but local) snapshot of cache groups from remote nodes. This machinery is reused for IEP-28: Rebalance via cache partition files to transmit cache partition files. The process requests execution of a local snapshot task on the remote node and collects the execution results back. The File Transmission protocol is used to transfer files from the remote node to the local node. The local snapshot task can thus be reused independently to perform data copying and sending.

The high-level overview looks like:

  1. The node sends a request via CommunicationSPI to the remote node with a map of cache group ids and the required cache partitions.
  2. The remote node accepts the request and registers a new SnapshotFutureTask locally for execution.
  3. The task starts and independently sends cache partition files and cache delta files as they are read.
  4. The requesting node accepts cache partition files and applies cache partition delta files on the fly to the appropriate partition (see TransmissionHandler#chunkHandler).
  5. When all requested cache groups and partitions are received – the process completes successfully.
    If an error occurs during transmission, the snapshot task stops and all temporary files are deleted.

Crash recovery

During the cluster snapshot operation, a node may crash for some reason.

  • The crashed node must revert the local uncompleted snapshot at its startup.
  • If the crashed node hasn't completed its local snapshot prior to the crash, then all nodes affected by this cluster snapshot operation must delete their local snapshot results.

Limitations

  1. Only one cluster-wide snapshot operation is allowed in the cluster at a time.
  2. Encrypted caches are currently not supported, since merging a cache partition with its delta file requires additional changes.
  3. The cache stop operation is not allowed during an ongoing cluster snapshot. An exception will be thrown to the user on a cache stop attempt.
  4. The cluster snapshot operation will be stopped if some of the baseline nodes leave or fail prior to reporting their local snapshot completion. Partial local snapshots will be reverted.
  5. Data structure and system caches are not included in the snapshot.

Discussion Links

http://apache-ignite-developers.2346864.n4.nabble.com/DISCUSSION-Hot-cache-backup-td41034.html

Documentation

https://ignite.apache.org/docs/latest/persistence/snapshots

Reference Links

  1. Apache Geode – Cache and Region Snapshots 
    https://geode.apache.org/docs/guide/16/managing/cache_snapshots/chapter_overview.html
  2. Apache Cassandra – Backing up and restoring data
    https://docs.datastax.com/en/cassandra-oss/3.0/cassandra/operations/opsBackupRestore.html

Tickets

Jira
serverASF JIRA
columnskey,summary,type,updated,assignee,priority,status,resolution
maximumIssues20
jqlQueryproject = Ignite AND labels IN (iep-43) order by key
serverId5aa69414-a9e9-3523-82ec-879b028fb15b