
Status

Current state: UNDER DISCUSSION

Discussion thread: <link to mailing list DISCUSS thread> 

JIRA: SAMZA-2591

Released: 

Motivation

State restore is one of the most significant bottlenecks for Samza jobs to get back online, since it requires replaying incremental updates from a Kafka changelog topic (unless the host the job started on already has the state via host affinity or standby containers). To decrease state restore time, Samza must be able to use a blob store or a distributed file system for state backup, such as Azure blob store or HDFS, which enables bulk restore of the local state store rather than incremental restore from the Kafka changelog. However, using such a remote store will increase the time spent uploading local RocksDB changes during commit because of higher upload latency. Since only the delta is uploaded to the remote store, the delta will be especially large after a RocksDB compaction, which reshuffles and merges the previous sstable files into new files. Because Samza commit is exclusive with processing by default, making commit asynchronous by default is required to introduce faster state restores without degrading processing performance for existing Samza jobs.

Although there is an existing “Async commit” implementation, that feature only enables commit in special cases when asynchronous processing is enabled. It does not allow commit to occur asynchronously from processing, and is therefore not what we need for this problem.

Problem

The purpose of this enhancement proposal is to accommodate local state restore in Samza from sources other than a changelog-based restore, and more specifically from remote blob stores. Currently, the framework only supports incremental writes to a Kafka changelog as a remote state restore option, which results in fast commit times but slow restore times. Three problems arise when we migrate the framework to a remote-store-based backup:

  1. Slow backups: To add support for remote stores, the commit phase of the Runloop must be changed to support remote store commits as well as changelog commits. To counteract the larger amount of data that must be written to the remote store, we enable the commit phase to occur concurrently with processing.
  2. Backwards compatibility: The new commit scheme will have the ability to commit to both the changelog and the remote store for the same commit.
  3. Transactional State Checkpointing: The framework does not support transactional state checkpointing for remote stores as the current format is only intended for changelog checkpoints.

Proposed Changes

Checkpoint Format changes

With the upcoming changes to the commit lifecycle to support storage systems other than changelog, we must introduce a new checkpointing format to reflect the flexibility of the stored checkpoint data. We must:

  1. Distinguish the input checkpoint from the state checkpoints
  2. Support both Kafka changelog state checkpoints and Ambry state checkpoints in the state checkpoint
  3. Remain backwards compatible with previous checkpoints and support rollback

As an alternative to writing the new checkpoint message format to a new checkpoint topic (v2), the migration could also write it to the existing checkpoint topic (v1) under a different key namespace. The checkpoint manager would then selectively read only the checkpoints whose keys match the namespace it expects. The advantage of this approach is that the migration no longer requires creating a new Kafka topic. However, since we plan to enable this for all users, the Samza versions currently in use may not have the forward compatibility needed to read the old checkpoint stream once it also contains the new checkpoint format.
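
The selective read described above can be sketched as follows. This is a minimal illustration, assuming each checkpoint message key carries a version namespace. The `Message` shape and the namespace strings `checkpoint-v1` and `checkpoint-v2` are hypothetical and are not Samza's actual key format.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical key/message shapes used only for illustration; not Samza's actual checkpoint key format.
final class NamespacedCheckpointReader {

  /** A checkpoint message as it might appear in the shared checkpoint topic. */
  static final class Message {
    final String namespace; // assumed values: "checkpoint-v1" or "checkpoint-v2"
    final String taskName;
    final String payload;

    Message(String namespace, String taskName, String payload) {
      this.namespace = namespace;
      this.taskName = taskName;
      this.payload = payload;
    }
  }

  /**
   * Scans the topic in order and keeps, per task, the latest payload whose key namespace matches.
   * Messages written under any other namespace are skipped instead of being deserialized.
   */
  static Map<String, String> readLatest(List<Message> topic, String wantedNamespace) {
    Map<String, String> latestByTask = new LinkedHashMap<>();
    for (Message message : topic) {
      if (wantedNamespace.equals(message.namespace)) {
        latestByTask.put(message.taskName, message.payload);
      }
    }
    return latestByTask;
  }
}
```

A reader that only knows the v1 namespace ignores v2 messages in the same topic, which is the forward-compatibility behavior that older versions would need to have.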

State Backup and Commit Interface Changes

In general, to accommodate async commits, we will split the commit phase into 2 sections that separate the sync and async operations:

  1. Sync Snapshot: Section that is exclusive with processing but completes quickly. Responsible for PREPARING the current local state FOR UPLOAD (Example: taking the delta-snapshot of the local store and storing the offsets)
  2. Async Upload: Section that occurs concurrently with processing and performs the WRITING to remote state (Example: uploading the delta-snapshot to the remote DB and writing to the checkpoint topic)
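
The split between the two sections can be sketched as below. This is a simplified illustration under stated assumptions: the class name `TwoPhaseCommitSketch` and its in-memory maps are hypothetical stand-ins for the local store and the remote store, and the snapshot copies the full map rather than computing a delta.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;

// Hypothetical stand-in for a task with one local store and one remote backup.
final class TwoPhaseCommitSketch {
  private final Map<String, String> localStore = new HashMap<>();
  private final Map<String, String> remoteStore = new ConcurrentHashMap<>();
  private final ExecutorService uploadExecutor;

  TwoPhaseCommitSketch(ExecutorService uploadExecutor) {
    this.uploadExecutor = uploadExecutor;
  }

  /** Processing mutates only the local store; called on the processing thread. */
  void process(String key, String value) {
    localStore.put(key, value);
  }

  /** Sync Snapshot: runs on the processing thread, so it must be quick. Copies the state for simplicity. */
  Map<String, String> snapshot() {
    return new HashMap<>(localStore);
  }

  /** Async Upload: writes the snapshot to the remote store on a separate executor. */
  CompletableFuture<Integer> upload(Map<String, String> snapshot) {
    return CompletableFuture.supplyAsync(() -> {
      remoteStore.putAll(snapshot);
      return snapshot.size();
    }, uploadExecutor);
  }

  /** A commit is a sync snapshot followed by an async upload; processing may resume once this returns. */
  CompletableFuture<Integer> commit() {
    return upload(snapshot());
  }

  Map<String, String> remoteView() {
    return new HashMap<>(remoteStore);
  }
}
```

Because the snapshot is taken before `commit()` returns, writes processed while the upload is in flight do not leak into that commit's remote state.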

Furthermore, to keep commits exclusive and non-overlapping, and to handle the case where state upload takes too long, a new “max.commit.delay” configuration will be introduced with a reasonable default. It determines the behavior of the system when the “Upload” section exceeds “task.commit.ms” during a commit, i.e. when a second commit is triggered before both sections of the previous commit have completed:

  1. If the Upload section exceeds task.commit.ms and the total time spent in the Upload section <= max.commit.delay, then we skip the current commit and allow the ongoing commit in the Upload section to finish
  2. If the Upload section exceeds task.commit.ms and the total time spent in the Upload section > max.commit.delay, then we block the new commit as well as any processing until the previous commit in the Upload section is finished
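
The two rules above reduce to a small decision function. The sketch below is illustrative only: the `CommitDelayPolicy` class, the `Action` enum and the parameter names are hypothetical, and only the comparison against max.commit.delay comes from the rules themselves.

```java
// Hypothetical helper illustrating the max.commit.delay rules; not part of the proposed interfaces.
final class CommitDelayPolicy {

  enum Action {
    START_COMMIT,            // no upload in flight, commit normally
    SKIP_COMMIT,             // previous upload still within max.commit.delay, let it finish
    BLOCK_UNTIL_UPLOAD_DONE  // previous upload exceeded max.commit.delay, block commit and processing
  }

  /**
   * Decides what to do when a new commit is triggered (every task.commit.ms).
   *
   * @param uploadInProgress whether the previous commit is still in its Upload section
   * @param uploadElapsedMs total time the previous commit has spent in the Upload section
   * @param maxCommitDelayMs value of the max.commit.delay configuration
   */
  static Action onCommitTriggered(boolean uploadInProgress, long uploadElapsedMs, long maxCommitDelayMs) {
    if (!uploadInProgress) {
      return Action.START_COMMIT;
    }
    if (uploadElapsedMs <= maxCommitDelayMs) {
      return Action.SKIP_COMMIT;
    }
    return Action.BLOCK_UNTIL_UPLOAD_DONE;
  }
}
```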

Finally, to achieve the same transactional guarantees as the current transactional state checkpoint model, we must also save the remote backup store “offsets” to the local disk and to the checkpoint stream during the task commit, so that the restore process can map a specific on-disk version to a remote version of the state store. The “offsets” in the remote store must uniquely identify a version of the remote store. In the case of a blob store, this will be a blob ID, which is obtained from the store after a successful upload.
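
One way to picture the mapping between the on-disk version and the remote version is a marker file stored next to the local state. The sketch below is an assumption for illustration only: the `StoreMarkerFile` class and the `REMOTE-STATE-MARKER` file name are hypothetical, and the actual on-disk layout is not specified by this proposal.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical marker-file helper; the file name and layout are assumptions, not the proposed format.
final class StoreMarkerFile {
  static final String MARKER_FILE_NAME = "REMOTE-STATE-MARKER";

  /** Called during commit, after the upload succeeds and the blob ID is known. */
  static void write(Path storeDir, String blobId) throws IOException {
    Files.write(storeDir.resolve(MARKER_FILE_NAME), blobId.getBytes(StandardCharsets.UTF_8));
  }

  /**
   * Called during restore. Returns true only if the local store was committed with the same
   * blob ID that the checkpoint stream records, meaning the local copy can be reused as-is.
   */
  static boolean localMatchesCheckpoint(Path storeDir, String checkpointedBlobId) throws IOException {
    Path marker = storeDir.resolve(MARKER_FILE_NAME);
    if (!Files.exists(marker)) {
      return false;
    }
    String localBlobId = new String(Files.readAllBytes(marker), StandardCharsets.UTF_8);
    return localBlobId.equals(checkpointedBlobId);
  }
}
```

When the check returns false, the restore path would fall back to a bulk restore of the version named in the checkpoint.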

Public Interfaces


Checkpoint.java
public interface Checkpoint {
  /**
   * Gets the version number of the Checkpoint
   * @return Short indicating the version number
   */
  short getVersion();

  /**
   * Gets an unmodifiable view of the last processed offsets for {@link SystemStreamPartition}s.
   * The returned value differs based on the Checkpoint version:
   * <ol>
   *    <li>For {@link CheckpointV1}, returns the input {@link SystemStreamPartition} offsets, as well
   *      as the latest KafkaStateChangelogOffset for any store changelog {@link SystemStreamPartition} </li>
   *    <li>For {@link CheckpointV2} returns the input offsets only.</li>
   * </ol>
   *
   * @return an unmodifiable view of last processed offsets for {@link SystemStreamPartition}s.
   */
  Map<SystemStreamPartition, String> getOffsets();
}
StateBackendFactory.java
/**
 * Factory to build the Samza {@link TaskBackupManager}, {@link TaskRestoreManager} and {@link StateBackendAdmin}
 * for a particular state storage backend, which are used to durably backup the Samza task state.
 */
public interface StateBackendFactory {
  TaskBackupManager getBackupManager(JobContext jobContext,
      ContainerModel containerModel,
      TaskModel taskModel,
      ExecutorService backupExecutor,
      MetricsRegistry taskInstanceMetricsRegistry,
      Config config,
      Clock clock,
      File loggedStoreBaseDir,
      File nonLoggedStoreBaseDir);

  TaskRestoreManager getRestoreManager(JobContext jobContext,
      ContainerContext containerContext,
      TaskModel taskModel,
      ExecutorService restoreExecutor,
      MetricsRegistry metricsRegistry,
      Config config,
      Clock clock,
      File loggedStoreBaseDir,
      File nonLoggedStoreBaseDir,
      KafkaChangelogRestoreParams kafkaChangelogRestoreParams);

  StateBackendAdmin getAdmin(JobModel jobModel, Config config);
}
TaskBackupManager.java
/**
 * <p>
 * TaskBackupManager is the interface that must be implemented for any remote system that Samza persists its state to
 * during the task commit operation.
 * {@link #snapshot(CheckpointId)} will be invoked synchronously with task processing and takes a snapshot of the
 * stores' state to be persisted for the commit. {@link #upload(CheckpointId, Map)} will then persist the snapshotted
 * state to the underlying backup system and will be asynchronous to task processing.
 * </p>
 * The interface will be invoked in the following way:
 * <ul>
 *   <li>Snapshot will be called before Upload.</li>
 *   <li>persistToFilesystem will be called after Upload is completed</li>
 *   <li>Cleanup is only called after Upload and persistToFilesystem have successfully completed</li>
 * </ul>
 */
public interface TaskBackupManager {

  /**
   * Initializes the TaskBackupManager instance.
   *
   * @param checkpoint last recorded checkpoint from the CheckpointManager or null if no last checkpoint was found
   */
  void init(@Nullable Checkpoint checkpoint);

  /**
   *  Snapshot is used to capture the current state of the stores in order to persist it to the backup manager in the
   *  {@link #upload(CheckpointId, Map)} phase. Performs the commit operation that is
   *  synchronous to processing. Returns the per store name state checkpoint markers to be used in upload.
   *
   * @param checkpointId {@link CheckpointId} of the current commit
   * @return a map of store name to state checkpoint markers for stores managed by this state backend
   */
  Map<String, String> snapshot(CheckpointId checkpointId);

  /**
   * Upload is used to persist the state provided by the {@link #snapshot(CheckpointId)} to the
   * underlying backup system. Commit operation that is asynchronous to message processing and returns a
   * {@link CompletableFuture} containing the successfully uploaded state checkpoint markers.
   *
   * @param checkpointId {@link CheckpointId} of the current commit
   * @param stateCheckpointMarkers the map of store name to state checkpoint markers returned by
   *                               {@link #snapshot(CheckpointId)}
   * @return a {@link CompletableFuture} containing a map of store name to state checkpoint markers
   *         after the upload is complete
   */
  CompletableFuture<Map<String, String>> upload(CheckpointId checkpointId, Map<String, String> stateCheckpointMarkers);

  /**
   * Cleans up any local or remote state for checkpoint information that is older than the provided checkpointId.
   * This operation is required to be idempotent.
   *
   * @param checkpointId the {@link CheckpointId} of the last successfully committed checkpoint
   * @param stateCheckpointMarkers a map of store name to state checkpoint markers returned by
   *                               {@link #upload(CheckpointId, Map)}
   */
  CompletableFuture<Void> cleanUp(CheckpointId checkpointId, Map<String, String> stateCheckpointMarkers);

  /**
   * Shutdown hook for the backup manager to clean up any allocated resources
   */
  void close();

}
TaskRestoreManager.java
/**
 * Helper interface for restoring task state.
 */
public interface TaskRestoreManager {

  /**
   * Initialize state resources such as store directories.
   */
  void init(Checkpoint checkpoint);

  /**
   * Restore state from checkpoints, state snapshots and changelogs.
   * Currently, store restoration happens on a separate thread pool within {@code ContainerStorageManager}. In case of
   * interrupt/shutdown signals from {@code SamzaContainer}, {@code ContainerStorageManager} may interrupt the restore
   * thread.
   *
   * Note: Typically, interrupt signals don't bubble up as {@link InterruptedException} unless the restore thread is
   * waiting on IO/network. In case of busy looping, implementors are expected to check the interrupt status of the
   * thread periodically and shutdown gracefully before throwing {@link InterruptedException} upstream.
   * {@code SamzaContainer} will not wait for clean up and the interrupt signal is the best effort by the container
   * to notify that it's shutting down.
   */
  void restore() throws InterruptedException;

  /**
   * Closes all initiated resources, including storage engines
   */
  void close();
}
StateBackendAdmin.java
/**
 * Admin responsible for loading any resources related to state backend
 */
public interface StateBackendAdmin {
  /**
   * Create all the resources required per job per store state backend
   */
  void createResources();

  /**
   * Validate all resources required per job per state for state backend
   */
  void validateResources();
}


Implementation and Test Plan

List of items to validate:

  1. Serde of checkpoints v2: successfully reads and writes v2 checkpoints
  2. Backwards compatibility for checkpoints v2: can read and deserialize v1 checkpoints when “readV2Checkpoints” is disabled
  3. Forwards compatibility for checkpoints v2: previous versions of Samza successfully skip the new v2 checkpoints, which are written with different keys, when reading
  4. Dual checkpoint write: when v2 writes are enabled, both the v1 checkpoint message and the v2 checkpoint message must be present in the checkpoint topic
  5. Switching the readCheckpointV2 config: the latest version can switch between v2 and v1 checkpoint reads via config

Compatibility, Deprecation, and Migration Plan

Checkpoint V2 Migration Plan

The rollout occurs in the following steps:

  1. Create new checkpoint topic with a new version number (v2), using the new schema (Enabled via config)
  2. Write checkpoints to both the v1 and v2 checkpoint topics, but read only from the v1 checkpoint topic
  3. Enable job to restore from the new checkpoint topic (via config)
  4. Ensure that the job is stable with the new checkpoint scheme on new topic
  5. Stop writing to the old checkpoint topic and delete the old checkpoint topic (cannot roll back at this point)

For step 1), all the new checkpoint topics (v2) will be using the samza-internal1 cluster rather than queuing.

For step 5), the dual write may run for up to a quarter before the old checkpoint topic is deleted, to ensure stability, since step 5 cannot be rolled back.
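
The rollout steps above can be pictured as three independent switches. The sketch below is a simplified model under stated assumptions: the `CheckpointMigrationSketch` class, its flag names and the in-memory maps standing in for the two checkpoint topics are all hypothetical, and the real config keys may differ.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical flags and in-memory "topics"; real config names and topic access will differ.
final class CheckpointMigrationSketch {
  private final Map<String, String> v1Topic = new HashMap<>();
  private final Map<String, String> v2Topic = new HashMap<>();

  boolean writeV1 = true;  // turned off only in step 5, after which rollback is impossible
  boolean writeV2 = false; // turned on in steps 1 and 2 (dual write)
  boolean readV2 = false;  // turned on in step 3, can be turned off again to roll back

  /** Writes the checkpoint for a task to every topic that is currently enabled for writes. */
  void write(String taskName, String v1Payload, String v2Payload) {
    if (writeV1) {
      v1Topic.put(taskName, v1Payload);
    }
    if (writeV2) {
      v2Topic.put(taskName, v2Payload);
    }
  }

  /** Reads from exactly one topic, selected by the read flag. */
  String read(String taskName) {
    return readV2 ? v2Topic.get(taskName) : v1Topic.get(taskName);
  }
}
```

As long as both topics are written, flipping the read flag in either direction finds a current checkpoint, which is why rollback remains possible until step 5.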

Rejected Alternatives
