...

We propose a Blob Store based state backup and restore for Samza. The following diagram illustrates a high-level view of our solution:

Blob Store backed backup and restore for Samza

Note: Orange represents synchronous/blocking calls. Green represents asynchronous/non-blocking operations.

  • Step 1: A synchronous checkpoint call is made to RocksDB, which creates an immutable checkpoint directory on the container filesystem (shown in step 1a). We also generate a checkpoint ID (the last 6 digits of the epoch timestamp in nanoseconds) and build an in-memory data structure called Checkpoint to hold the checkpoint ID and relevant metadata.
  • Step 2: We write the checkpoint ID to the checkpoint directory. When a container restarts with host-affinity, the checkpoint ID in the checkpoint directory lets us verify whether the local checkpoint directory is the latest checkpoint, by comparing it against the checkpoint ID in the checkpoint Kafka topic. If they match, we skip remote restoration and use the local checkpoint instead.
  • Steps 3 to 5: We calculate the delta of this checkpoint directory against the previous checkpoint and upload this delta to the blob store, in parallel with processing. The blob store returns a blob ID for each upload. We then create a special blob in the blob store, called the index blob, that holds these blob IDs (and other metadata). The schema of the index blob is shown in the code block below. Notice the filesRemoved and subDirectoriesRemoved sections: they are maintained for garbage collection and error recovery, and are explained in detail in the design doc attached to the Jira ticket.
  • Steps 6 and 7: We write the blob ID of the index blob to the Checkpoint data structure, serialize this data structure, and write the serialized checkpoint to the checkpoint topic. (A sketch of this end-to-end flow follows this list.)
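
To make the sequence concrete, here is a minimal sketch of the commit flow described above. All helper methods (generateCheckpointId, createRocksDbCheckpoint, computeAndUploadDelta, writeCheckpointToTopic) and the CHECKPOINT-ID file name are hypothetical placeholders for illustration, not actual Samza APIs.

Code Block
languagejava
firstline1
titleCommit Flow (illustrative sketch)
linenumberstrue
collapsetrue
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;

/**
 * Illustrative sketch of the commit sequence; all helper methods are
 * hypothetical placeholders, not actual Samza APIs.
 */
public class CommitFlowSketch {

  public CompletableFuture<Void> commit(Path storeDir) throws IOException {
    // Step 1: synchronous RocksDB checkpoint creates an immutable checkpoint
    // directory; we also generate a checkpoint ID and an in-memory Checkpoint.
    String checkpointId = generateCheckpointId();
    Path checkpointDir = createRocksDbCheckpoint(storeDir, checkpointId);

    // Step 2: persist the checkpoint ID inside the checkpoint directory so a
    // container restarting with host-affinity can compare it against the ID in
    // the checkpoint topic and skip remote restore when they match.
    Files.write(checkpointDir.resolve("CHECKPOINT-ID"),
        checkpointId.getBytes(StandardCharsets.UTF_8));

    // Steps 3 to 5 (async, in parallel with processing): diff against the
    // previous checkpoint, upload the delta, and upload a new index blob.
    return computeAndUploadDelta(checkpointDir)
        // Steps 6 and 7: record the index blob ID in the Checkpoint data
        // structure, serialize it, and write it to the checkpoint topic.
        .thenAccept(indexBlobId -> writeCheckpointToTopic(checkpointId, indexBlobId));
  }

  private String generateCheckpointId() { throw new UnsupportedOperationException(); }
  private Path createRocksDbCheckpoint(Path storeDir, String checkpointId) { throw new UnsupportedOperationException(); }
  private CompletableFuture<String> computeAndUploadDelta(Path checkpointDir) { throw new UnsupportedOperationException(); }
  private void writeCheckpointToTopic(String checkpointId, String indexBlobId) { throw new UnsupportedOperationException(); }
}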

At this point the commit is complete. We then perform post-commit cleanup.

  • Files are not actually deleted from the host until the commit is complete; they are tracked in the filesRemoved and subDirsRemoved sections of the index blob. Once the commit completes, we can safely delete the files and sub-directories marked for deletion and clean up the host. This ordering matters because if the commit failed after we had already deleted the files, we could not recover to the last known good state of the store.
  • We also update the expiration of the blobs on the remote blob store from the default 30-day expiration to never expire. This ensures that if a job fails in the middle of a commit and never restarts, its blobs are automatically garbage collected by the blob store at the end of 30 days. (A sketch of these cleanup steps follows this list.)
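
A minimal sketch of these post-commit cleanup steps is shown below. It uses the BlobStoreManager and Metadata types from the Public Interfaces section; passing the removed and retained blob IDs as flat lists is a simplification of walking the index blob's filesRemoved/subDirsRemoved and filesPresent sections.

Code Block
languagejava
firstline1
titlePost-Commit Cleanup (illustrative sketch)
linenumberstrue
collapsetrue
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

/**
 * Illustrative sketch of post-commit cleanup; blob ID bookkeeping is
 * simplified, and deletion of local files is omitted.
 */
public class PostCommitCleanupSketch {
  private final BlobStoreManager blobStoreManager;

  public PostCommitCleanupSketch(BlobStoreManager blobStoreManager) {
    this.blobStoreManager = blobStoreManager;
  }

  public CompletionStage<Void> cleanUp(List<String> removedBlobIds,
      List<String> retainedBlobIds, Metadata metadata) {
    // Delete blobs tracked in filesRemoved/subDirsRemoved; safe now because
    // the commit is durably recorded in the checkpoint topic.
    CompletableFuture<?>[] deletes = removedBlobIds.stream()
        .map(id -> blobStoreManager.delete(id, metadata).toCompletableFuture())
        .toArray(CompletableFuture[]::new);

    // Remove the default 30-day TTL from retained blobs so they never expire;
    // blobs from a commit that never completed keep their TTL and are garbage
    // collected by the blob store after 30 days.
    CompletableFuture<?>[] ttlRemovals = retainedBlobIds.stream()
        .map(id -> blobStoreManager.removeTTL(id, metadata).toCompletableFuture())
        .toArray(CompletableFuture[]::new);

    return CompletableFuture.allOf(
        CompletableFuture.allOf(deletes), CompletableFuture.allOf(ttlRemovals));
  }
}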

A commit can fail at any stage. To recover from a failure, whenever a container restarts we first initiate this cleanup. This ensures that garbage collection is handled properly, that no orphan state is left on the host, and that the blobs are made permanent by updating their expiration. More detail can be found in the design document attached to the Jira ticket.

Public Interfaces

...

The steps explained above are also performed when the container starts. Since they are idempotent operations, they have no effect if the post-commit steps already completed right after the commit. If the post-commit steps failed and the container restarted, we perform them immediately and ensure the cleanup operations succeed.


Code Block
languagejson
firstline1
titleIndexBlob Schema
linenumberstrue
collapsetrue
{
    "schemaVersion": "Version of current schema", 
    "checkpointId": "Checkpoint ID", 
    "createdTimeMs": "Created time in epoch milliseconds", 
    "jobName": "Job name", 
    "jobInstance": "Instance ID of the job", 
    "taskName": "Name of the task", 
    "storeName": "Store name", 
    "dirIndex": {
        "filesPresent": [
            {
                "fileName": "Name of the file uploaded to Ambry", 
                "blobs": [
                    {
                        "blobID": "Blob ID", 
                        "offset": "File chunk offset [optional]"
                    }
                ], 
                "mtime": "Last updated time in epoch milliseconds", 
                "ctime": "Created time in epoch milliseconds",
                "sizeInBytes": "Size of the file", 
                "crc": "crc32 checksum of the file", 
                "attribs": "permission attributes of the file"
            }
        ],
        "filesRemoved": [
            {
                "fileName": "Name of the file to delete from Ambry", 
                "blobs": [
                    {
                        "blobID": "Blob ID", 
                        "offset": "File chunk offset [optional]"
                    }
                ]
            }
        ],
        "subDirectoriesPresent": [
            {
                "dirIndex": {
                     ……
                 }
            }
        ],
        "subDirectoriesRemoved": [
            {
                "dirIndex": {
                     ……
                 }
            }
        ]
    }, 
  "prevSnapshotIndexBlobId": “Blob ID of previous snapshot index”
}
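
To make the schema concrete, a hypothetical instance is shown below. All values (job and store names, blob IDs, timestamps, file names) are purely illustrative.

Code Block
languagejson
firstline1
titleIndexBlob Example (illustrative)
linenumberstrue
collapsetrue
{
    "schemaVersion": "1",
    "checkpointId": "1615159702845-880206",
    "createdTimeMs": 1615159703012,
    "jobName": "my-samza-job",
    "jobInstance": "i001",
    "taskName": "Partition 0",
    "storeName": "my-store",
    "dirIndex": {
        "filesPresent": [
            {
                "fileName": "000042.sst",
                "blobs": [
                    {"blobID": "blob-8f3a-hypothetical", "offset": 0}
                ],
                "mtime": 1615159702500,
                "ctime": 1615159700000,
                "sizeInBytes": 67108864,
                "crc": 2847516394,
                "attribs": "rw-r--r--"
            }
        ],
        "filesRemoved": [
            {
                "fileName": "000017.sst",
                "blobs": [
                    {"blobID": "blob-11c9-hypothetical", "offset": 0}
                ]
            }
        ],
        "subDirectoriesPresent": [],
        "subDirectoriesRemoved": []
    },
    "prevSnapshotIndexBlobId": "blob-prev-index-hypothetical"
}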



Blob Store Manager: Provides an interface for general blob store operations like GET, PUT and DELETE. We also added a removeTTL method, used in the post-commit cleanup operation to remove a blob's default 30-day expiration so that it never expires.


Code Block
languagejava
firstline1
titleBlobStoreManager Interface
linenumberstrue
collapsetrue
/**
 * Provides interface for common blob store operations: GET, PUT and DELETE
 */
@InterfaceStability.Unstable
public interface BlobStoreManager {
  /**
   * Initialize the underlying blob store client, if necessary.
   */
  void init();

  /**
   * Non-blocking PUT call to remote blob store with supplied metadata
   * @param inputStream InputStream to read the file
   * @param metadata user supplied {@link Metadata} of the request
   * @return a future containing the blob ID of the uploaded blob if the upload is successful.
   */
  CompletionStage<String> put(InputStream inputStream, Metadata metadata);

  /**
   * Non-blocking GET call to remote blob store
   * @param id Blob ID of the blob to get
   * @param outputStream OutputStream to write the downloaded blob
   * @param metadata User supplied {@link Metadata} of the request
   * @return A future that completes when all the chunks are downloaded and written successfully to the OutputStream
   * @throws org.apache.samza.storage.blobstore.exceptions.DeletedException returned future should complete
   *         exceptionally with DeletedException on failure with the blob already deleted error.
   */
  CompletionStage<Void> get(String id, OutputStream outputStream, Metadata metadata);

  /**
   * Non-blocking call to mark a blob for deletion in the remote blob store
   * @param id Blob ID of the blob to delete
   * @param metadata User supplied {@link Metadata} of the request
   * @return A future that completes when the blob is successfully deleted from the blob store.
   * @throws org.apache.samza.storage.blobstore.exceptions.DeletedException returned future should complete
   *         exceptionally with DeletedException on failure with the blob already deleted error. This exception is
   *         caught and ignored by the caller of the delete method during initial cleanup and SnapshotIndex read.
   */
  CompletionStage<Void> delete(String id, Metadata metadata);

  /**
   * Non-blocking call to remove the Time-To-Live (TTL) for a blob and make it permanent.
   * @param blobId Blob ID of blob to remove TTL for.
   * @param metadata User supplied {@link Metadata} of the request
   * @return a future that completes when the TTL for the blob is removed.
   * @throws org.apache.samza.storage.blobstore.exceptions.DeletedException returned future should complete
   *         exceptionally with DeletedException on failure with the blob already deleted error.
   */
  CompletionStage<Void> removeTTL(String blobId, Metadata metadata);

  /**
   * Cleanly close resources like blob store client
   */
  void close();
}
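
The following hypothetical caller illustrates the asynchronous usage pattern of this interface: a non-blocking PUT whose future completes with the blob ID, chained into a GET that completes once all chunks are written. Stream closing and error handling are elided for brevity.

Code Block
languagejava
firstline1
titleBlobStoreManager Usage (illustrative sketch)
linenumberstrue
collapsetrue
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.CompletionStage;

/** Hypothetical caller demonstrating the async BlobStoreManager call pattern. */
public class BlobStoreManagerUsage {

  /** Uploads a local file, then downloads it back to a second path. */
  public static CompletionStage<Void> roundTrip(BlobStoreManager manager,
      Path localFile, Path restoredFile, Metadata metadata) throws IOException {
    InputStream in = Files.newInputStream(localFile);
    return manager.put(in, metadata)                   // completes with the blob ID
        .thenCompose(blobId -> {
          try {
            OutputStream out = Files.newOutputStream(restoredFile);
            return manager.get(blobId, out, metadata); // completes when all chunks are written
          } catch (IOException e) {
            throw new UncheckedIOException(e);
          }
        });
  }
}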


Code Block
languagejava
firstline1
titleBlobStoreAdminFactory
linenumberstrue
collapsetrue
/**
 * Factory to create an instance of {@link StateBackendAdmin}. Needs to be implemented for every
 * state backend.
 */
public interface BlobStoreAdminFactory {
  /**
   * Returns an instance of {@link StateBackendAdmin}
   * @param config job configuration
   * @param jobModel Job Model
   */
  StateBackendAdmin getStateBackendAdmin(Config config, JobModel jobModel);
}


Code Block
languagejava
firstline1
titleBlobStoreManagerFactory
linenumberstrue
collapsetrue
/**
 * Factory to create instances of {@link BlobStoreManager}. Needs to be implemented for every state backend.
 */
public interface BlobStoreManagerFactory {
  /**
   * Returns an instance of {@link BlobStoreManager} for backup. 
   * @param config job configuration.
   * @param backupExecutor executor service for backup operations.
   */
  BlobStoreManager getBackupBlobStoreManager(Config config, ExecutorService backupExecutor);
  
  /**
   * Returns an instance of {@link BlobStoreManager} for restore. 
   * @param config job configuration.
   * @param restoreExecutor executor service for restore operations.
   */
  BlobStoreManager getRestoreBlobStoreManager(Config config, ExecutorService restoreExecutor);
}
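
A trivial factory implementation might look like the sketch below. MyBlobStoreManager is a hypothetical vendor-specific BlobStoreManager implementation, not part of this proposal. Exposing separate backup and restore managers lets the two paths be created with independently tuned executors.

Code Block
languagejava
firstline1
titleBlobStoreManagerFactory Implementation (illustrative sketch)
linenumberstrue
collapsetrue
import java.util.concurrent.ExecutorService;
import org.apache.samza.config.Config;

/** Hypothetical factory wiring a vendor-specific BlobStoreManager. */
public class MyBlobStoreManagerFactory implements BlobStoreManagerFactory {

  @Override
  public BlobStoreManager getBackupBlobStoreManager(Config config, ExecutorService backupExecutor) {
    // MyBlobStoreManager is a hypothetical implementation of BlobStoreManager.
    return new MyBlobStoreManager(config, backupExecutor);
  }

  @Override
  public BlobStoreManager getRestoreBlobStoreManager(Config config, ExecutorService restoreExecutor) {
    return new MyBlobStoreManager(config, restoreExecutor);
  }
}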


Code Block
languagejava
firstline1
titleDirIndex
linenumberstrue
collapsetrue
/**
 * Representation of a directory in the blob store
 */
public class DirIndex {
  public static final String ROOT_DIR_NAME = "";
  private static final short SCHEMA_VERSION = 1;

  private final String dirName;

  private final List<FileIndex> filesPresent;
  private final List<FileIndex> filesRemoved;

  // Note: subDirsPresent can also have filesRemoved and subDirsRemoved within them.
  private final List<DirIndex> subDirsPresent;
  private final List<DirIndex> subDirsRemoved;
}


Code Block
languagejava
firstline1
titleFileIndex
linenumberstrue
collapsetrue
/**
 * Representation of a file in blob store
 */
public class FileIndex {
  private final String fileName;
  /**
   * Chunks of file uploaded to blob store as {@link FileBlob}s
   */
  private final List<FileBlob> fileBlobs;
  /**
   * Metadata (e.g. POSIX file attributes) associated with the file.
   */
  private final FileMetadata fileMetadata;
  /**
   * Checksum of the file for verifying integrity.
   */
  private final long checksum;
}
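
The checksum field holds a CRC-32 over the file contents. For illustration, it can be computed with the JDK as follows; this is a sketch of the idea, not necessarily the exact Samza code.

Code Block
languagejava
firstline1
titleCRC-32 Computation (illustrative sketch)
linenumberstrue
collapsetrue
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.zip.CRC32;

/** Illustrative CRC-32 computation for a file's contents. */
public final class Crc32Util {

  public static long crc32(Path file) throws IOException {
    CRC32 crc = new CRC32();
    byte[] buffer = new byte[8192];
    try (InputStream in = Files.newInputStream(file)) {
      int read;
      while ((read = in.read(buffer)) != -1) {
        crc.update(buffer, 0, read); // incrementally fold each chunk into the checksum
      }
    }
    return crc.getValue(); // unsigned 32-bit value, widened to long
  }
}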


Code Block
languagejava
firstline1
titleFileBlob
linenumberstrue
collapsetrue
/**
 * Representation of a File in a Blob store
 */
public class FileBlob {

  private final String blobId;
  /**
   * Offset of this blob in the file. A file can be uploaded in multiple chunks, and can have
   * multiple blobs associated with it. Each blob then has its own ID and an offset in the file.
   */
  private final int offset;
}


Code Block
languagejava
firstline1
titleSnapshotIndex
linenumberstrue
collapsetrue
/**
 * A {@link SnapshotIndex} contains all the information necessary for recreating the local store by
 * downloading its contents from the remote blob store. The {@link SnapshotIndex} is itself serialized
 * and stored as a blob in the remote store, and its blob id tracked in the Task checkpoint.
 */
public class SnapshotIndex {
  private static final short SCHEMA_VERSION = 1;

  private final long creationTimeMillis;
  /**
   * Metadata for a snapshot like job name, job Id, store name etc.
   */
  private final SnapshotMetadata snapshotMetadata;
  private final DirIndex dirIndex;

  /**
   * Blob ID of previous snapshot index blob. Tracked here to be cleaned up
   * in cleanup phase of commit lifecycle.
   */
  private final Optional<String> prevSnapshotIndexBlobId;
}


Code Block
languagejava
firstline1
titleDirDiff
linenumberstrue
collapsetrue
/**
 * Representation of the diff between a local directory and a remote directory contents.
 */
public class DirDiff {

  private final String dirName;

  /**
   * New files in this directory that need to be uploaded to the blob store.
   */
  private final List<File> filesAdded;

  /**
   * Files that have already been uploaded to the blob store in a previous snapshot and haven't changed.
   */
  private final List<FileIndex> filesRetained;

  /**
   * Files that have already been uploaded to the blob store in a previous snapshot and need to be removed.
   */
  private final List<FileIndex> filesRemoved;

  /**
   * Subdirectories of this directory that are not already present in the previous snapshot and all of their contents
   * need to be recursively added.
   */
  private final List<DirDiff> subDirsAdded;

  /**
   * Subdirectories of this directory that are already present in the previous snapshot, but whose contents
   * may have changed and may need to be recursively added or removed.
   */
  private final List<DirDiff> subDirsRetained;

  /**
   * Subdirectories that are already present in the previous snapshot, but don't exist in the local snapshot,
   * and hence all of their contents need to be recursively removed.
   */
  private final List<DirIndex> subDirsRemoved;
}
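
Conceptually, a DirDiff is computed by comparing the local checkpoint directory against the previous snapshot's DirIndex. The sketch below is simplified: it treats any same-named remote file as retained, omits the attribute/checksum comparison needed to detect modified files, skips sub-directory recursion, and assumes getters on DirIndex/FileIndex and an all-args DirDiff constructor that are not spelled out above.

Code Block
languagejava
firstline1
titleDirDiff Computation (illustrative sketch)
linenumberstrue
collapsetrue
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified, illustrative diff of a local directory against a remote DirIndex. */
public class DirDiffUtil {

  public static DirDiff diff(File localDir, DirIndex remoteDir) {
    // Index the remote files by name; getFilesPresent()/getFileName() are
    // assumed getters on the classes defined above.
    Map<String, FileIndex> remoteFiles = new HashMap<>();
    for (FileIndex f : remoteDir.getFilesPresent()) {
      remoteFiles.put(f.getFileName(), f);
    }

    List<File> filesAdded = new ArrayList<>();
    List<FileIndex> filesRetained = new ArrayList<>();
    for (File local : localDir.listFiles(File::isFile)) { // assumes localDir is a directory
      FileIndex remote = remoteFiles.remove(local.getName());
      if (remote == null) {
        filesAdded.add(local);     // new file: contents must be uploaded
      } else {
        filesRetained.add(remote); // already uploaded: reuse existing blobs
      }
    }
    // Whatever remains exists remotely but not locally, so it must be removed.
    List<FileIndex> filesRemoved = new ArrayList<>(remoteFiles.values());

    // Sub-directory recursion omitted; assumes an all-args DirDiff constructor.
    return new DirDiff(localDir.getName(), filesAdded, filesRetained, filesRemoved,
        Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
  }
}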

Implementation and Test Plan

Phase | Feature
Phase 1 |




Compatibility, Deprecation, and Migration Plan

...