Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
themeEmacs
titleCheckpoint.java
linenumberstrue
collapsetrue
public interface Checkpoint {
  /**
   * Gets the version number of this Checkpoint.
   *
   * @return the version number of this Checkpoint
   */
  short getVersion();

  /**
   * Gets an unmodifiable view of the last processed offsets for {@link SystemStreamPartition}s.
   * The returned value differs based on the Checkpoint version:
   * <ol>
   *   <li>For {@link CheckpointV1}, returns the input {@link SystemStreamPartition} offsets, as well as
   *     the latest KafkaStateChangelogOffset for any store changelog {@link SystemStreamPartition}.</li>
   *   <li>For {@link CheckpointV2}, returns the input offsets only.</li>
   * </ol>
   *
   * @return an unmodifiable view of the last processed offsets for {@link SystemStreamPartition}s
   */
  Map<SystemStreamPartition, String> getOffsets();
}

...

Code Block
languagejava
themeEmacs
titleStateBackendFactory.java
linenumberstrue
collapsetrue
/**
 * Factory to build the Samza {@link TaskBackupManager}, {@link TaskRestoreManager} and {@link StateBackendAdmin}
 * for a particular state storage backend, which are used to durably back up the Samza task state.
 */
public interface StateBackendFactory {
  /**
   * Creates the {@link TaskBackupManager} that backs up the state of a single task to this backend.
   *
   * @param jobContext context of the job the task belongs to
   * @param containerModel model of the container the task is assigned to
   * @param taskModel model of the task whose state is backed up
   * @param backupExecutor executor made available to the backup manager (presumably for the asynchronous
   *                       upload work; confirm against implementations)
   * @param taskInstanceMetricsRegistry metrics registry of the task instance
   * @param config job configuration
   * @param clock clock made available to the backup manager
   * @param loggedStoreBaseDir base directory for logged stores
   * @param nonLoggedStoreBaseDir base directory for non-logged stores
   * @return the backup manager for the task
   */
  TaskBackupManager getBackupManager(JobContext jobContext,
      ContainerModel containerModel,
      TaskModel taskModel,
      ExecutorService backupExecutor,
      MetricsRegistry taskInstanceMetricsRegistry,
      Config config,
      Clock clock,
      File loggedStoreBaseDir,
      File nonLoggedStoreBaseDir);

  /**
   * Creates the {@link TaskRestoreManager} that restores the state of a single task from this backend.
   *
   * @param jobContext context of the job the task belongs to
   * @param containerContext context of the container the task runs in
   * @param taskModel model of the task whose state is restored
   * @param restoreExecutor executor made available to the restore manager
   * @param metricsRegistry metrics registry for the restore manager
   * @param config job configuration
   * @param clock clock made available to the restore manager
   * @param loggedStoreBaseDir base directory for logged stores
   * @param nonLoggedStoreBaseDir base directory for non-logged stores
   * @param kafkaChangelogRestoreParams restore parameters specific to Kafka changelogs (presumably only
   *                                    used by the Kafka changelog backend; confirm against implementations)
   * @return the restore manager for the task
   */
  TaskRestoreManager getRestoreManager(JobContext jobContext,
      ContainerContext containerContext,
      TaskModel taskModel,
      ExecutorService restoreExecutor,
      MetricsRegistry metricsRegistry,
      Config config,
      Clock clock,
      File loggedStoreBaseDir,
      File nonLoggedStoreBaseDir,
      KafkaChangelogRestoreParams kafkaChangelogRestoreParams);

  /**
   * Creates the {@link StateBackendAdmin} that creates and validates the job-level resources of this backend.
   *
   * @param jobModel model of the job
   * @param config job configuration
   * @return the admin for this state backend
   */
  StateBackendAdmin getAdmin(JobModel jobModel, Config config);
}

...

Code Block
languagejava
themeEmacs
titleTaskBackupManager.java
linenumberstrue
collapsetrue
/**
 * <p>
 * TaskBackupManager is the interface that must be implemented for any remote system that Samza persists its state to
 * during the task commit operation.
 * {@link #snapshot(CheckpointId)} is invoked synchronously with task processing and captures a snapshot of the
 * store state to be persisted for the commit. {@link #upload(CheckpointId, Map)} then persists the snapshotted
 * state to the underlying backup system, asynchronously to task processing.
 * </p>
 * The methods are invoked in the following order:
 * <ul>
 *   <li>{@link #snapshot(CheckpointId)} is called before {@link #upload(CheckpointId, Map)}.</li>
 *   <li>persistToFilesystem is called after upload has completed (it is not a method of this interface).</li>
 *   <li>{@link #cleanUp(CheckpointId, Map)} is only called after upload and persistToFilesystem have both
 *     completed successfully.</li>
 * </ul>
 */
