You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 6 Next »

Status

Current state: UNDER DISCUSSION

Discussion thread: <link to mailing list DISCUSS thread>

JIRA Unable to render Jira issues macro, execution error.

Released: 

Problem

Samza currently backs up local state of jobs to compacted Kafka topics called changelogs. At LinkedIn, we observed that for jobs with large local state, the restore can be very slow and can run in the tune of multiple hours. The effect of slow restores is that a job restarting due to hardware failure, regular maintenance like patching and upgrades, or autoscaling events can cause disruptions while the state is rebuilt. 

Motivation

Our motivation of introducing Blob Store as a replacement to Kafka based changelogs is to address the following issues:

  1. Long restore times: State restore from Kafka backed changelog topic is slow because the restore happens at "one message at a time" cadence. With Blob Store as state restore backend, it is possible to restore state store in parallel, thereby reducing the restore times from hours to the order of minutes.
  2. Automate Job and Cluster maintenance: Slow restore times also hinders our ability to automate job and cluster maintenance. Since restoring state from the changelog topic can take a long time, automated tasks like auto-scaling job, OS upgrades, hardware upgrades etc. need to be planned more carefully and cannot be completely automated. With a fast restore from Blob Store based restore, it is possible to reduce the duration and impact of such maintenance tasks. 
  3. Message and store size limits: Kafka has per message (1 MB) and per partition (25 GB) size limit. Moving to a Blob Store backed storage would allow us to write messages of arbitrary size and expand the store size, which currently is bound by the maximum partition size of Kafka.

Background

We utilize some features offered by RocksDB in our proposed design to implement robust state backup and restore. It is useful to understand these concepts related to RocksDB to help understand the proposed solution better. 

RocksDB Store Composition:

RocksDB is the in-memory key-value database that powers Samza's stateful stream processing. RocksDB flushes the in-memory buffer to the disk when it gets filled. It creates 2 types of files on the host:

  1. Mutable Files: Files like OFFSET, MANIFEST, LOCK and OPTIONS are mutable files created by RocksDB. These files contain metadata information about the current state of the store. Additionally, Samza also creates mutable files with additional metadata information about the state. These are: OFFSET-V2 and CURRENT.
  2. Immutable files: RocksDB creates durable copies of memtables in the form of SST files in the store directories. However, SST files are often combined together to create larger (and possibly deduplicated) files and as such can be deleted to create new SST files.

More details about the schema of metadata as well as SST files created by RocksDB can be found in this wiki: RocksDB wiki.

RocksDB Checkpoint:

RocksDB checkpoint is a feature that allows taking a snapshot of the current state of the store to a separate directory. This is used to create a partial or full backup of RocksDB state store. RocksDB creates a checkpoint by creating a hard link of the immutable SST files in the new directory and copying over mutable metadata file like MANIFEST and CURRENT. We leverage the checkpoint feature to create a cheap and fast point in time backup of our state store. The delta between this backup and the previous backup (if one exists) is then uploaded to the Blob Store. This allows us to create an incremental backup of the state store.

A detailed description of RocksDB checkpoint process can be found here: Checkpoints in RocksDB.

Proposed Changes

We propose a Blob Store based state backup and restore for Samza. Following diagram illustrates a high-level view of our solution:

Blob Store backed backup and restore for Samza

Note: Orange represents synchronous/blocking calls. Green represents asynchronous/non blocking operations. 

  • Step 1: A synchronous checkpoint call is made to RocksDB that creates an immutable checkpoint directory in the container filesystem (shown in step 1a). We also generate a checkpoint ID (last 6 digits of epoch timestamp in nanoseconds) and build an in-memory data structure called checkpoint to hold checkpoint ID and relevant metadata.
  • Step 2:  We write the checkpoint ID to the checkpoint directory. The checkpoint ID in checkpoint directory helps us in verifying if the local checkpoint directory is the latest checkpoint when the container restarts with host-affinity by verifying if the checkpoint ID in the checkpoint directory and in the checkpoint kafka topic are the same or not. This allows us to skip remote restoration and use the local checkpoint instead. 
  • Step 3 to step 6: We calculate the delta of this checkpoint directory from the previous checkpoint and upload this delta to the blob store in parallel to processing. Blob store returns a blob ID for each upload. We create a special index blob in the blob store with these blob IDs (and other metadata) called Index blob. Schema of Index Blob ID is shown in the code block below. Notice the filesRemoved and subDirectories removed sections. They are maintained for Garbage collection and error recovery. This is explained in detail in the design doc attached with the Jira ticket.
  • Step 6 and 7: We write the blob ID of the index blob to the checkpoint data structure, serialize this data structure and write the serialized checkpoint to the checkpoint topic. 

