...
    public interface Checkpoint {
      /**
       * Gets the version number of the Checkpoint.
       * @return Short indicating the version number
       */
      short getVersion();

      /**
       * Gets an unmodifiable view of the last processed offsets for {@link SystemStreamPartition}s.
       * The returned value differs based on the Checkpoint version:
       * <ol>
       *   <li>For {@link CheckpointV1}, returns the input {@link SystemStreamPartition} offsets, as well
       *     as the latest KafkaStateChangelogOffset for any store changelog {@link SystemStreamPartition}.</li>
       *   <li>For {@link CheckpointV2}, returns the input offsets only.</li>
       * </ol>
       *
       * @return an unmodifiable view of last processed offsets for {@link SystemStreamPartition}s.
       */
      Map<SystemStreamPartition, String> getOffsets();
    }
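The "unmodifiable view" contract of `getOffsets()` can be illustrated with a small sketch. This is not Samza's implementation: `SketchCheckpoint` is a hypothetical class, and plain `String` keys stand in for `SystemStreamPartition` so the example compiles on its own.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a Checkpoint implementation; String keys replace SystemStreamPartition. */
class SketchCheckpoint {
  private final short version;
  private final Map<String, String> offsets;

  SketchCheckpoint(short version, Map<String, String> offsets) {
    this.version = version;
    // Copy defensively so later changes to the caller's map do not leak in,
    // then wrap the copy so callers of getOffsets() cannot modify it.
    this.offsets = Collections.unmodifiableMap(new HashMap<>(offsets));
  }

  short getVersion() {
    return version;
  }

  Map<String, String> getOffsets() {
    return offsets;
  }
}
```

Under these assumptions, calling `put` on the returned map throws `UnsupportedOperationException`, so callers can read offsets but cannot alter a recorded checkpoint.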
...
    /**
     * Factory to build the Samza {@link TaskBackupManager}, {@link TaskRestoreManager} and {@link StateBackendAdmin}
     * for a particular state storage backend, which are used to durably back up the Samza task state.
     */
    public interface StateBackendFactory {
      TaskBackupManager getBackupManager(JobContext jobContext,
          ContainerModel containerModel,
          TaskModel taskModel,
          ExecutorService backupExecutor,
          MetricsRegistry taskInstanceMetricsRegistry,
          Config config,
          Clock clock,
          File loggedStoreBaseDir,
          File nonLoggedStoreBaseDir);

      TaskRestoreManager getRestoreManager(JobContext jobContext,
          ContainerContext containerContext,
          TaskModel taskModel,
          ExecutorService restoreExecutor,
          MetricsRegistry metricsRegistry,
          Config config,
          Clock clock,
          File loggedStoreBaseDir,
          File nonLoggedStoreBaseDir,
          KafkaChangelogRestoreParams kafkaChangelogRestoreParams);

      StateBackendAdmin getAdmin(JobModel jobModel, Config config);
    }
...
    /**
     * <p>
     * TaskBackupManager is the interface that must be implemented for any remote system that Samza persists its state to
     * during the task commit operation.
     * {@link #snapshot(CheckpointId)} will be invoked synchronously with task processing and takes a snapshot of the
     * stores' state to be persisted for the commit. {@link #upload(CheckpointId, Map)} will then use the snapshotted state
     * to persist to the underlying backup system and will be asynchronous to task processing.
     * </p>
     * The interface will be invoked in the following way:
     * <ul>
     *   <li>Snapshot will be called before Upload.</li>
     *   <li>persistToFilesystem will be called after Upload is completed.</li>
     *   <li>Cleanup is only called after Upload and persistToFilesystem have successfully completed.</li>
     * </ul>
     */
    public interface TaskBackupManager {

      /**
       * Initializes the TaskBackupManager instance.
       *
       * @param checkpoint last recorded checkpoint from the CheckpointManager or null if no last checkpoint was found
       */
      void init(@Nullable Checkpoint checkpoint);

      /**
       * Snapshot is used to capture the current state of the stores in order to persist it to the backup manager in the
       * {@link #upload(CheckpointId, Map)} phase. Performs the commit operation that is
       * synchronous to processing. Returns the per store name state checkpoint markers to be used in upload.
       *
       * @param checkpointId {@link CheckpointId} of the current commit
       * @return a map of store name to state checkpoint markers for stores managed by this state backend
       */
      Map<String, String> snapshot(CheckpointId checkpointId);

      /**
       * Upload is used to persist the state provided by the {@link #snapshot(CheckpointId)} to the
       * underlying backup system. Commit operation that is asynchronous to message processing and returns a
       * {@link CompletableFuture} containing the successfully uploaded state checkpoint markers.
       *
       * @param checkpointId {@link CheckpointId} of the current commit
       * @param stateCheckpointMarkers the map of store name to state checkpoint markers returned by
       *                               {@link #snapshot(CheckpointId)}
       * @return a {@link CompletableFuture} containing a map of store name to state checkpoint markers
       *         after the upload is complete
       */
      CompletableFuture<Map<String, String>> upload(CheckpointId checkpointId, Map<String, String> stateCheckpointMarkers);

      /**
       * Cleans up any local or remote state for checkpoint information that is older than the provided checkpointId.
       * This operation is required to be idempotent.
       *
       * @param checkpointId the {@link CheckpointId} of the last successfully committed checkpoint
       * @param stateCheckpointMarkers a map of store name to state checkpoint markers returned by
       *                               {@link #upload(CheckpointId, Map) upload}
       */
      CompletableFuture<Void> cleanUp(CheckpointId checkpointId, Map<String, String> stateCheckpointMarkers);

      /**
       * Shutdown hook for the backup manager to clean up any allocated resources.
       */
      void close();
    }
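The call ordering described in the Javadoc (snapshot first and synchronously, then an asynchronous upload, then cleanup) could be sketched roughly as follows. Everything here is an illustrative assumption rather than Samza code: `SketchBackupManager` is a simplified stand-in for `TaskBackupManager`, a `String` replaces `CheckpointId`, the marker strings are made up, and the `persistToFilesystem` step is left out because it is not part of the interface shown.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical, simplified stand-in for TaskBackupManager; String replaces CheckpointId. */
interface SketchBackupManager {
  Map<String, String> snapshot(String checkpointId);

  CompletableFuture<Map<String, String>> upload(String checkpointId, Map<String, String> markers);

  CompletableFuture<Void> cleanUp(String checkpointId, Map<String, String> markers);
}

/** In-memory implementation that only records the order of calls. */
class RecordingBackupManager implements SketchBackupManager {
  final List<String> calls = new ArrayList<>();

  @Override
  public Map<String, String> snapshot(String checkpointId) {
    calls.add("snapshot");
    Map<String, String> markers = new HashMap<>();
    markers.put("store-a", "local-" + checkpointId); // marker format is made up
    return markers;
  }

  @Override
  public CompletableFuture<Map<String, String>> upload(String checkpointId, Map<String, String> markers) {
    calls.add("upload");
    Map<String, String> uploaded = new HashMap<>();
    // Pretend every snapshotted store was uploaded and received a remote marker.
    markers.forEach((store, localMarker) -> uploaded.put(store, "remote-" + checkpointId));
    return CompletableFuture.completedFuture(uploaded);
  }

  @Override
  public CompletableFuture<Void> cleanUp(String checkpointId, Map<String, String> markers) {
    calls.add("cleanUp");
    return CompletableFuture.completedFuture(null);
  }
}

class CommitSketch {
  /** Runs snapshot on the calling thread, then chains upload and cleanUp as async stages. */
  static CompletableFuture<Map<String, String>> commit(SketchBackupManager manager, String checkpointId) {
    Map<String, String> snapshotMarkers = manager.snapshot(checkpointId);
    return manager.upload(checkpointId, snapshotMarkers)
        .thenCompose(uploaded -> manager.cleanUp(checkpointId, uploaded).thenApply(ignored -> uploaded));
  }
}
```

Chaining with `thenCompose` means `cleanUp` is only reached when the upload future completes normally; a failed upload short-circuits the chain, which matches the rule that cleanup runs only after a successful upload.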
...
    /**
     * The helper interface that restores task state.
     */
    public interface TaskRestoreManager {

      /**
       * Initializes state resources such as store directories.
       */
      void init(Checkpoint checkpoint);

      /**
       * Restores state from checkpoints, state snapshots and changelogs.
       * Currently, store restoration happens on a separate thread pool within {@code ContainerStorageManager}. In case of
       * interrupt/shutdown signals from {@code SamzaContainer}, {@code ContainerStorageManager} may interrupt the restore
       * thread.
       *
       * Note: Typically, interrupt signals don't bubble up as {@link InterruptedException} unless the restore thread is
       * waiting on IO/network. In case of busy looping, implementors are expected to check the interrupt status of the
       * thread periodically and shut down gracefully before throwing {@link InterruptedException} upstream.
       * {@code SamzaContainer} will not wait for clean up, and the interrupt signal is a best-effort attempt by the
       * container to notify that it is shutting down.
       */
      void restore() throws InterruptedException;

      /**
       * Closes all initiated resources, including storage engines.
       */
      void close();
    }
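The note on busy looping in `restore()` suggests polling the thread's interrupt status. A minimal sketch of that pattern is shown below, under stated assumptions: `InterruptAwareRestoreSketch`, its `records` parameter, and `releaseResources()` are hypothetical names, and a real restore would read from a changelog or snapshot rather than an in-memory list.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical restore loop that checks for interruption between records. */
class InterruptAwareRestoreSketch {
  final List<String> restored = new ArrayList<>();
  boolean released = false;

  /** Replays records one by one, polling the interrupt flag before each record. */
  void restore(List<String> records) throws InterruptedException {
    for (String record : records) {
      // Thread.interrupted() reads and clears the flag, which is the usual
      // convention right before throwing InterruptedException.
      if (Thread.interrupted()) {
        releaseResources();
        throw new InterruptedException("restore interrupted after " + restored.size() + " records");
      }
      restored.add(record); // stand-in for writing the record into a store
    }
  }

  /** Stand-in for closing stores or flushing partial state before exiting. */
  void releaseResources() {
    released = true;
  }
}
```

Checking once per record keeps the shutdown latency bounded by the cost of a single record; a real implementation might check once per batch instead if the per-record cost is very small.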
...
    /**
     * Admin responsible for loading any resources related to state backend
     */
    public interface StateBackendAdmin {

      /**
       * Create all the resources required per job per store state backend
       */
      void createResources();

      /**
       * Validate all resources required per job per state for state backend
       */
      void validateResources();
    }
...