Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • The BatchingStateRestoreCallback interface.  
  • The StateRestoreListener StateRestoreListener interface.
  • The StateRestoreNotification interface.

The interfaces will look like: 

Code Block
languagejava
public interface BatchingStateRestoreCallback extends StateRestoreCallback {

   void restoreAll(Collection<KeyValue<byte[], byte []>> records);
}

 This interface will allow for state stores to implement a bulk loading approach during the restore phase.   The StateRestoreCallback interface is begin kept as is for backwards compatibility.

 

Code Block
public interface StateRestoreNotification {

    void restoreStart();

    StateRestoreListenervoid getStateRestoreListenerrestoreEnd();
}

While the restoreStart and restoreEnd methods have similar names to methods found in the StateRestoreListener interface, the purpose is solely for state store preparation and clean up during the restoration phase.  

For example we may need to open a state store with settings optimized for bulk loading, then close  and re-open the state store at the end of the restoration phase with settings for the normal put/get operations.

 

Code Block

 
 
public interface StateRestoreListener {

    void onRestoreStart(String storeName, long startingOffset, long endOffset);

    void onBatchRestored(String storeName, long batchEndOffset, long numRestored);

    void onRestoreEnd(String storeName, long endOffset, long totalRestored);

}

The onBatchRestored method is called after records retrieved from each poll() call have been restored.  This should give users a sense of progress being made in the restore process.

...

The idea behind the StateRestoreListener  being set on the KafkaStreams object vs a configuration parameter is it allows for implementors to have access to enclosing state when creating the listener for reporting purposes.

 

 This method sets a "global" state restore listener and if users want to provide a custom StateRestoreListener per state store, the BatchingStateRestoreCallback.getStateRestoreListener method can be overridden as described above.

 Additionally, users can specify a different StateRestoreListener implementation per restore callback by implementing the (or overriding from the provided abstract class)  getStateRestoreListener() method.  It will be assumed a return value other than null indicates the user wants to use a custom StateRestoreListener vs. the "global" on provided via the KafkaStreams.setStateRestoreListener method.

Compatibility, Deprecation, and Migration Plan

    • Since the BatchingStateResoreCallback extends the StateRestoreCallback there should be no impact to classes already implementing this interface.
    • The StateRestoreContext is an addition   and StateRestoreNotification interfaces are additions to the code base so no impact is expected either.
    • The addition of a setter method on the KafkaStreams object adds no impact to existing code.

...