JIRA: SAMZA-2591

Released: 

Problem

The purpose of this enhancement proposal is to accommodate local state restore in Samza from sources other than a changelog-based restore, and more specifically from remote stores. Currently, the framework only supports incremental writes to the Kafka changelog as a remote restore option, which results in fast commit times but slow restore times. To add support for remote stores, the commit phase of the Runloop must be changed to support remote store commits as well as changelog commits. To counteract the larger amount of data that must be written to the remote store, we enable the commit phase to occur concurrently with processing. This design introduces new state restore APIs with TaskCommitManager and TaskBackupManager to split the commit for a Samza task into a sync and an async phase, with up to 64 sync commit threads. Furthermore, for backwards compatibility, the new TaskCommitManager will be able to commit to both the changelog and the remote store in the same commit via Dual commit.

Motivation

State restore is one of the most significant bottlenecks for Samza jobs to get back online, since it requires incremental updates from a Kafka changelog topic (if the host the job started on does not already have the state via host affinity or standby containers). To decrease the state restore time, Samza must be able to use a blob store or a distributed file system for state backup, such as Ambry or HDFS, which will enable bulk restore of the local state store rather than incremental restore from the Kafka changelog. However, using such a remote store will increase the expected time for uploading the local RocksDB changes during commit because of higher upload latency. Since we upload only the delta to the remote store, we can expect an especially large delta if a RocksDB compaction takes place, because compaction reshuffles and remerges the previous sstable files into new files. Because Samza commit is exclusive with processing by default, making it asynchronous by default is required to introduce faster state restores without degrading processing performance for existing Samza jobs.

...

Motivation

State restore is one of the most significant bottlenecks for Samza jobs to get back online, since it requires incremental updates from a Kafka changelog topic (if the host the job started on does not already have the state via host affinity or standby containers). To decrease the state restore time, Samza must be able to use a blob store or a distributed file system for state backup, such as Azure blob store or HDFS, which will enable bulk restore of the local state store rather than incremental restore from the Kafka changelog. However, using such a remote store will increase the expected time for uploading the local RocksDB changes during commit because of higher upload latency. Since we upload only the delta to the remote store, we can expect an especially large delta if a RocksDB compaction takes place, because compaction reshuffles and remerges the previous sstable files into new files. Because Samza commit is exclusive with processing by default, making it asynchronous by default is required to introduce faster state restores without degrading processing performance for existing Samza jobs.

Although there is a current implementation of “Async commit”, that feature only enables commit in special cases when asynchronous processing is enabled. It does not allow commit to occur asynchronously from processing, and is therefore not what we need for this problem.

Problem

The purpose of this enhancement proposal is to accommodate local state restore in Samza from sources other than a changelog-based restore, and more specifically from remote blob stores. Currently, the framework only supports incremental writes to a Kafka changelog as a remote state restore option, which results in fast commit times but slow restore times. The following problems arise when we migrate the framework to a remote-store-based backup:

  1. Slow backups: To add support for remote stores, the commit phase of the Runloop must be changed to support remote store commits as well as changelog commits. To counteract the larger amount of data that must be written to the remote store, we enable the commit phase to occur concurrently with processing.
  2. Backwards compatibility: the new commit scheme will have the ability to commit to both changelog and remote store for the same commit. 
  3. Transactional State Checkpointing: The framework does not support transactional state checkpointing for remote stores as the current format is only intended for changelog checkpoints.
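
The split between a blocking snapshot step and a background upload step, with one commit fanning out to both a changelog and a remote store, can be illustrated with a minimal sketch. This is not the Samza API: AsyncCommitSketch, BackupTarget and InMemoryTarget are hypothetical names, a HashMap stands in for the local RocksDB store, and the "upload" only records the snapshot in memory.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncCommitSketch {
  /** Hypothetical backup target, e.g. a changelog or a remote blob store. */
  interface BackupTarget {
    String name();
    /** Uploads the snapshot and returns a marker identifying the backup. */
    String upload(String checkpointId, Map<String, String> snapshot);
  }

  /** Toy target that keeps uploads in memory so the sketch is runnable. */
  static final class InMemoryTarget implements BackupTarget {
    private final String name;
    final Map<String, Map<String, String>> uploads = new ConcurrentHashMap<>();
    InMemoryTarget(String name) { this.name = name; }
    public String name() { return name; }
    public String upload(String checkpointId, Map<String, String> snapshot) {
      uploads.put(checkpointId, snapshot);
      return name + ":" + checkpointId;
    }
  }

  // Stand-in for the local store; only touched by the processing thread.
  private final Map<String, String> localStore = new HashMap<>();
  private final List<BackupTarget> targets;
  private final ExecutorService uploadPool;

  AsyncCommitSketch(List<BackupTarget> targets, ExecutorService uploadPool) {
    this.targets = targets;
    this.uploadPool = uploadPool;
  }

  /** Simulates processing a message that mutates local state. */
  void put(String key, String value) { localStore.put(key, value); }

  /** Returns a future that completes with one marker per backup target. */
  CompletableFuture<Map<String, String>> commit(String checkpointId) {
    // Sync phase: runs on the caller's (processing) thread and captures an
    // immutable snapshot, so later writes cannot leak into this commit.
    Map<String, String> snapshot = Map.copyOf(localStore);

    // Async phase: each target uploads the same snapshot on the pool while
    // the caller returns to processing.
    List<CompletableFuture<String>> uploads = new ArrayList<>();
    for (BackupTarget target : targets) {
      uploads.add(CompletableFuture.supplyAsync(
          () -> target.upload(checkpointId, snapshot), uploadPool));
    }
    return CompletableFuture.allOf(uploads.toArray(new CompletableFuture[0]))
        .thenApply(ignored -> {
          Map<String, String> markers = new HashMap<>();
          for (int i = 0; i < targets.size(); i++) {
            markers.put(targets.get(i).name(), uploads.get(i).join());
          }
          return markers;
        });
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    AsyncCommitSketch task = new AsyncCommitSketch(
        List.of(new InMemoryTarget("changelog"), new InMemoryTarget("blobstore")), pool);
    task.put("k1", "v1");
    CompletableFuture<Map<String, String>> pending = task.commit("ckpt-1");
    task.put("k2", "v2"); // processing continues while uploads run
    System.out.println(pending.join());
    pool.shutdown();
  }
}
```

In this sketch the checkpoint is considered complete only once every target has returned its marker, which is one way to keep a dual commit consistent across the changelog and the remote store. The actual completion and failure semantics are defined by the proposal itself.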

Proposed Changes

Checkpoint Format changes

...