At this point the commit is complete. We then perform post commit clean-up. 

  • Files are not actually deleted from the hosts until the commit is complete. They are tracked in the filesRemoved and subDirsRemoved section of the Index Blob. At this point, we can safely delete the files and sub-directories marked to be deleted, and clean up the host. This is because if the commit fails, we do not want to delete files and not be able to recover to last known good state of the store.
  • We also update the expiration of the blobs on the remote blob store to never expire from the default 30 day expiration. This ensures that if a job fails in the middle of the commit and never restarts, the blobs are automatically garbage collected by the Blob Store at the end of 30days.

The steps explained above are also performed at the container starts. Since they are idempotent operations, they would have no effect if the post-commit steps were performed right after the commit. If the post-commit steps failed and the container restarts, we perform them immediately and ensure the cleanup operations are successful. 


IndexBlob Schema
{
    "schemaVersion": "Version of current schema", 
    "checkpointId": "Checkpoint ID", 
    "createdTimeMs": "Created time in epoch milliseconds", 
    "jobName": "Job name", 
    "jobInstance": "Instance ID of the job", 
    "taskName": "Name of the task", 
    "storeName": "Store name", 
    "dirIndex": {
        "filesPresent": [
            {
                "fileName": "Name of the file uploaded to Ambry", 
                "blobs": [
                    {
                        "blobID": "Blob ID", 
                        "offset": "File chunk offset [optional]"
                    }
                ], 
                "mtime": "Last updated time in epoch milliseconds", 
                "ctime": "Created time in epoch milliseconds",
                "sizeInBytes": "Size of the file", 
                "crc": "crc32 checksum of the file", 
                "attribs": "permission attributes of the file"
            }
        ],
        "filesRemoved": [
            {
                "fileName": "Name of the file to delete from Ambry", 
                "blobs": [
                    {
                        "blobID": "Blob ID", 
                        "offset": "File chunk offset [optional]"
                    }
                ]
            }
        ],
        "subDirectoriesPresent": [
            {
                "dirIndex": {
                     ……
                 }
            }
        ],
        "subDirectoriesRemoved": [
            {
                "dirIndex": {
                     ……
                 }
            }
        ]
    }, 
  "prevSnapshotIndexBlobId": “Blob ID of previous snapshot index”
}


Public Interfaces


BlobStoreManager Interface
/**
 * Provides interface for common blob store operations: GET, PUT and DELETE
 */
@InterfaceStability.Unstable
public interface BlobStoreManager {
  /**
   * Initialize underlying blob store client, if necessary.
   *
   */
  void init();

  /**
   * Non-blocking PUT call to remote blob store with supplied metadata
   * @param inputStream InputStream to read the file
   * @param metadata user supplied {@link Metadata} of the request
   * @return a future containing the blob ID of the uploaded blob if the upload is successful.
   */
  CompletionStage<String> put(InputStream inputStream, Metadata metadata);

  /**
   * Non-blocking GET call to remote blob store
   * @param id Blob ID of the blob to get
   * @param outputStream OutputStream to write the downloaded blob
   * @param metadata User supplied {@link Metadata} of the request
   * @return A future that completes when all the chunks are downloaded and written successfully to the OutputStream
   * @throws org.apache.samza.storage.blobstore.exceptions.DeletedException returned future should complete
   *         exceptionally with DeletedException on failure with the blob already deleted error.
   */
  CompletionStage<Void> get(String id, OutputStream outputStream, Metadata metadata);

