Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Current stateAccepted [VOTE]: 167 Add interface for the state store restoration process

 

Discussion thread:here

JIRA:

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-5363

...

The changes also include adding a setter method on the KafkaStreams object:, named setGlobalStateRestoreListener to reinforce the fact the listener is for the entire application

Code Block
languagejava
public void setStateRestoreListenersetGlobalStateRestoreListener(final StateRestoreListener stateRestoreListener)

...

For single action state restoration, there is AbstractNotifyingRestoreCallback

...

For the corresponding bulk action state restoration, we have AbstractBatchingRestoreCallback

...

The first use case is user updates of the restore progress - In this case users of a Kafka Streams program application want to get notification receive updates of the progress, reporting status back to restoration progress and publish those updates to a UI for example.  The StateRestoreListener set via the KafkaStreams.setStateRestoreListener method will be a global listener.  This means for reporting purposes there will only be one setGlobalStateRestoreListener method functions as a single, global listener reporting on the restoration status for all state stores in an application.  Additionally, the StateRestoreListener also reports on the bootstrapping progress of any GlobalKTables defined in the application.   

 The second use case is internal state store management, closing and re-opening a RocksDB instance for bulk loading with different configuration settings for example.  In this case implementors of a custom store want notification of restoration start, progress and ending for state manage purposes.  In this case, the StateRestoreListener implementation is  used used internally by the given state store.  In this use case, users can specify a StateStoreListener per store, but the intent here is not for reporting but for internal state management.    

To use the listener functionality users will implement the StateRestoreListener interface in addition to the StateRestoreCallback or BatchingStateRestoreCallback interfaces when constructing their callbacks.  Providing the callback is still done via the ProcessorContext.register method.  

...