Status

Current state: "Adopted"

Discussion thread: here

JIRA: here

...

Code Block
public interface StandbyUpdateListener {

    enum SuspendReason {
        MIGRATED,
        PROMOTED;
    }
    
    /**
     * A callback that will be invoked after registering the changelogs for each state store in a standby
     * task. It is guaranteed to always be invoked before any records are loaded into the standby store.
     *
     * @param topicPartition the changelog TopicPartition for this standby task
     * @param storeName the name of the store being loaded
     * @param startingOffset the offset from which the standby task begins consuming from the changelog
     * @param currentEndOffset the current latest offset on the associated changelog partition
     */
    void onUpdateStart(final TopicPartition topicPartition,
                       final String storeName,
                       final long startingOffset,
                       final long currentEndOffset);

    /**
     * Method called after loading a batch of records from the changelog into the standby state store.
     * In this case the maximum size of the batch is whatever the value of MAX_POLL_RECORDS is set to.
     * <p>
     * This method is called after loading each batch, so it is advised to keep processing to a minimum.
     * Any heavy processing will block the state updater thread and slow down the rate of standby task
     * loading. Therefore, if you need to do any extended processing or connect to an external service,
     * consider doing so asynchronously.
     *
     * @param topicPartition the changelog TopicPartition for this standby task
     * @param storeName the name of the store being loaded
     * @param taskId the TaskId of the standby task
     * @param batchEndOffset the changelog end offset (inclusive) of the batch that was just loaded
     * @param batchSize the total number of records in the batch that was just loaded
     * @param currentEndOffset the current end offset of the changelog topic partition
     */
    void onBatchLoaded(final TopicPartition topicPartition,
                       final String storeName,
                       final TaskId taskId,
                       final long batchEndOffset,
                       final long batchSize,
                       final long currentEndOffset);

    /**
     * This method is called when the corresponding standby task stops updating, for the provided reason.
     * <p>
     * If the task was {@code MIGRATED} to another instance, this callback will be invoked after this
     * state store (and the task itself) are closed (in which case the data will be cleaned up after
     * state.cleanup.delay.ms).
     * If the task was {@code PROMOTED} to an active task, the state store will not be closed, and the
     * callback will be invoked after unregistering it as a standby task but before re-registering it as an active task
     * and beginning restoration. In other words, this will always be called before the corresponding
     * {@link StateRestoreListener#onRestoreStart} call is made.
     *
     * @param topicPartition the changelog TopicPartition for this standby task
     * @param storeName the name of the store being loaded
     * @param storeOffset the offset of the last changelog record that was read and put into the store at the time
     *                    of suspension
     * @param currentEndOffset the current end offset of the changelog topic partition
     * @param reason the reason why the standby task was suspended
     */
    void onUpdateSuspended(final TopicPartition topicPartition,
                           final String storeName,
                           final long storeOffset,
                           final long currentEndOffset,
                           final SuspendReason reason);
}
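
As a rough illustration of how these callbacks might be used, the sketch below tracks an approximate standby lag per store and hands slow reporting off to a background thread, following the advice to keep per-batch work minimal. It is not part of the proposal. `StandbyLagTracker`, `lagFor`, `report`, and `close` are hypothetical names, and the class deliberately does not implement `StandbyUpdateListener`, so that it compiles without Kafka on the classpath: a `(topic, partition)` pair stands in for `TopicPartition`, and `TaskId` is omitted. Whether `currentEndOffset - batchEndOffset` is off by one depends on how the end offset is defined, so the result is treated only as an estimate.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: mirrors two StandbyUpdateListener callbacks with simplified parameters.
class StandbyLagTracker {

    // Mirrors StandbyUpdateListener.SuspendReason from the interface above.
    public enum SuspendReason { MIGRATED, PROMOTED }

    // Approximate lag per "storeName@topic-partition" key; cheap to update on the calling thread.
    private final Map<String, Long> lagByStore = new ConcurrentHashMap<>();

    // Single daemon thread for slow work, so the thread invoking the callbacks is never blocked.
    private final ExecutorService reporter = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "standby-lag-reporter");
        t.setDaemon(true);
        return t;
    });

    private static String key(String topic, int partition, String storeName) {
        return storeName + "@" + topic + "-" + partition;
    }

    // Stand-in for onBatchLoaded: (topic, partition) replaces TopicPartition, TaskId is omitted.
    public void onBatchLoaded(String topic, int partition, String storeName,
                              long batchEndOffset, long batchSize, long currentEndOffset) {
        String k = key(topic, partition, storeName);
        long lag = Math.max(0L, currentEndOffset - batchEndOffset);
        lagByStore.put(k, lag);
        // Hand the slow part off so the caller can move on to the next batch.
        reporter.submit(() -> report(k, batchSize, lag));
    }

    // Stand-in for onUpdateSuspended.
    public void onUpdateSuspended(String topic, int partition, String storeName,
                                  long storeOffset, long currentEndOffset, SuspendReason reason) {
        String k = key(topic, partition, storeName);
        if (reason == SuspendReason.MIGRATED) {
            // The store was closed on this instance, so stop tracking it.
            lagByStore.remove(k);
        } else {
            // PROMOTED: keep the remaining gap as a hint of how much restoration is left.
            lagByStore.put(k, Math.max(0L, currentEndOffset - storeOffset));
        }
    }

    // Returns the last recorded lag estimate, or null if the store is not tracked.
    public Long lagFor(String topic, int partition, String storeName) {
        return lagByStore.get(key(topic, partition, storeName));
    }

    // Placeholder for slow reporting, e.g. a call to an external metrics service.
    private void report(String key, long batchSize, long lag) {
        System.out.println(key + " loaded " + batchSize + " records, approximate lag " + lag);
    }

    // Stops the reporter thread, waiting briefly for queued reports to finish.
    public void close() {
        reporter.shutdown();
        try {
            reporter.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

A real listener would implement `StandbyUpdateListener` directly and delegate to logic like this from `onBatchLoaded` and `onUpdateSuspended`. Only the cheap map update runs on the calling thread, and everything slower goes through the executor.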

...