  /**
   * Non-blocking call to mark a blob for deletion in the remote blob store
   * @param id Blob ID of the blob to delete
   * @param metadata User supplied {@link Metadata} of the request
   * @return A future that completes when the blob is successfully deleted from the blob store.
   * @throws org.apache.samza.storage.blobstore.exceptions.DeletedException returned future should complete
   *         exceptionally with DeletedException on failure with the blob already deleted error. This exception is
   *         caught and ignored by the caller of the delete method during initial cleanup and SnapshotIndex read.
   */
  CompletionStage<Void> delete(String id, Metadata metadata);

  /**
   * Non-blocking call to remove the Time-To-Live (TTL) for a blob and make it permanent.
   * @param blobId Blob ID of blob to remove TTL for.
   * @param metadata User supplied {@link Metadata} of the request
   * @return a future that completes when the TTL for the blob is removed.
   * @throws org.apache.samza.storage.blobstore.exceptions.DeletedException returned future should complete
   *         exceptionally with DeletedException on failure with the blob already deleted error.
   */
  CompletionStage<Void> removeTTL(String blobId, Metadata metadata);

  /**
   * Cleanly close resources like blob store client
   */
  void close();
}
BlobStoreAdminFactory
/**
 * Factory to create instance of {@link StateBackendAdmin}s that needs to be implemented for every
 * state backend
 */
public interface BlobStoreAdminFactory {
  /**
   * Returns an instance of {@link StateBackendAdmin}
   * @param config job configuration
   * @param jobModel Job Model
   */
  StateBackendAdmin getStateBackendAdmin(Config config, JobModel jobModel);
}
BlobStoreManagerFactory
/**
 * Factory to create instance of {@link BlobStoreManager}s that needs to be implemented for every state backend 
 */
public interface BlobStoreManagerFactory {
  /**
   * Returns an instance of {@link BlobStoreManager} for backup. 
   * @param config job configuration.
   * @param backupExecutor excutor service for backup operation.
   */
  BlobStoreManager getBackupBlobStoreManager(Config config, ExecutorService backupExecutor);
  
  /**
   * Returns an instance of {@link BlobStoreManager} for restore. 
   * @param config job configuration.
   * @param restoreExecutor excutor service for restore operation.
   */
  BlobStoreManager getRestoreBlobStoreManager(Config config, ExecutorService restoreExecutor);
}
DirIndex
/**
 * Representation of a directory in the blob store
 */
public class DirIndex {
  public static final String ROOT_DIR_NAME = "";
  private static final short SCHEMA_VERSION = 1;

  private final String dirName;

  private final List<FileIndex> filesPresent;
  private final List<FileIndex> filesRemoved;

  // Note: subDirsPresent can also have filesRemoved and subDirsRemoved within them.
  private final List<DirIndex> subDirsPresent;
  private final List<DirIndex> subDirsRemoved;
}
FileIndex
/**
 * Representation of a file in blob store
 */
public class FileIndex {
  private final String fileName;
  /**
   * Chunks of file uploaded to blob store as {@link FileBlob}s
   */
  private final List<FileBlob> fileBlobs;
  /**
   * Metadata (e.g. POSIX file attributes) associated with the file.
   */
  private final FileMetadata fileMetadata;
  /**
   * Checksum of the file for verifying integrity.
   */
  private final long checksum;
}


FileBlob
/**
 * Representation of a File in a Blob store
 */
public class FileBlob {

  private final String blobId;
  /**
   * Offset of this blob in the file. A file can be uploaded multiple chunks, and can have
   * multiple blobs associated with it. Each blob then has its own ID and an offset in the file.
   */
  private final int offset;
}
SnapshotIndex
/**
 * A {@link SnapshotIndex} contains all the information necessary for recreating the local store by
 * downloading its contents from the remote blob store. The {@link SnapshotIndex} is itself serialized
 * and stored as a blob in the remote store, and its blob id tracked in the Task checkpoint.
 */
public class SnapshotIndex {
  private static final short SCHEMA_VERSION = 1;

  private final long creationTimeMillis;
  /**
   * Metadata for a snapshot like job name, job Id, store name etc.
   */
  private final SnapshotMetadata snapshotMetadata;
  private final DirIndex dirIndex;

