...

Samza currently backs up the local state of jobs to compacted Kafka topics called changelogs. At LinkedIn, we observed that for jobs with large local state, the restore can be very slow and can take multiple hours. As a result, a job restart caused by hardware failure, regular maintenance such as patching and upgrades, or an autoscaling event can cause disruption while the state is rebuilt.

...

  1. Long restore times: State restore from a Kafka-backed changelog topic can be slow because the restore happens at a "one message at a time" cadence. With a Blob Store as the state restore backend, it is possible to restore the state store in bulk, thereby reducing restore times from hours to the order of minutes.
  2. Automate Job and Cluster maintenance: Slow restore times also hinder the ability to automate job and cluster maintenance. Since restoring state from the changelog topic can take a long time, automated tasks like job auto-scaling, OS upgrades, hardware upgrades etc. need to be planned more carefully and cannot be completely automated. With a fast Blob Store based restore, it is possible to reduce the duration and impact of such maintenance tasks.
  3. Message and store size limits: Moving to a Blob Store backed storage would allow us to write messages of arbitrary size and expand the store to any size.

...

  • Step 1: A synchronous checkpoint call is made to RocksDB that creates an immutable checkpoint directory in the container filesystem (shown in step 1a). We also generate a checkpoint ID (last 6 digits of epoch timestamp in nanoseconds) and build an in-memory data structure called checkpoint to hold checkpoint ID and relevant metadata.
  • Step 2: We write the checkpoint ID to the checkpoint directory. When the container restarts with host-affinity, we compare the checkpoint ID in the local checkpoint directory with the checkpoint ID in the checkpoint Kafka topic. If they match, the local checkpoint directory is the latest checkpoint, so we can skip remote restoration and use the local checkpoint instead.
  • Step 3 to step 6: We calculate the delta of this checkpoint directory from the previous checkpoint and upload this delta to the blob store in parallel with processing. The blob store returns a blob ID for each upload. We then create a special blob in the blob store, called the Index blob, that holds these blob IDs and other metadata. The schema of the Index blob is shown in the code block below. Note the filesRemoved and subDirectoriesRemoved sections: they are maintained for garbage collection and error recovery, as explained in detail in the section Post-Commit cleanup and Garbage Collection.
  • Step 6 and 7: We write the blob ID of the index blob to the checkpoint data structure, serialize this data structure and write the serialized checkpoint to the checkpoint topic. 
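
The check described in Step 2 can be sketched as follows. This is a minimal illustration under stated assumptions, not Samza's implementation: the class LocalCheckpointMarker, its methods, and the marker file name CHECKPOINT_ID are hypothetical, and the checkpoint ID read from the checkpoint topic is passed in as a plain string.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper; class, method, and file names are illustrative only.
final class LocalCheckpointMarker {
  // Assumed name of the marker file inside the checkpoint directory.
  private static final String MARKER_FILE = "CHECKPOINT_ID";

  /** Writes the checkpoint ID into the checkpoint directory (Step 2). */
  static void write(Path checkpointDir, String checkpointId) {
    try {
      Files.write(checkpointDir.resolve(MARKER_FILE), checkpointId.getBytes(StandardCharsets.UTF_8));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Returns true if the local directory holds the checkpoint recorded in the checkpoint topic. */
  static boolean isLatest(Path checkpointDir, String checkpointIdFromTopic) {
    Path marker = checkpointDir.resolve(MARKER_FILE);
    if (!Files.isRegularFile(marker)) {
      return false; // no marker: the local directory cannot be trusted
    }
    try {
      String localId = new String(Files.readAllBytes(marker), StandardCharsets.UTF_8).trim();
      return localId.equals(checkpointIdFromTopic);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

If isLatest returns true on restart, the remote restore can be skipped; otherwise the store is restored from the Blob Store.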

At this point the commit is complete. We then perform post commit clean-up. 

  • Files are not actually deleted from the hosts until the commit is complete, because if the commit fails we must still be able to recover to the last known good state of the store. Until then, they are tracked in the filesRemoved and subDirectoriesRemoved sections of the Index blob. Once the commit is complete, we can safely delete the files and sub-directories marked for deletion and clean up the host.
  • We also update the expiration of the blobs in the remote blob store from the default 30-day expiration to never expire. Because blobs are created with the 30-day expiration, if a job fails in the middle of a commit and never restarts, its blobs are automatically garbage collected by the Blob Store at the end of 30 days.

...

Organization of snapshot in the Blob Store

Files are stored and organized in the Blob Store as immutable blobs. Each blob is assigned a unique blob ID by the Blob Store. The way we organize a checkpoint directory is as follows:

  1. All the files corresponding to a checkpoint are uploaded in parallel to the Blob Store. Blob Store returns a blob ID associated with every file. Note that large files (multiple GBs) can be chunked and uploaded, and can have one blob ID per chunk. Thus, a file has one or more blob IDs associated with it. 
  2. We organize and index the blobs associated with a checkpoint by creating an Index blob in the Blob Store that holds file name, blob IDs and metadata of all the files associated with a checkpoint. Schema defined below shows the fields in index blob. Note: The filesRemoved section in the schema helps in garbage collection. Please see the next section on Garbage collection for details.
  3. Retrieving an Index blob is necessary and sufficient to rebuild a snapshot since the Index blob contains blob ID and metadata of every file associated with a checkpoint directory.   


IndexBlob Schema
{
    "schemaVersion": "Version of current schema", 
    "checkpointId": "Checkpoint ID", 
    "createdTimeMs": "Created time in epoch milliseconds", 
    "jobName": "Job name", 
    "jobInstance": "Instance ID of the job", 
    "taskName": "Name of the task", 
    "storeName": "Store name", 
    "dirIndex": {
        "filesPresent": [
            {
                "fileName": "Name of the file uploaded to Ambry", 
                "blobs": [
                    {
                        "blobID": "Blob ID", 
                        "offset": "File chunk offset [optional]"
                    }
                ], 
                "mtime": "Last updated time in epoch milliseconds", 
                "ctime": "Created time in epoch milliseconds",
                "sizeInBytes": "Size of the file", 
                "crc": "crc32 checksum of the file", 
                "attribs": "permission attributes of the file"
            }
        ],
        "filesRemoved": [
            {
                "fileName": "Name of the file to delete from Ambry", 
                "blobs": [
                    {
                        "blobID": "Blob ID", 
                        "offset": "File chunk offset [optional]"
                    }
                ]
            }
        ],
        "subDirectoriesPresent": [
            {
                "dirIndex": {
                     ……
                 }
            }
        ],
        "subDirectoriesRemoved": [
            {
                "dirIndex": {
                     ……
                 }
            }
        ]
    }, 
    "prevSnapshotIndexBlobId": "Blob ID of previous snapshot index"
}
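
To make the recursive structure of the schema concrete, the sketch below mirrors part of it in memory and walks it to collect blob IDs. It is illustrative only: the classes FileIndex and DirIndex and their methods are hypothetical names chosen to match the schema fields, metadata fields such as crc and mtime are left out, and the handling of subDirectoriesRemoved is omitted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mirror of one "filesPresent" / "filesRemoved" entry.
final class FileIndex {
  final String fileName;
  final List<String> blobIds; // one blob ID per uploaded chunk

  FileIndex(String fileName, List<String> blobIds) {
    this.fileName = fileName;
    this.blobIds = blobIds;
  }
}

// Hypothetical mirror of a "dirIndex" node; subDirectoriesRemoved is omitted in this sketch.
final class DirIndex {
  final List<FileIndex> filesPresent;
  final List<FileIndex> filesRemoved;
  final List<DirIndex> subDirectoriesPresent;

  DirIndex(List<FileIndex> filesPresent, List<FileIndex> filesRemoved, List<DirIndex> subDirectoriesPresent) {
    this.filesPresent = filesPresent;
    this.filesRemoved = filesRemoved;
    this.subDirectoriesPresent = subDirectoriesPresent;
  }

  /** Blob IDs of every file still present in this directory tree. */
  List<String> presentBlobIds() {
    List<String> ids = new ArrayList<>();
    for (FileIndex file : filesPresent) {
      ids.addAll(file.blobIds);
    }
    for (DirIndex subDir : subDirectoriesPresent) {
      ids.addAll(subDir.presentBlobIds());
    }
    return ids;
  }

  /** Blob IDs of files marked as removed in this directory and in its present sub-directories. */
  List<String> removedBlobIds() {
    List<String> ids = new ArrayList<>();
    for (FileIndex file : filesRemoved) {
      ids.addAll(file.blobIds);
    }
    for (DirIndex subDir : subDirectoriesPresent) {
      ids.addAll(subDir.removedBlobIds());
    }
    return ids;
  }
}
```

A traversal like presentBlobIds is what makes the Index blob sufficient to rebuild a snapshot: every blob needed for the checkpoint directory is reachable from the root dirIndex.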

Incremental backup and restore

As described in the previous section, we can retrieve content and metadata of a snapshot using the Index blob. We leverage the Index blob to perform incremental backup and restore, rather than creating a complete checkpoint or restore on every commit. 

On every container start/restart, we first look up the latest checkpoint ID in the checkpoint topic. We use this checkpoint ID to get the Index blob of the last checkpoint written to the Blob Store and cache it.

Before uploading a checkpoint directory to the Blob Store, we retrieve the cached Index blob and deserialize it. This gives us the contents of the previously uploaded checkpoint. We then list the files in the local checkpoint directory to get its contents and metadata, and compare the previous (cached) checkpoint with the current one. As described in the background section, the checkpoint directory holds two types of files: mutable and immutable. For immutable files, we compare file names to determine whether a) a new file has been added or b) a previously checkpointed file has been deleted. Newly added files are uploaded to the Blob Store. Deleted files are marked for deletion from the Blob Store and added to the filesRemoved section of the Index blob (see Post-Commit cleanup and Garbage Collection for details). Mutable files are always uploaded since they are typically only a few kilobytes.

At the end, using the blob IDs of the newly uploaded and deleted files, we create a new Index blob, serialize it and upload it. This concludes our incremental backup.
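
The file-name comparison at the heart of the incremental backup can be sketched as a pair of set differences. This is a simplified illustration, not the actual implementation: the class CheckpointDelta and its method are hypothetical, files are represented only by their names, and cleaning up the previous blobs of re-uploaded mutable files is left out.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical result type for the delta between two checkpoints.
final class CheckpointDelta {
  final Set<String> filesToUpload;
  final Set<String> filesToRemove;

  private CheckpointDelta(Set<String> filesToUpload, Set<String> filesToRemove) {
    this.filesToUpload = filesToUpload;
    this.filesToRemove = filesToRemove;
  }

  /**
   * @param previousImmutable immutable file names recorded in the cached (previous) Index blob
   * @param currentImmutable immutable file names in the local checkpoint directory
   * @param currentMutable mutable file names in the local checkpoint directory
   */
  static CheckpointDelta between(Set<String> previousImmutable, Set<String> currentImmutable,
      Set<String> currentMutable) {
    // Newly added immutable files: present now, absent from the previous checkpoint.
    Set<String> toUpload = new TreeSet<>(currentImmutable);
    toUpload.removeAll(previousImmutable);
    // Mutable files are small and are always uploaded.
    toUpload.addAll(currentMutable);

    // Deleted immutable files: present in the previous checkpoint, absent now.
    Set<String> toRemove = new TreeSet<>(previousImmutable);
    toRemove.removeAll(currentImmutable);

    return new CheckpointDelta(toUpload, toRemove);
  }
}
```

Comparing immutable files by name alone is enough here because an immutable file with the same name has the same content, so it never needs to be uploaded twice.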

Incremental restore is the reverse of the incremental backup outlined above. We treat the checkpoint topic as our source of truth: we read the latest checkpoint from the checkpoint topic, obtain the Index blob ID from it, and retrieve the Index blob from the Blob Store. The Index blob contains lists called filesPresent and filesRemoved. We download the blobs in the filesPresent list from the Blob Store and add the files to the checkpoint directory. Similarly, we delete the files in the filesRemoved list from the local checkpoint directory. For files that exist in both the remote and the local checkpoint directory, we compare checksum, size and permission attributes to determine whether they are the same file. If a file's content has changed since the remote checkpoint was created, we delete the local file and restore the file from the remote checkpoint.
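
The comparison of local and remote files during restore can be sketched as below. This is a simplified illustration under assumptions: the class RestorePlan and its methods are hypothetical, and each file is reduced to its name and CRC32 checksum, whereas the text above also compares size and permission attributes.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.zip.CRC32;

// Hypothetical planner that decides which files to download and which local files to delete.
final class RestorePlan {
  final Set<String> toDownload = new TreeSet<>();
  final Set<String> toDeleteLocally = new TreeSet<>();

  /** CRC32 checksum of a file's bytes, matching the "crc" field of the Index blob. */
  static long crc32(byte[] content) {
    CRC32 crc = new CRC32();
    crc.update(content);
    return crc.getValue();
  }

  /**
   * @param remoteCrcByName checksum per file name, taken from the Index blob
   * @param localCrcByName checksum per file name, computed from the local checkpoint directory
   */
  static RestorePlan plan(Map<String, Long> remoteCrcByName, Map<String, Long> localCrcByName) {
    RestorePlan plan = new RestorePlan();
    for (Map.Entry<String, Long> remote : remoteCrcByName.entrySet()) {
      Long localCrc = localCrcByName.get(remote.getKey());
      if (localCrc == null) {
        plan.toDownload.add(remote.getKey()); // missing locally
      } else if (!localCrc.equals(remote.getValue())) {
        plan.toDeleteLocally.add(remote.getKey()); // content differs: replace the local copy
        plan.toDownload.add(remote.getKey());
      }
    }
    for (String localName : localCrcByName.keySet()) {
      if (!remoteCrcByName.containsKey(localName)) {
        plan.toDeleteLocally.add(localName); // not part of the remote checkpoint
      }
    }
    return plan;
  }
}
```

Files whose checksum matches appear in neither set, which is what lets a restart with host-affinity reuse most of the local checkpoint directory.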

Post-Commit cleanup and Garbage Collection

A commit can fail at any stage. The container may restart before a commit completes or before the post-commit cleanup phase is complete. If a commit fails midway, we may have already created blobs in the blob store without having made the checkpoint durable in the checkpoint topic. These blobs are part of a failed commit and are considered garbage that needs to be cleaned up. Additionally, if a job fails during the post-commit phase, we may not have deleted the blobs corresponding to the files deleted in the new checkpoint.

The proposed solution handles garbage collection automatically by leveraging blob TTLs and a REMOVE_TTL() operation that we implement.

Blob Stores like Azure and AWS S3 allow every blob to have a TTL associated with it. Blobs are garbage collected at the end of the TTL. Additionally, we implement a method called REMOVE_TTL that updates a blob's TTL so that the blob never expires. A second call to REMOVE_TTL on the same blob is a no-op. We use these facts to ensure that garbage or untracked blobs are cleaned up in the Blob Store.

  1. As part of the commit, when we upload a file for checkpointing, we create a blob with a TTL of 30 days. This ensures that if the commit sequence fails at any step, our blobs are automatically garbage collected at the end of TTL. 
  2. Files are not deleted immediately. They are kept until the commit is complete in case they are needed for rollback. Instead, any files to be deleted are added to the filesRemoved section of the Index blob, as explained in the section Incremental backup and restore.
  3. The commit completes after the checkpoint ID is written to the checkpoint topic. At this point, we update all the blobs created in that commit sequence to never expire using a REMOVE_TTL() request to the Blob Store. We also send delete requests for the blobs marked for deletion at the end of the commit phase.
  4. Whenever a job restores, we perform 2 operations as part of init operation:
    1. Send REMOVE_TTL to never expire all the blobs in the Index blob. 
    2. Send delete requests for all the blobs marked for deletion in the Index blob (the filesRemoved and subDirectoriesRemoved sections).
  5. Step 4 ensures that if a job fails in the post-commit phase, the blobs of the last checkpoint are made permanent on the next restore and are not garbage collected, and that blobs marked for deletion are not left behind as garbage in the Blob Store. Both operations have no effect if they already completed successfully.
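
The interplay of TTL and REMOVE_TTL in the steps above can be illustrated with a small in-memory simulation. This is a sketch under assumptions, not a real blob store client: the class TtlBlobStoreSketch and its methods are hypothetical, time is passed in explicitly as epoch milliseconds, and expiry is triggered by an explicit call rather than by the store itself.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for a blob store that supports TTLs.
final class TtlBlobStoreSketch {
  private static final long NEVER = Long.MAX_VALUE;
  private final Map<String, Long> expiryMsByBlobId = new HashMap<>();
  private int nextId = 0;

  /** Creates a blob that expires ttlMs after nowMs and returns its blob ID. */
  String put(long nowMs, long ttlMs) {
    String blobId = "blob-" + nextId++;
    expiryMsByBlobId.put(blobId, nowMs + ttlMs);
    return blobId;
  }

  /** Makes a blob permanent. Calling it again, or on a missing blob, has no effect. */
  void removeTtl(String blobId) {
    expiryMsByBlobId.computeIfPresent(blobId, (id, expiry) -> NEVER);
  }

  /** Deletes a blob. Deleting an already deleted blob has no effect in this sketch. */
  void delete(String blobId) {
    expiryMsByBlobId.remove(blobId);
  }

  /** Simulates the store's own garbage collection of expired blobs. */
  void expire(long nowMs) {
    expiryMsByBlobId.values().removeIf(expiryMs -> expiryMs <= nowMs);
  }

  boolean exists(String blobId) {
    return expiryMsByBlobId.containsKey(blobId);
  }
}
```

In this model, a blob uploaded by a commit that never completes keeps its 30-day TTL and disappears on its own, while a blob from a completed commit survives because REMOVE_TTL was applied, whether right after the commit or again during the next restore.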


Public Interfaces


BlobStoreManager Interface
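import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.CompletionStage;

import org.apache.samza.annotation.InterfaceStability;

// Metadata is assumed to live in the same package as this interface.

/**
 * Provides an interface for common blob store operations: GET, PUT and DELETE
 */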
@InterfaceStability.Unstable
public interface BlobStoreManager {
  /**
   * Initialize underlying blob store client, if necessary.
   *
   */
  void init();

  /**
   * Non-blocking PUT call to remote blob store with supplied metadata
   * @param inputStream InputStream to read the file
   * @param metadata user supplied {@link Metadata} of the request
   * @return a future containing the blob ID of the uploaded blob if the upload is successful.
   */
  CompletionStage<String> put(InputStream inputStream, Metadata metadata);

  /**
   * Non-blocking GET call to remote blob store
   * @param id Blob ID of the blob to get
   * @param outputStream OutputStream to write the downloaded blob
   * @param metadata User supplied {@link Metadata} of the request
   * @return A future that completes when all the chunks are downloaded and written successfully to the OutputStream
   * @throws org.apache.samza.storage.blobstore.exceptions.DeletedException returned future should complete
   *         exceptionally with DeletedException on failure with the blob already deleted error.
   */
  CompletionStage<Void> get(String id, OutputStream outputStream, Metadata metadata);

  /**
   * Non-blocking call to mark a blob for deletion in the remote blob store
   * @param id Blob ID of the blob to delete
   * @param metadata User supplied {@link Metadata} of the request
   * @return A future that completes when the blob is successfully deleted from the blob store.
   * @throws org.apache.samza.storage.blobstore.exceptions.DeletedException returned future should complete
   *         exceptionally with DeletedException on failure with the blob already deleted error. This exception is
   *         caught and ignored by the caller of the delete method during initial cleanup and SnapshotIndex read.
   */
  CompletionStage<Void> delete(String id, Metadata metadata);

  /**
   * Non-blocking call to remove the Time-To-Live (TTL) for a blob and make it permanent.
   * @param blobId Blob ID of blob to remove TTL for.
   * @param metadata User supplied {@link Metadata} of the request
   * @return a future that completes when the TTL for the blob is removed.
   * @throws org.apache.samza.storage.blobstore.exceptions.DeletedException returned future should complete
   *         exceptionally with DeletedException on failure with the blob already deleted error.
   */
  CompletionStage<Void> removeTTL(String blobId, Metadata metadata);

  /**
   * Cleanly close resources like blob store client
   */
  void close();
}
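
Because put returns a CompletionStage, a caller can start all uploads for a checkpoint at once and wait for them together. The sketch below shows one way to do this with the JDK's CompletableFuture. It is illustrative only: the class ParallelUploadSketch is hypothetical, and the putFn parameter is a stand-in for a call to BlobStoreManager.put, taking a file name instead of an InputStream and Metadata so that the example is self-contained.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

// Hypothetical helper; putFn stands in for BlobStoreManager.put in this sketch.
final class ParallelUploadSketch {
  /** Starts one upload per file and completes with a file name to blob ID map. */
  static CompletableFuture<Map<String, String>> uploadAll(List<String> fileNames,
      Function<String, CompletionStage<String>> putFn) {
    // Kick off every upload before waiting on any of them.
    Map<String, CompletableFuture<String>> pending = new LinkedHashMap<>();
    for (String fileName : fileNames) {
      pending.put(fileName, putFn.apply(fileName).toCompletableFuture());
    }
    // allOf completes when every upload has completed, or exceptionally if any of them failed.
    return CompletableFuture.allOf(pending.values().toArray(new CompletableFuture[0]))
        .thenApply(ignored -> {
          Map<String, String> blobIdByFileName = new LinkedHashMap<>();
          // join() does not block here because all futures have already completed.
          pending.forEach((fileName, future) -> blobIdByFileName.put(fileName, future.join()));
          return blobIdByFileName;
        });
  }
}
```

The resulting map carries the information the Index blob needs for its filesPresent entries. If any single upload fails, the combined future fails as well, so the commit does not proceed with a partial set of blob IDs.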

...