Status
Current state: UNDER DISCUSSION
Discussion thread: <link to mailing list DISCUSS thread>
JIRA: SAMZA-2591
Released:
Motivation
State restore is one of the most significant bottlenecks for a Samza job getting back online, since it requires incremental updates from a Kafka changelog topic (unless the host the job started on already has the state via host affinity or standby containers). To decrease state restore time, Samza must be able to back up state to a blob store or a distributed file system, such as Azure Blob Storage or HDFS, which enables bulk restore of the local state store rather than incremental restore from the Kafka changelog. However, using such a remote store increases the expected time to upload the local RocksDB changes during commit because of higher upload latency. Since we upload a delta to the remote store, we can expect an especially large delta when a RocksDB compaction takes place, because compaction reshuffles and remerges the previous sstable files into new files. Because Samza commit is exclusive with processing by default, making commit asynchronous by default is required to introduce faster state restore without degrading processing performance for existing Samza jobs.
Although there is an existing "Async commit" feature, it only enables commit under special cases when asynchronous processing is enabled; it does not allow commit to occur asynchronously with processing, and is therefore not what we need for this problem.
Problem
The purpose of this enhancement proposal is to accommodate local state restore in Samza from sources other than a changelog, more specifically from remote blob stores. Currently, the framework only supports incremental writes to a Kafka changelog as a remote state backup option, which results in fast commit times but slow restore times. Three problems arise when we migrate the framework to a remote store based backup:
- Slow backups: To add support for remote stores, the commit phase of the Runloop must be changed to support remote store commits as well. To counteract the larger amount of data that must be written to the remote store, we enable the commit phase to occur concurrently with processing.
- Backwards compatibility: the new commit scheme must be able to commit to both the changelog and the remote store in the same commit.
- Transactional State Checkpointing: The framework does not support transactional state checkpointing for remote stores as the current format is only intended for changelog checkpoints.
Proposed Changes
Checkpoint Format changes
With the upcoming changes to the commit lifecycle to support storage systems other than changelog, we must introduce a new checkpointing format to reflect the flexibility of the stored checkpoint data. We must:
- Distinguish the input checkpoint from the state checkpoints
- The state checkpoint must support both Kafka changelog state checkpoints and Ambry state checkpoints
- The feature must be backwards compatible with previous checkpoints, and must support rollback
Instead of writing the new checkpoint message format to a new checkpoint topic (v2), writing to the same checkpoint topic (v1) under a different key namespace is also possible for the migration. The checkpoint manager will selectively pull from the checkpoint topic only the checkpoints whose key matches the enabled checkpoint format. The advantage of this approach is that the migration no longer requires creating a new Kafka topic. However, since we plan to enable this for all users, the versions currently in use do not have forward compatibility for reading the new checkpoint format from the old checkpoint stream.
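The key-namespace approach above can be sketched as follows. This is a hypothetical illustration, not the actual Samza serde: the class, enum, and method names are assumptions, and the real key would be a serialized structure rather than an enum.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: v1 and v2 checkpoint messages share one checkpoint topic,
// distinguished by the type/namespace of the message key. Readers keep only the
// messages whose key type matches the checkpoint format they have enabled.
public class CheckpointReaderSketch {
  // Illustrative key namespaces written to the checkpoint topic.
  enum KeyType { CHECKPOINT_V1, CHECKPOINT_V2 }

  static class CheckpointEntry {
    final KeyType keyType;
    final String taskName;
    final byte[] payload;
    CheckpointEntry(KeyType keyType, String taskName, byte[] payload) {
      this.keyType = keyType;
      this.taskName = taskName;
      this.payload = payload;
    }
  }

  /** Selects, per task, the latest checkpoint whose key matches the enabled format. */
  static Map<String, byte[]> selectCheckpoints(Iterable<CheckpointEntry> topic,
                                               boolean readV2Checkpoints) {
    KeyType wanted = readV2Checkpoints ? KeyType.CHECKPOINT_V2 : KeyType.CHECKPOINT_V1;
    Map<String, byte[]> latest = new HashMap<>();
    for (CheckpointEntry e : topic) {
      if (e.keyType == wanted) {
        latest.put(e.taskName, e.payload); // later messages overwrite earlier ones
      }
      // Messages with the other key type are skipped. This is also what lets an
      // old reader safely ignore v2 keys it does not recognize.
    }
    return latest;
  }
}
```

Skipping unrecognized keys, rather than failing on them, is what makes the dual-write migration safe for readers on either format.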
State Backup and Commit Interface Changes
In general, to accommodate async commits, we will split the commit phase into two sections in order to separate the sync and async operations:
- Sync Snapshot: a section that is exclusive with processing but completes quickly. Responsible for PREPARING the current local state FOR UPLOAD (example: taking a delta-snapshot of the local store and storing the offsets)
- Async Upload: a section that occurs concurrently with processing and performs the WRITING to remote state (example: uploading the delta-snapshot to the remote store and writing to the checkpoint topic)
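The two-phase contract described above could look roughly like the following. This is an illustrative sketch, not the final public interface: the interface name, method names, and the choice of `Map<String, String>` for metadata are all assumptions.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the two-phase backup contract implied by the
// Sync Snapshot / Async Upload split. Names are illustrative only.
public interface StateBackupManager {
  /**
   * Sync Snapshot phase: runs exclusively with processing, so it must be fast.
   * Captures an immutable view of the local store (e.g. a delta-snapshot of
   * sstable files plus current offsets) and returns metadata for the upload.
   */
  Map<String, String> snapshot(String checkpointId);

  /**
   * Async Upload phase: runs concurrently with processing. Writes the captured
   * delta to the remote store and completes with the remote "offsets"
   * (e.g. blob ids) to be written to the checkpoint topic.
   */
  CompletableFuture<Map<String, String>> upload(String checkpointId,
                                                Map<String, String> snapshotMetadata);
}
```

Returning a `CompletableFuture` from the upload phase is what lets the Runloop resume processing immediately after the snapshot, while the commit completes in the background.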
Furthermore, to address exclusive, non-overlapping commits, as well as the case where the state upload takes too long, a new "max.commit.delay" configuration will be introduced with a reasonable default. It determines the behavior of the system when the Upload section exceeds "task.commit.ms" during a commit, i.e., a second commit is triggered before both sections of the last commit have completed:
- If the Upload section exceeds task.commit.ms and the total time spent in the Upload section is <= max.commit.delay, we skip the new commit and allow the ongoing Upload section to finish
- If the Upload section exceeds task.commit.ms and the total time spent in the Upload section is > max.commit.delay, we block the new commit, as well as any processing, until the previous commit's Upload section has finished
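The skip-vs-block decision above can be sketched as a small policy. The class, method, and field names here are illustrative assumptions; only the config names come from the text.

```java
// Hypothetical sketch of the decision taken when a commit fires while the
// previous commit's Upload section is still in flight.
public class CommitPolicySketch {
  enum Action { PROCEED, SKIP, BLOCK }

  private final long maxCommitDelayMs; // the "max.commit.delay" config

  CommitPolicySketch(long maxCommitDelayMs) {
    this.maxCommitDelayMs = maxCommitDelayMs;
  }

  /**
   * @param uploadInProgress whether the previous commit's Upload section is still running
   * @param uploadElapsedMs  how long that Upload section has been running so far
   */
  Action onCommitTriggered(boolean uploadInProgress, long uploadElapsedMs) {
    if (!uploadInProgress) {
      return Action.PROCEED;            // previous commit fully finished
    }
    if (uploadElapsedMs <= maxCommitDelayMs) {
      return Action.SKIP;               // drop this commit, let the upload finish
    }
    return Action.BLOCK;                // block new commit and processing until done
  }
}
```

Note that SKIP drops the newly triggered commit entirely, so at most one Upload section is ever in flight, preserving the non-overlapping commit guarantee.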
Finally, to achieve the same transactional guarantees as the current transactional state checkpoint model, we must also save the remote backup store "offsets" to local disk and to the checkpoint stream during the task commit, so that the restore process can map a specific on-disk version to a remote version of the state store. The "offsets" in the remote store must be a unique identifier for a version of the remote store; in the case of a blob store, this is a blob id obtained from the store after a successful upload.
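As an illustration of the on-disk half of this mapping, the sketch below persists a store-name-to-blob-id map next to the local store. The file name, format, and class are assumptions for illustration; a real implementation would also write the file atomically.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: persist the remote-store "offsets" (store name -> blob id)
// beside the local store so restore can match an on-disk version to a remote one.
public class RemoteOffsetsSketch {
  static void writeOffsets(Path storeDir, Map<String, String> storeToBlobId) {
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<String, String> e : new TreeMap<>(storeToBlobId).entrySet()) {
      sb.append(e.getKey()).append('=').append(e.getValue()).append('\n');
    }
    try {
      // A real implementation would write to a temp file and rename atomically.
      Files.write(storeDir.resolve("REMOTE-OFFSETS"), sb.toString().getBytes());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  static Map<String, String> readOffsets(Path storeDir) {
    Map<String, String> offsets = new TreeMap<>();
    try {
      for (String line : Files.readAllLines(storeDir.resolve("REMOTE-OFFSETS"))) {
        int i = line.indexOf('=');
        if (i > 0) offsets.put(line.substring(0, i), line.substring(i + 1));
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return offsets;
  }
}
```

On restore, the same blob ids read from the checkpoint stream can be compared against this file to decide whether the local store already matches a checkpointed remote version.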
Public Interfaces
Implementation and Test Plan
List of items to validate:
- Serde of checkpoints v2: successfully reads and writes v2 checkpoints
- Backwards compatibility for checkpoints v2: can read and deserialize v1 checkpoints when "readV2Checkpoints" is disabled
- Forwards compatibility for checkpoints v2: previous versions of Samza successfully skip checkpoint v2 messages with unrecognized keys
- Dual checkpoint writes: after a v2 write, both the v1 checkpoint message and the v2 checkpoint message must be present in the checkpoint topic
- Test switching the readCheckpointV2 config: can switch between v2 and v1 checkpoint reads on the latest version via config
Compatibility, Deprecation, and Migration Plan
Checkpoint V2 Migration Plan
The rollout occurs in the following steps:
1. Create a new checkpoint topic with a new version number (v2), using the new schema (enabled via config)
2. Write checkpoints to both the v1 and v2 topics, but read only from the v1 checkpoint topic
3. Enable the job to restore from the new checkpoint topic (via config)
4. Ensure that the job is stable with the new checkpoint scheme on the new topic
5. Stop writing to the old checkpoint topic and delete it (cannot roll back at this point)
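The per-step rollout above would be driven by configuration toggles, sketched below. The property names are hypothetical illustrations; only the readCheckpointV2 toggle is referenced by name in the test plan.

```
# Steps 1-2: create the v2 topic and dual-write, still reading v1 (names illustrative)
task.checkpoint.write-v2-checkpoints=true
task.checkpoint.read-v2-checkpoints=false

# Step 3: switch reads to the v2 checkpoints once dual writes are verified
task.checkpoint.read-v2-checkpoints=true
```

Keeping the write and read toggles independent is what allows rollback at every step before the old topic is deleted.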
For step 1), all new checkpoint topics (v2) will use the samza-internal1 cluster rather than queuing.
For step 5), the dual-write setup may need to run for up to a quarter before the old checkpoint topic can be deleted, to ensure stability, since this step cannot be rolled back.