  /**
   * Blob ID of previous snapshot index blob. Tracked here to be cleaned up
   * in cleanup phase of commit lifecycle.
   */
  private final Optional<String> prevSnapshotIndexBlobId;
}
DirDiff
/**
 * Representation of the diff between a local directory and a remote directory contents.
 */
public class DirDiff {

  private final String dirName;

  /**
   * New files in this directory that needs to be uploaded to the blob store.
   */
  private final List<File> filesAdded;

  /**
   * Files that have already been uploaded to the blob store in a previous snapshot and haven't changed.
   */
  private final List<FileIndex> filesRetained;

  /**
   * Files that have already been uploaded to the blob store in a previous snapshot and need to be removed.
   */
  private final List<FileIndex> filesRemoved;

  /**
   * Subdirectories of this directory that are not already present in the previous snapshot and all of their contents
   * need to be recursively added.
   */
  private final List<DirDiff> subDirsAdded;

  /**
   * Subdirectories of this directory that are already present in the previous snapshot, but whose contents
   * may have changed and may need to be recursively added or removed.
   */
  private final List<DirDiff> subDirsRetained;

  /**
   * Subdirectories that are already present in the previous snapshot, but don't exist in the local snapshot,
   * and hence all of their contents need to be recursively removed.
   */
  private final List<DirIndex> subDirsRemoved;
}

Implementation and Test Plan


Test ScopeTest Scenarios
Flows to test
  1. Backup of a new store
  2. Restore on a new host
  3. Incremental backup/restore of a store
  4. Cleanup for a new container/job 
  5. Cleanup called after a failed container restart
  6. Cleanup called during commit sequence
  7. Partial failure of backup in different stages of commit lifecycle (commit, upload, persist, cleanup).
    1. Ensure that the partial state is cleaned up.
  8. Failure scenarios of restore in different stages of commit lifecycle (commit, upload, persist, cleanup).
    1. Ensure that a container restarting on the same host after partial failure can cleanup/recover and restore correctly (does not get stuck).
  9. Failure scenarios of blob store (missing index blobs, file blobs, retry etc.)
  10. Test cleanup/commit sequence during taskinstance init works with transactional kafka and blob store
End-to-end testing
  1. Test that checkpoint read/write version configurations are validated before job launch.
  2. Test upgrade and rollback compatibility for samza versions with and without blob store backend.
  3. How do we enforce that backup and restore managers aren't both enabled for the first deployment?
BlobStoreTaskBackupManager


  1. init 

    1. init with checkpoint V1

    2. init with no/null checkpoint 

    3. Test init cleans up unused stores correctly.

  2. upload

    1. No previous checkpoint (first upload)

    2. Previous checkpoint passed during init (subsequent upload)

    3. Test upload handles logged / non-logged / persistent / durable stores correctly (document expectation here). 

    4. Test upload calculates diff from previous checkpoint correctly (during initial start and during post-startup commits)

    5. Test upload returns snapshot blob id and records previous snapshot blob id in the snapshot correctly.

  3. cleanup

    1. Test Cleanup removes TTL of remote snapshot and associated files

    2. Test Cleanup deletes old remote snapshot

    3. Test Cleanup deletes files/subdirs to remove from current checkpoint

    4. Test Cleanup cleans stores removed from config

    5. Cleanup failed container/job restart

BlobStoreRestoreManager
  1. init

    1. Test init fails for checkpoint V1.

    2. Test init works for no/null checkpoint.

    3. Test init returns blob store backend store scms if present in checkpoint.

    4. Test util method to get snapshot indexes from checkpoint.

    5. Test container fails to start with meaningful error message if init fails.

  2. restore

    1. Test that restore restore to the correct store directory depending on store type.

    1. Test that it ignores any files that are not present when upload is called (e.g. offset files).

    1. Test restore handles logged / non-logged / durable / persistent stores correctly.

    2. Test logic for checking if checkpoint directory is identical to remote snapshot.

    3. Test restore handles stores with missing SCM in checkpoint correctly.

    4. Test restore handles multiple stores correctly.

    5. Test restore always deletes main store dir.

    6. Test restore uses previous checkpoint directory if identical to remote snapshot.

    7. Test restore restores from remote snapshot if no previous checkpoint dir.

    8. Test restore restores from remote snapshot if checkpoint dir not identical to remote snapshot.

    9. Test restore recreates subdirs correctly.

    10. Test restore recreates recursive subdirs correctly

    11. Test restore creates empty files correctly.

    12. Test restore creates empty dirs correctly.

    13. Test restore creates empty sub-dirs / recursive subdirs correctly.

    14. Test restore restores multi-part file contents completely and in correct order.

    15. Test restore verifies checksum for files restored if enabled.

