...

IndexBlob Schema
{
    "schemaVersion": "Version of current schema", 
    "checkpointId": "Checkpoint ID", 
    "createdTimeMs": "Created time in epoch milliseconds", 
    "jobName": "Job name", 
    "jobInstance": "Instance ID of the job", 
    "taskName": "Name of the task", 
    "storeName": "Store name", 
    "dirIndex": {
        "filesPresent": [
            {
                "fileName": "Name of the file uploaded to Ambry", 
                "blobs": [
                    {
                        "blobID": "Blob ID", 
                        "offset": "File chunk offset [optional]"
                    }
                ], 
                "mtime": "Last updated time in epoch milliseconds", 
                "ctime": "Created time in epoch milliseconds",
                "sizeInBytes": "Size of the file", 
                "crc": "crc32 checksum of the file", 
                "attribs": "permission attributes of the file"
            }
        ],
        "filesRemoved": [
            {
                "fileName": "Name of the file to delete from Ambry", 
                "blobs": [
                    {
                        "blobID": "Blob ID", 
                        "offset": "File chunk offset [optional]"
                    }
                ]
            }
        ],
        "subDirectoriesPresent": [
            {
                "dirIndex": {
                    ……
                }
            }
        ],
        "subDirectoriesRemoved": [
            {
                "dirIndex": {
                    ……
                }
            }
        ]
    },
    "prevSnapshotIndexBlobId": "Blob ID of previous snapshot index"
}
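To make the recursive `dirIndex` structure concrete, the following is a minimal sketch, assuming Java 17 records. The class and record names (`IndexBlobSketch`, `DirIndex`, `FileIndex`, `FileBlob`) and both helper methods are hypothetical and are not Samza's own classes; only the field names come from the schema, and `mtime`, `ctime`, `crc` and `attribs` are omitted. The helpers walk `filesPresent` and `subDirectoriesPresent` recursively, for example to gather every blob ID a snapshot references.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified Java model of the IndexBlob "dirIndex" structure.
 * Field names follow the JSON keys; mtime, ctime, crc and attribs are omitted for brevity.
 */
public class IndexBlobSketch {
  /** One uploaded chunk of a file: its blob ID and its byte offset within the file. */
  record FileBlob(String blobId, long offset) {}

  /** An entry of "filesPresent" or "filesRemoved". */
  record FileIndex(String fileName, List<FileBlob> blobs, long sizeInBytes) {}

  /** A directory entry; sub-directories nest recursively, as in the schema. */
  record DirIndex(List<FileIndex> filesPresent, List<FileIndex> filesRemoved,
                  List<DirIndex> subDirectoriesPresent, List<DirIndex> subDirectoriesRemoved) {}

  /** Collects, depth-first, the blob IDs of every file present in this directory tree. */
  static List<String> presentBlobIds(DirIndex dir) {
    List<String> ids = new ArrayList<>();
    for (FileIndex file : dir.filesPresent()) {
      for (FileBlob blob : file.blobs()) {
        ids.add(blob.blobId());
      }
    }
    for (DirIndex sub : dir.subDirectoriesPresent()) {
      ids.addAll(presentBlobIds(sub));
    }
    return ids;
  }

  /** Sums sizeInBytes over every file present in this directory tree. */
  static long presentSizeInBytes(DirIndex dir) {
    long total = 0;
    for (FileIndex file : dir.filesPresent()) {
      total += file.sizeInBytes();
    }
    for (DirIndex sub : dir.subDirectoriesPresent()) {
      total += presentSizeInBytes(sub);
    }
    return total;
  }

  public static void main(String[] args) {
    DirIndex sub = new DirIndex(
        List.of(new FileIndex("000042.sst", List.of(new FileBlob("blob-2", 0)), 2048)),
        List.of(), List.of(), List.of());
    DirIndex root = new DirIndex(
        List.of(new FileIndex("MANIFEST-000001", List.of(new FileBlob("blob-1", 0)), 512)),
        List.of(new FileIndex("000007.sst", List.of(new FileBlob("blob-0", 0)), 1024)),
        List.of(sub), List.of());
    System.out.println(presentBlobIds(root));      // [blob-1, blob-2]
    System.out.println(presentSizeInBytes(root));  // 2560
  }
}
```

Files listed under `filesRemoved` (here `blob-0`) are deliberately skipped, since they are no longer part of the snapshot.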


Public Interfaces

BlobStoreManager: Provides an interface for general blob store operations such as GET, PUT, and DELETE. We also added a removeTTL method, which the post-commit cleanup operation uses to make a blob permanent, so that it does not expire after the default 30-day TTL.


BlobStoreManager Interface
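package org.apache.samza.storage.blobstore;

import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.CompletionStage;
import org.apache.samza.annotation.InterfaceStability;

/**
 * Provides an interface for common blob store operations: GET, PUT and DELETE.
 */
@InterfaceStability.Unstable
public interface BlobStoreManager {
  /**
   * Initialize the underlying blob store client, if necessary.
   */
  void init();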

  /**
   * Non-blocking PUT call to the remote blob store with the supplied metadata.
   * @param inputStream InputStream to read the file
   * @param metadata user supplied {@link Metadata} of the request
   * @return a future containing the blob ID of the uploaded blob if the upload is successful.
   */
  CompletionStage<String> put(InputStream inputStream, Metadata metadata);

  /**
   * Non-blocking GET call to the remote blob store.
   * @param id Blob ID of the blob to get
   * @param outputStream OutputStream to write the downloaded blob
   * @param metadata User supplied {@link Metadata} of the request
   * @return A future that completes when all the chunks are downloaded and written successfully to the OutputStream
   * @throws org.apache.samza.storage.blobstore.exceptions.DeletedException the returned future should complete
   *         exceptionally with a DeletedException if the blob has already been deleted.
   */
  CompletionStage<Void> get(String id, OutputStream outputStream, Metadata metadata);

  /**
   * Non-blocking call to mark a blob for deletion in the remote blob store
   * @param id Blob ID of the blob to delete
   * @param metadata User supplied {@link Metadata} of the request
   * @return A future that completes when the blob is successfully deleted from the blob store.
   * @throws org.apache.samza.storage.blobstore.exceptions.DeletedException the returned future should complete
   *         exceptionally with a DeletedException if the blob has already been deleted. The caller of delete catches
   *         and ignores this exception during the initial cleanup and while reading the SnapshotIndex.
   */
  CompletionStage<Void> delete(String id, Metadata metadata);

  /**
   * Non-blocking call to remove the Time-To-Live (TTL) for a blob and make it permanent.
   * @param blobId Blob ID of blob to remove TTL for.
   * @param metadata User supplied {@link Metadata} of the request
   * @return a future that completes when the TTL for the blob is removed.
   * @throws org.apache.samza.storage.blobstore.exceptions.DeletedException the returned future should complete
   *         exceptionally with a DeletedException if the blob has already been deleted.
   */
  CompletionStage<Void> removeTTL(String blobId, Metadata metadata);

  /**
   * Cleanly close resources such as the blob store client.
   */
  void close();
}
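The contract described in the Javadoc (futures that complete exceptionally with `DeletedException`, and blobs that stay temporary until `removeTTL` is called) can be exercised without a real blob store. Below is a minimal, single-threaded in-memory sketch, assuming Java 17. It does not implement the Samza interface itself: `Metadata` and `DeletedException` are local placeholders, `init()`/`close()` are omitted, and nothing actually expires (the TTL is only a flag). `cleanUp` is a hypothetical helper showing one plausible cleanup order: remove the TTL of the new snapshot's blobs first, then delete the blobs of removed files, ignoring `DeletedException` as the `delete` Javadoc describes.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;

/** Hypothetical, single-threaded in-memory stand-in for BlobStoreManager (init/close omitted). */
public class InMemoryBlobStoreSketch {
  /** Placeholder for the real Metadata class; its fields are assumptions. */
  record Metadata(String jobName, String taskName) {}

  /** Placeholder for org.apache.samza.storage.blobstore.exceptions.DeletedException. */
  static class DeletedException extends RuntimeException {
    DeletedException(String message) { super(message); }
  }

  private static final class Entry {
    final byte[] data;
    boolean hasTtl = true;   // only a flag: nothing actually expires in this sketch
    boolean deleted = false;
    Entry(byte[] data) { this.data = data; }
  }

  private final Map<String, Entry> blobs = new HashMap<>();

  /** Fails with DeletedException for deleted blobs, so every operation reports them the same way. */
  private CompletableFuture<Entry> lookup(String id) {
    Entry entry = blobs.get(id);
    if (entry == null) {
      return CompletableFuture.failedFuture(new IllegalArgumentException("Unknown blob: " + id));
    }
    if (entry.deleted) {
      return CompletableFuture.failedFuture(new DeletedException("Blob already deleted: " + id));
    }
    return CompletableFuture.completedFuture(entry);
  }

  CompletionStage<String> put(InputStream in, Metadata metadata) {
    try {
      String id = UUID.randomUUID().toString();
      blobs.put(id, new Entry(in.readAllBytes()));
      return CompletableFuture.completedFuture(id);
    } catch (IOException e) {
      return CompletableFuture.failedFuture(e);
    }
  }

  CompletionStage<Void> get(String id, OutputStream out, Metadata metadata) {
    return lookup(id).thenAccept(entry -> {
      try {
        out.write(entry.data);
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    });
  }

  CompletionStage<Void> delete(String id, Metadata metadata) {
    return lookup(id).thenAccept(entry -> entry.deleted = true);
  }

  CompletionStage<Void> removeTTL(String id, Metadata metadata) {
    return lookup(id).thenAccept(entry -> entry.hasTtl = false);
  }

  boolean isPermanent(String id) { return !blobs.get(id).hasTtl; }
  boolean isDeleted(String id) { return blobs.get(id).deleted; }

  /**
   * Hypothetical cleanup: make the new snapshot's blobs permanent, then delete blobs of removed files.
   * A DeletedException from delete is ignored, since the blob is already gone.
   */
  static CompletableFuture<Void> cleanUp(InMemoryBlobStoreSketch store, List<String> newBlobIds,
                                         List<String> removedBlobIds, Metadata metadata) {
    CompletableFuture<?>[] ttlRemovals = newBlobIds.stream()
        .map(id -> store.removeTTL(id, metadata).toCompletableFuture())
        .toArray(CompletableFuture[]::new);
    return CompletableFuture.allOf(ttlRemovals).thenCompose(ignored -> {
      CompletableFuture<?>[] deletions = removedBlobIds.stream()
          .map(id -> store.delete(id, metadata).toCompletableFuture().exceptionally(e -> {
            // Dependent stages wrap failures in CompletionException; unwrap before checking.
            Throwable cause = e instanceof CompletionException ? e.getCause() : e;
            if (cause instanceof DeletedException) {
              return null;   // already deleted, e.g. by an earlier, partially failed cleanup
            }
            throw new CompletionException(cause);
          }))
          .toArray(CompletableFuture[]::new);
      return CompletableFuture.allOf(deletions);
    });
  }
}
```

For example, putting two blobs, deleting one of them, and then calling `cleanUp(store, List.of(fresh), List.of(stale), md).join()` completes normally and leaves `store.isPermanent(fresh)` true. Chaining the deletions after all `removeTTL` calls means that if any `removeTTL` call fails, no blobs are deleted.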

...

Implementation and Test Plan

Phase   | Feature
Phase 1 |
Phase 2 |


Compatibility, Deprecation, and Migration Plan

...

The feature will be enabled in a major release of Samza. We plan to roll out the feature in the following steps:

  1. Dual commit: Internally at LinkedIn, we plan to start by enabling dual commit for a very small set of customers/jobs. The stores can be chosen based on characteristics such as per-container and per-task state, number of stores, number of tasks, commit-ns, etc. Every commit is done twice: once to the Kafka changelog topic, as is done today, and once to the blob-store-based storage.
  2. Restore from the Blob Store: Restores are then performed from the Blob Store. The restore source (Blob Store or Kafka changelog topic) is chosen through the config `task.restore.manager`. Since commits still go to both backends, this change is backward compatible: if a restore fails, the job can be reverted to Kafka-based restore by simply switching the config value.
  3. Disable dual commit: After a predefined number of successful restores from the Blob Store, we will disable dual commit and back up to and restore from the Blob Store only. The config `task.commit.manager` enables commit and restore from the Blob Store alone. At this point the change is no longer backward compatible, because the Kafka changelog topic stops receiving commits, and a failure may require a rollback. A rollback is done by switching both `task.restore.manager` and `task.commit.manager`.
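The three steps can be summarized as a mapping from the two configs to a set of commit targets and a restore source. The sketch below assumes Java 17; the config keys come from the plan, but their values (`dual`, `kafka`, `blobstore`), the defaults, and the class names are hypothetical. Its one check encodes the compatibility argument above: a job may only restore from a backend that still receives commits, which is why a rollback after step 3 has to switch both configs.

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical sketch: resolving the rollout configs into commit targets and a restore source.
 * The keys task.commit.manager and task.restore.manager come from the rollout plan; the values
 * "dual", "kafka" and "blobstore", and the defaults, are assumptions made for illustration.
 */
public class RolloutConfigSketch {
  enum Backend { KAFKA_CHANGELOG, BLOB_STORE }

  record Plan(Set<Backend> commitTargets, Backend restoreSource) {}

  static Plan resolve(Map<String, String> config) {
    String commit = config.getOrDefault("task.commit.manager", "dual");      // assumed default: step 1
    String restore = config.getOrDefault("task.restore.manager", "kafka");   // assumed default: Kafka restore
    Set<Backend> targets = switch (commit) {
      case "dual" -> EnumSet.of(Backend.KAFKA_CHANGELOG, Backend.BLOB_STORE);
      case "kafka" -> EnumSet.of(Backend.KAFKA_CHANGELOG);
      case "blobstore" -> EnumSet.of(Backend.BLOB_STORE);
      default -> throw new IllegalArgumentException("Unknown task.commit.manager: " + commit);
    };
    Backend source = switch (restore) {
      case "kafka" -> Backend.KAFKA_CHANGELOG;
      case "blobstore" -> Backend.BLOB_STORE;
      default -> throw new IllegalArgumentException("Unknown task.restore.manager: " + restore);
    };
    // State can only be restored from a backend that still receives commits. Once dual commit is off,
    // switching task.restore.manager back to Kafka alone is rejected; task.commit.manager must change too.
    if (!targets.contains(source)) {
      throw new IllegalStateException("Restore source " + source + " is not a commit target: " + targets);
    }
    return new Plan(targets, source);
  }

  public static void main(String[] args) {
    System.out.println(resolve(Map.of()));                                      // step 1
    System.out.println(resolve(Map.of("task.restore.manager", "blobstore")));   // step 2
    System.out.println(resolve(Map.of("task.commit.manager", "blobstore",
                                      "task.restore.manager", "blobstore")));   // step 3
  }
}
```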