Current state: "Under Discussion"


Co-Authored by Eduwer Camacaro.


Currently, Kafka Streams allows you to provide a StateRestoreListener, which receives callbacks at the start and end of a state restoration and after each batch of records is restored. This is useful for monitoring any Active Task that is undergoing restoration.

KIP-869 adds metrics which further improve visibility of Active Task Restorations. However, as of now it is difficult to get real-time updates (via callbacks) which tell you about the state of Standby Tasks on your Kafka Streams instance. This would be most useful from the operational perspective, for example when implementing a rolling restart or a smooth scale down. In these situations, I as an operator of Kafka Streams want to be able to know where each Active and Standby Task lives and also how "caught up" they are so that I know when it's safe to remove or bounce a certain Streams instance.

In addition to the operational use case described above, knowing when a Standby Task is created and destroyed (either by promotion or migration) would help operators test various configurations during rolling upgrade scenarios with regard to how noisy the shuffling of Standby Tasks is.

Public Interfaces

We will add the StandbyUpdateListener interface as follows:

Code Block
public interface StandbyUpdateListener {

    enum SuspendReason {
        MIGRATED,
        PROMOTED
    }

    /**
     * Method called when Standby Processing starts for a given State Store Partition.
     *
     * @param topicPartition   the TopicPartition of the Standby Task.
     * @param storeName        the name of the store being watched by this Standby Task.
     * @param earliestOffset   the earliest offset available on the Changelog topic.
     * @param startingOffset   the offset from which the Standby Task starts watching, as determined from the
     *                         checkpoint file, or -1 if there is no previous state.
     * @param currentEndOffset the current latest offset on the associated changelog partition.
     */
    void onStandbyUpdateStart(final TopicPartition topicPartition,
                              final String storeName,
                              final long earliestOffset,
                              final long startingOffset,
                              final long currentEndOffset);

    /**
     * Method called after loading a batch of records from the changelog into the Standby State Store.
     * In this case the maximum size of the batch is determined by the value of MAX_POLL_RECORDS.
     *
     * This method is called after loading each batch and it is advised to keep processing to a minimum.
     * Any heavy processing will hold up loading the next batch, reducing the throughput of the
     * State Updater Thread.
     *
     * If you need to do any extended processing or connecting to an external service consider doing so asynchronously.
     *
     * @param topicPartition   the TopicPartition containing the values to restore
     * @param storeName        the name of the store undergoing restoration
     * @param batchEndOffset   the inclusive ending offset for the current restored batch for this TopicPartition
     * @param numRestored      the total number of records restored in this batch for this TopicPartition
     * @param currentEndOffset the current end offset of the changelog topic partition.
     */
    void onBatchLoaded(final TopicPartition topicPartition,
                       final String storeName,
                       final long batchEndOffset,
                       final long numRestored,
                       final long currentEndOffset);

    /**
     * Method called after updates to a Standby State Store cease, either because the Standby Task was promoted
     * to an Active Task or because the Standby Task was migrated to another instance (in which case the data
     * will be cleaned up afterwards).
     *
     * @param topicPartition   the TopicPartition containing the values to restore
     * @param storeName        the name of the store undergoing restoration
     * @param storeOffset      the offset of the last changelog record that was read and put into the store
     *                         at the time of suspension.
     * @param currentEndOffset the current end offset of the changelog topic partition.
     * @param reason           the reason why the standby task was suspended.
     */
    void onStandbyUpdateSuspended(final TopicPartition topicPartition,
                                  final String storeName,
                                  final long storeOffset,
                                  final long currentEndOffset,
                                  final SuspendReason reason);
}


We will add a method to the KafkaStreams object:

Code Block
public void setStandbyUpdateListener(StandbyUpdateListener otterStandbyUpdateListener);
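
To make the operational use case concrete, here is a minimal sketch of a listener that tracks how far each Standby is behind its changelog. It is illustrative only and not part of the proposal: StandbyLagTracker, isCaughtUp, lag, and suspensionCount are hypothetical names, the TopicPartition class is a stand-in for org.apache.kafka.common.TopicPartition so that the sketch compiles on its own, and the interface is a local copy of the one proposed above under the name StandbyUpdateListener. Lag is approximated as currentEndOffset minus the last loaded offset, which may be off by one depending on whether the end offset is exclusive.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition,
// included only so this sketch compiles without Kafka on the classpath.
final class TopicPartition {
    private final String topic;
    private final int partition;

    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public String toString() {
        return topic + "-" + partition;
    }
}

// Local copy of the interface proposed in this KIP (Javadoc omitted for brevity).
interface StandbyUpdateListener {
    enum SuspendReason { MIGRATED, PROMOTED }

    void onStandbyUpdateStart(TopicPartition topicPartition, String storeName,
                              long earliestOffset, long startingOffset, long currentEndOffset);

    void onBatchLoaded(TopicPartition topicPartition, String storeName,
                       long batchEndOffset, long numRestored, long currentEndOffset);

    void onStandbyUpdateSuspended(TopicPartition topicPartition, String storeName,
                                  long storeOffset, long currentEndOffset, SuspendReason reason);
}

// Hypothetical listener: records approximate lag per changelog partition
// and counts suspensions by reason.
class StandbyLagTracker implements StandbyUpdateListener {
    private final Map<String, Long> lagByPartition = new ConcurrentHashMap<>();
    private final Map<SuspendReason, Long> suspensions = new ConcurrentHashMap<>();

    @Override
    public void onStandbyUpdateStart(TopicPartition tp, String storeName,
                                     long earliestOffset, long startingOffset, long currentEndOffset) {
        // Assumption: with no previous state (-1) the standby reads from the earliest offset.
        long from = startingOffset < 0 ? earliestOffset : startingOffset;
        lagByPartition.put(tp.toString(), Math.max(0L, currentEndOffset - from));
    }

    @Override
    public void onBatchLoaded(TopicPartition tp, String storeName,
                              long batchEndOffset, long numRestored, long currentEndOffset) {
        // Kept deliberately cheap: only a map update happens inside the callback.
        lagByPartition.put(tp.toString(), Math.max(0L, currentEndOffset - batchEndOffset));
    }

    @Override
    public void onStandbyUpdateSuspended(TopicPartition tp, String storeName,
                                         long storeOffset, long currentEndOffset, SuspendReason reason) {
        // The standby no longer lives here, so stop tracking it and note why it went away.
        lagByPartition.remove(tp.toString());
        suspensions.merge(reason, 1L, Long::sum);
    }

    // True only if the partition is tracked and within the acceptable lag.
    boolean isCaughtUp(TopicPartition tp, long acceptableLag) {
        Long lag = lagByPartition.get(tp.toString());
        return lag != null && lag <= acceptableLag;
    }

    // Returns -1 when the partition is not tracked on this instance.
    long lag(TopicPartition tp) {
        return lagByPartition.getOrDefault(tp.toString(), -1L);
    }

    long suspensionCount(SuspendReason reason) {
        return suspensions.getOrDefault(reason, 0L);
    }
}
```

An operator-facing health check could consult isCaughtUp before bouncing an instance, and the per-reason suspension counts give a rough measure of how noisy Standby shuffling is during a rolling upgrade.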

Proposed Changes

We propose to create a StandbyUpdateListener interface, and allow users to supply one to their Kafka Streams Topology via KafkaStreams#setStandbyUpdateListener(...) in a manner similar to the StateRestoreListener.

Compatibility, Deprecation, and Migration Plan

We are adding a new method without changing any existing APIs. Existing users will not need to know about the StandbyUpdateListener functionality, and existing code will be unaffected.

Test Plan

This is a small change that can be tested sufficiently via unit tests. We at LittleHorse have implemented a draft PR and will test internally as well.

Rejected Alternatives

Do Nothing

The first alternative is “Do Nothing.” In theory, we can use the KafkaStreams#metrics() method to get a handle on the Restore Consumer, and look at the lag metrics of that consumer to determine how “caught up” the Standby Task is. This has a few problems:

  1. It feels “clunky” to go through JMX metrics within an application to drive behavior that we want to handle in the code path. Additionally, the updates are not guaranteed to be as precise, and it is a polling mechanism rather than a push-based (callback) mechanism.

  2. We have no easy way of determining why the Standby Task was suspended on this instance (whether it was PROMOTED to an Active Task, or MIGRATED to run on another Streams Instance).

Use StateRestoreListener

The second alternative is to use the StateRestoreListener for Standby Tasks as well. However, that quickly falls apart upon further examination because the API of the StateRestoreListener is semantically different from the StandbyUpdateListener. Most crucially, a State Restoration has a definitive “finish line”, which is the last offset of the Changelog TopicPartition at the time that the State Restoration begins. This is because that offset will not increase during restoration, since the Active Task is down (we know this because the Active Task itself is undergoing restoration!). In contrast, with Standby Tasks, the finish line is constantly advancing.
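
The difference between a fixed and a moving finish line can be sketched in a few lines. This is an illustration under stated assumptions, not code from the proposal: FinishLine, restoreProgress, standbyCaughtUp, and the acceptableLag threshold are hypothetical names. With a fixed end offset, a completion fraction is well defined and eventually reaches 1.0. With an end offset that is re-read on every batch, the only stable question is whether the Standby is within some acceptable distance of the current end.

```java
final class FinishLine {
    private FinishLine() {}

    // Restoration: the end offset is captured once when restoration begins,
    // so progress is a fraction of a fixed range.
    static double restoreProgress(long startOffset, long restoredOffset, long fixedEndOffset) {
        long total = fixedEndOffset - startOffset;
        if (total <= 0) {
            return 1.0; // nothing to restore
        }
        return Math.min(1.0, (double) (restoredOffset - startOffset) / total);
    }

    // Standby: the end offset keeps advancing while the Active Task writes,
    // so "done" is replaced by "within acceptableLag records of the current end".
    static boolean standbyCaughtUp(long batchEndOffset, long currentEndOffset, long acceptableLag) {
        return currentEndOffset - batchEndOffset <= acceptableLag;
    }
}
```

A Standby that reports standbyCaughtUp as true for one batch may report false for the next if the Active Task writes faster than the Standby loads, which is why a start/end callback pair is not a good fit for Standby Tasks.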