BlobStoreStateBackendUtil
  1. Test throws exception for checkpoint v1.

  2. Test no-op for null / empty checkpoint.

  3. Test works correctly for missing blob store backend factory entry.

  4. Test works correctly for missing blob store backend factory store entry.

  5. Test throws exception on sync and async blob store errors.

  6. Test gets the right blobid from remote store.

  7. Test returns the correct pair of scm and snapshot index.

  8. Test blocks once at the end for all futures instead of blocking for each store.

Concurrency and Retries
  1. Test CompletableFutureUtil methods.
  2. Test that all operations use an explicit and expected executor (no default executor).
  3. Verify future composition (allOf, toMap etc) and blocking (individual vs collected vs nonblocking) for all async methods.
  4. Verify that there is no blocking on caller threads. Document and justify exceptions (e.g. restore thread)
  5. Test BlobStoreManager Impl/BlobStoreUtil error handling and retries.
    1. Test completionexception unwrapping to identify actual cause.
    1. Test callback order (par/seq dep graph) for all chained operations.
    2. Test async retriable exceptions are transformed correctly.
    3. Test/verify put / get / delete futures always complete (handle sync / async errors correctly).
    4. Test retries for get / put create new input / output streams.
    5. Test error handling for get (sync, future, callback errors).
  6. Test TaskInstance commit flow.
    1. Test async commit stage fails if upload/checkpoint write/cleanup fails.
    1. Verify all async stage operations execute on a separate threadpool.
    2. Test async commit succeeds and ublocks future commits if all async operations succeed.
    3. Test async commit stage fails if any async operations failed.
    4. Verify async commit stage operations are chained correctly.
    5. Test exceptions in asyc commit stage are propagated to next sync commit stage.
    6. Test sync commit fails if a previous async commit fails.
    7. Test commit skips if previous async commit in progress and < max delay.
    8. Test commit blocks if previous async commit in progress and > max delay
    9. Test that sync commit times out if previous async commit does not complete within max commit delay.
  7. Test BackupManager/RestoreManager flow
    1. Test all async stage operations execute on a separate threadpool.
    2. Verify/Test error propagation, handling and operation chaining (par/seq dep graph).
    3. Verify/Test timeouts for blocking operations. Document and justify blocking operations.
    4. Test handling of retriable / ignorable (410s) / unrecoverable errors.
    5. Verify/Test idempotency of cleanup / delete / ttl operations.

Compatibility, Deprecation, and Migration Plan

The change is backward incompatible once completely enabled. The feature would be enabled through a major release of Samza. We plan to rollout the feature in following steps:

  1. Dual-commit: Internally at LinkedIn, we are planning to enable dual commit at the start for a very small set of customers/jobs. The stores can be chosen based on characteristics such as per-container and per-task state, number of stores, number of tasks, commit-ns etc. Every commit would be done twice. One commit would be done to the Kafka changelog topic, as is done today, and the other would be done to the Blob Store based storage.
  2. Restore from the Blob Store: The restore would be performed through the Blob Store. The restore target (Blob Store or Kafka changelog topic) is chosen through a config value task.restore.manager. The changes are backward compatible and if a restore fails, can be reverted back to kafka based restore by simply switching the configuration value.
  3. Disable dual-commit: After a few redefined count of successful restores from Blob Store, we would disable dual-commit and only backup and restore from the Blob Store. A config value task.commit.manager would enable commit and restore from the Blob Store alone. At this point, the changes are backward incompatible and a failure may require rollback. A rollback can be done by switching the config task.restore.manager and task.commit.manager.


  • No labels