Status

Current state: UNDER DISCUSSION

Discussion thread: <link to mailing list DISCUSS thread>

JIRA:

Released: 

Problem

Samza currently backs up the local state of jobs to compacted Kafka topics called changelogs. At LinkedIn, we observed that for jobs with large local state, a restore can take multiple hours. As a result, a job that restarts because of a hardware failure, regular maintenance such as patching and upgrades, or an autoscaling event is disrupted for as long as it takes to rebuild the state.

Motivation

Our motivation for introducing a Blob Store as a replacement for Kafka-based changelogs is to address the following issues:

  1. Long restore times: Restoring state from a Kafka-backed changelog topic is slow because the restore replays the topic one message at a time. With a Blob Store as the restore backend, the files of a state store can be downloaded in parallel, reducing restore times from hours to the order of minutes.
  2. Automated job and cluster maintenance: Slow restores also hinder our ability to automate job and cluster maintenance. Because restoring state from the changelog topic can take a long time, tasks such as job auto-scaling, OS upgrades and hardware upgrades need to be planned carefully and cannot be completely automated. A fast restore from the Blob Store reduces the duration and impact of such maintenance tasks.
  3. Message and store size limits: Kafka has a per-message (1 MB) and a per-partition (25 GB) size limit. Moving to Blob Store backed storage would allow us to write messages of arbitrary size and to grow the store beyond the maximum Kafka partition size that bounds it today.

Background

Our proposed design relies on a few RocksDB features to implement robust state backup and restore. The RocksDB concepts below are useful background for the proposed solution.

RocksDB Store Composition:

RocksDB is the embedded key-value store that powers Samza's stateful stream processing. Writes first go to an in-memory buffer (the memtable), which RocksDB flushes to disk when it fills up. RocksDB creates two types of files on the host:

  1. Mutable files: Files like CURRENT, MANIFEST, LOCK and OPTIONS are mutable files created by RocksDB. These files contain metadata about the current state of the store. Samza also creates its own mutable files with additional metadata about the state. These are: OFFSET and OFFSET-V2.
  2. Immutable files: RocksDB creates durable copies of memtables in the form of SST files in the store directories. An SST file is never modified once written. However, compaction often merges SST files into larger (and possibly deduplicated) SST files, after which the original files are deleted.

More details about the schema of metadata as well as SST files created by RocksDB can be found in this wiki: RocksDB wiki.

RocksDB Checkpoint:

RocksDB checkpoint is a feature that takes a snapshot of the current state of the store in a separate directory. It can be used to create a partial or full backup of a RocksDB state store. RocksDB creates a checkpoint by creating hard links to the immutable SST files in the new directory and copying over mutable metadata files such as MANIFEST and CURRENT. We leverage the checkpoint feature to create a cheap and fast point-in-time backup of our state store. The delta between this backup and the previous backup (if one exists) is then uploaded to the Blob Store. This allows us to create an incremental backup of the state store.

A detailed description of RocksDB checkpoint process can be found here: Checkpoints in RocksDB.
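
The hard-link idea behind a checkpoint can be illustrated with a minimal sketch that uses only the Java standard library. This is not how RocksDB implements checkpoints internally; the class name HardLinkSnapshot, the snapshot method and the rule "files ending in .sst are immutable, everything else is mutable" are assumptions made for illustration.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper, not part of RocksDB or Samza.
final class HardLinkSnapshot {

    // Snapshots storeDir into checkpointDir:
    // immutable *.sst files are hard-linked (no data is copied),
    // all other regular files (mutable metadata) are copied.
    static void snapshot(Path storeDir, Path checkpointDir) throws IOException {
        Files.createDirectories(checkpointDir);
        try (DirectoryStream<Path> files = Files.newDirectoryStream(storeDir)) {
            for (Path file : files) {
                if (!Files.isRegularFile(file)) {
                    continue; // sub-directories are ignored in this sketch
                }
                Path target = checkpointDir.resolve(file.getFileName());
                if (file.getFileName().toString().endsWith(".sst")) {
                    // Both names now point at the same on-disk data.
                    Files.createLink(target, file);
                } else {
                    Files.copy(file, target);
                }
            }
        }
    }
}
```

Because a hard link is just a second name for the same data, the snapshot stays readable even after compaction deletes the original SST file from the store directory. Hard links require the store and checkpoint directories to be on the same filesystem.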

Proposed Changes

We propose Blob Store based state backup and restore for Samza. The following diagram illustrates a high-level view of our solution:

Blob Store backed backup and restore for Samza

Note: Orange represents synchronous/blocking calls. Green represents asynchronous/non blocking operations. 

  • Step 1: A synchronous checkpoint call is made to RocksDB, which creates an immutable checkpoint directory in the container filesystem (shown in step 1a). We also generate a checkpoint ID (last 6 digits of epoch timestamp in nanoseconds) and build an in-memory data structure called checkpoint to hold the checkpoint ID and relevant metadata.
  • Step 2: We write the checkpoint ID to the checkpoint directory. When a container restarts with host-affinity, we compare the checkpoint ID in the local checkpoint directory with the checkpoint ID in the checkpoint Kafka topic. If the two match, the local checkpoint directory is the latest checkpoint, so we can skip remote restoration and use the local checkpoint instead.
  • Step 3 to step 6: We calculate the delta between this checkpoint directory and the previous checkpoint and upload the delta to the blob store in parallel with processing. The blob store returns a blob ID for each upload. We then create a special blob in the blob store, called the index blob, that holds these blob IDs (and other metadata). The schema of the index blob is explained in detail in the design doc attached with the Jira ticket.
  • Step 6 and 7: We write the blob ID of the index blob to the checkpoint data structure, serialize this data structure and write the serialized checkpoint to the checkpoint topic.
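
The delta calculation in steps 3 to 6 and the checkpoint ID comparison in step 2 can be sketched as follows. This is a simplified illustration under stated assumptions, not the proposed implementation: each checkpoint is modeled as a map from file name to checksum, and the class CheckpointDiff and its method names are hypothetical. The actual comparison criteria and index blob schema are defined in the design doc.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper; a checkpoint is modeled as file name -> checksum.
final class CheckpointDiff {

    // Files to upload: names that are new in the current checkpoint,
    // or names whose checksum changed (for example mutable metadata files).
    static Set<String> toUpload(Map<String, Long> previous, Map<String, Long> current) {
        Set<String> result = new TreeSet<>();
        for (Map.Entry<String, Long> entry : current.entrySet()) {
            Long oldChecksum = previous.get(entry.getKey());
            if (oldChecksum == null || !oldChecksum.equals(entry.getValue())) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    // Files present in the previous checkpoint but absent from the current one
    // (for example SST files that compaction has deleted).
    static Set<String> toRemove(Map<String, Long> previous, Map<String, Long> current) {
        Set<String> result = new TreeSet<>(previous.keySet());
        result.removeAll(current.keySet());
        return result;
    }

    // Step 2: the local checkpoint directory can be reused only if its ID
    // matches the ID read from the checkpoint topic.
    static boolean canUseLocalCheckpoint(String localId, String checkpointTopicId) {
        return localId != null && localId.equals(checkpointTopicId);
    }
}
```

With this model, unchanged SST files are never uploaded again, which is what makes the backup incremental.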

At this point the commit is complete. We then perform post commit clean-up. 

  • Files are not actually deleted from the hosts until the commit is complete. Once it is, we can safely delete the files and directories that are no longer needed and clean up the host.
  • Blobs are uploaded to the remote blob store with a default expiration of 30 days. After the commit, we update the expiration of the committed blobs so that they never expire. This ensures that if a job fails in the middle of a commit and never restarts, its blobs are automatically garbage collected by the Blob Store at the end of 30 days.

A commit can fail at any stage. To recover from a failure, we run the clean-up first whenever a container restarts. This ensures that garbage collection is handled properly and that no orphan state is left on the host. It also ensures that the blobs of the last completed commit are made permanent by updating their expiration. More detail can be found in the design document attached with the Jira ticket.
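
The expiration-based clean-up can be illustrated with a small in-memory model. It is only a sketch of the idea: the class TtlBlobStore and its methods are hypothetical and do not correspond to any real blob store API. Every blob starts with the 30-day default expiration, and only blobs belonging to a completed commit have it removed.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of a blob store with per-blob expiration.
final class TtlBlobStore {
    static final Duration DEFAULT_TTL = Duration.ofDays(30);

    // blob ID -> expiry time; a null value means the blob never expires
    private final Map<String, Instant> expiry = new HashMap<>();

    // Uploads start with the default expiration.
    void put(String blobId, Instant now) {
        expiry.put(blobId, now.plus(DEFAULT_TTL));
    }

    // Post-commit clean-up: make a committed blob permanent.
    void removeTtl(String blobId) {
        if (expiry.containsKey(blobId)) {
            expiry.put(blobId, null);
        }
    }

    // Garbage collection done by the blob store: drop blobs whose expiry has passed.
    void collectGarbage(Instant now) {
        expiry.values().removeIf(e -> e != null && !e.isAfter(now));
    }

    boolean contains(String blobId) {
        return expiry.containsKey(blobId);
    }
}
```

In this model, a blob uploaded by a commit that never completed keeps its default expiration and disappears after 30 days, while blobs from completed commits survive indefinitely.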

Public Interfaces


Implementation and Test Plan


Compatibility, Deprecation, and Migration Plan


Rejected Alternatives