public interface TaskBackupManager {

  /**
   * Initializes the TaskBackupManager instance.
   *
   * @param checkpoint last recorded checkpoint from the CheckpointManager, or null if no last checkpoint was found
   */
  void init(@Nullable Checkpoint checkpoint);

  /**
   * Captures the current state of the stores so that it can be persisted to the backup system in the
   * {@link #upload(CheckpointId, Map)} phase. This is the part of the commit operation that is synchronous to
   * message processing.
   *
   * @param checkpointId {@link CheckpointId} of the current commit
   * @return a map of store name to state checkpoint markers for stores managed by this state backend, to be
   *         passed to {@link #upload(CheckpointId, Map)}
   */
  Map<String, String> snapshot(CheckpointId checkpointId);

  /**
   * Persists the state captured by {@link #snapshot(CheckpointId)} to the underlying backup system. This is the
   * part of the commit operation that is asynchronous to message processing.
   *
   * @param checkpointId {@link CheckpointId} of the current commit
   * @param stateCheckpointMarkers the map of store name to state checkpoint markers returned by
   *                               {@link #snapshot(CheckpointId)}
   * @return a {@link CompletableFuture} containing a map of store name to state checkpoint markers
   *         after the upload is complete
   */
  CompletableFuture<Map<String, String>> upload(CheckpointId checkpointId, Map<String, String> stateCheckpointMarkers);

  /**
   * Cleans up any local or remote state for checkpoint information that is older than the provided checkpointId.
   * This operation is required to be idempotent.
   *
   * @param checkpointId the {@link CheckpointId} of the last successfully committed checkpoint
   * @param stateCheckpointMarkers a map of store name to state checkpoint markers returned by
   *                               {@link #upload(CheckpointId, Map) upload}
   * @return a {@link CompletableFuture} tracking the cleanup operation
   */
  CompletableFuture<Void> cleanUp(CheckpointId checkpointId, Map<String, String> stateCheckpointMarkers);

  /**
   * Shutdown hook for the backup manager, used to clean up any allocated resources.
   */
  void close();

}

...

Code Block
languagejava
themeEmacs
titleTaskRestoreManager.java
linenumberstrue
collapsetrue
/**
 * Helper interface used to restore task state.
 */
public interface TaskRestoreManager {

  /**
   * Initializes state resources such as store directories.
   *
   * @param checkpoint the checkpoint to initialize from; presumably the last recorded checkpoint for the task
   *                   (NOTE(review): confirm whether this may be null, as it may be for
   *                   {@code TaskBackupManager#init})
   */
  void init(Checkpoint checkpoint);

  /**
   * Restores state from checkpoints, state snapshots and changelogs.
   * Currently, store restoration happens on a separate thread pool within {@code ContainerStorageManager}. In case of
   * interrupt/shutdown signals from {@code SamzaContainer}, {@code ContainerStorageManager} may interrupt the restore
   * thread.
   *
   * <p>Note: Typically, interrupt signals don't bubble up as {@link InterruptedException} unless the restore thread is
   * waiting on IO/network. In case of busy looping, implementors are expected to check the interrupt status of the
   * thread periodically and shut down gracefully before throwing {@link InterruptedException} upstream.
   * {@code SamzaContainer} will not wait for cleanup, and the interrupt signal is a best-effort notification by the
   * container that it is shutting down.
   *
   * @throws InterruptedException if the restore thread is interrupted
   */
  void restore() throws InterruptedException;

  /**
   * Closes all initialized resources, including storage engines.
   */
  void close();
}

...

Code Block
languagejava
themeEmacs
titleStateBackendAdmin.java
linenumberstrue
collapsetrue
/**
 * Admin responsible for creating and validating the job-level resources required by a state backend.
 */
public interface StateBackendAdmin {
  /**
   * Creates all the resources required by the state backend, per job and per store.
   */
  void createResources();

  /**
   * Validates all the resources required by the state backend, per job and per store.
   */
  void validateResources();
}

...