...
Current state: Accepted [VOTE]: 167 Add interface for the state store restoration process
Discussion thread: here
JIRA:
...
Code Block |
---|
public interface StateRestoreListener { void onRestoreStart(TopicPartition topicPartition, String storeName, long startingOffset, long endingOffset); void onBatchRestored(TopicPartition topicPartition, String storeName, long batchEndOffset, long numRestored); void onRestoreEnd(TopicPartition topicPartition, String storeName, long totalRestored); } |
...
The changes also include adding a setter method on the KafkaStreams object, named setGlobalStateRestoreListener to reinforce the fact that the listener is for the entire application:
Code Block |
---|
public void setGlobalStateRestoreListener(final StateRestoreListener stateRestoreListener) |
...
For single action state restoration, there is AbstractNotifyingRestoreCallback
Code Block |
---|
public abstract class AbstractNotifyingRestoreCallback implements StateRestoreCallback, StateRestoreListener { @Override public void onRestoreStart(TopicPartition topicPartition, String storeName, long startingOffset, long endingOffset) { } @Override public void onBatchRestored(TopicPartition topicPartition, String storeName, long batchEndOffset, long numRestored) { } @Override public void onRestoreEnd(TopicPartition topicPartition, String storeName, long totalRestored) { } } |
For the corresponding bulk action state restoration, we have AbstractBatchingRestoreCallback
Code Block |
---|
public abstract class AbstractBatchingRestoreCallback implements BatchingStateRestoreCallback, StateRestoreListener { @Override public void restore(byte[] key, byte[] value) { throw new UnsupportedOperationException("Single restore not supported"); } @Override public void onRestoreStart(TopicPartition topicPartition, String storeName, long startingOffset, long endingOffset) { } @Override public void onBatchRestored(TopicPartition topicPartition, String storeName, long batchEndOffset, long numRestored) { } @Override public void onRestoreEnd(TopicPartition topicPartition, String storeName, long totalRestored) { } } |
...
The first use case is user updates of the restore progress. In this case, users of a Kafka Streams application want to receive updates of the restoration progress and publish those updates to a UI, for example. The StateRestoreListener set via the KafkaStreams.setGlobalStateRestoreListener method functions as a single, global listener reporting on the restoration status for all state stores in an application. Additionally, the StateRestoreListener also reports on the bootstrapping progress of any GlobalKTables defined in the application.
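As a rough illustration of this first use case, the sketch below tracks restoration progress per store and partition inside a listener. To stay self-contained, it declares local stand-ins for TopicPartition and StateRestoreListener that mirror the signatures proposed above. The ProgressTrackingListener class and its percentRestored helper are hypothetical names, not part of this proposal. It also assumes that the number of records to restore is roughly endingOffset minus startingOffset, which may not hold exactly for every changelog topic.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Local stand-in for Kafka's TopicPartition so the sketch compiles on its own.
final class TopicPartition {
    final String topic;
    final int partition;

    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public String toString() {
        return topic + "-" + partition;
    }
}

// Local stand-in mirroring the interface proposed in this KIP.
interface StateRestoreListener {
    void onRestoreStart(TopicPartition topicPartition, String storeName, long startingOffset, long endingOffset);
    void onBatchRestored(TopicPartition topicPartition, String storeName, long batchEndOffset, long numRestored);
    void onRestoreEnd(TopicPartition topicPartition, String storeName, long totalRestored);
}

// Hypothetical global listener that a UI or metrics reporter could poll.
final class ProgressTrackingListener implements StateRestoreListener {
    private final Map<String, Long> expected = new ConcurrentHashMap<>();
    private final Map<String, Long> restored = new ConcurrentHashMap<>();

    private static String key(TopicPartition tp, String storeName) {
        return storeName + "/" + tp;
    }

    @Override
    public void onRestoreStart(TopicPartition tp, String storeName, long startingOffset, long endingOffset) {
        String k = key(tp, storeName);
        // Assumption: the offset range approximates the number of records to restore.
        expected.put(k, Math.max(0L, endingOffset - startingOffset));
        restored.put(k, 0L);
    }

    @Override
    public void onBatchRestored(TopicPartition tp, String storeName, long batchEndOffset, long numRestored) {
        // Accumulate the per-batch counts reported by the restoration process.
        restored.merge(key(tp, storeName), numRestored, Long::sum);
    }

    @Override
    public void onRestoreEnd(TopicPartition tp, String storeName, long totalRestored) {
        // Restoration is finished: align both counters so progress reads as complete.
        String k = key(tp, storeName);
        expected.put(k, totalRestored);
        restored.put(k, totalRestored);
    }

    // Returns 0 for unknown stores, otherwise a percentage capped at 100.
    int percentRestored(TopicPartition tp, String storeName) {
        String k = key(tp, storeName);
        if (!expected.containsKey(k)) {
            return 0;
        }
        long total = expected.get(k);
        long done = restored.getOrDefault(k, 0L);
        if (total == 0L) {
            return 100;
        }
        return (int) Math.min(100L, done * 100L / total);
    }
}
```

In a real application, an instance of such a listener would be passed to KafkaStreams.setGlobalStateRestoreListener, and the reporting component would read from it.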
The second use case is internal state store management, for example closing and re-opening a RocksDB instance for bulk loading with different configuration settings. In this case, implementors of a custom store want notification of restoration start, progress, and end for state management purposes, and the StateRestoreListener implementation is used internally by the given state store. Users can specify a StateRestoreListener per store, but the intent here is not reporting but internal state management.
To use the listener functionality, users implement the StateRestoreListener interface in addition to the StateRestoreCallback or BatchingStateRestoreCallback interface when constructing their callbacks. The callback is still provided via the ProcessorContext.register method.
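The store-internal use case can be sketched as follows. This is a minimal illustration rather than the actual implementation: the interfaces and the AbstractNotifyingRestoreCallback class are local stand-ins copied from the signatures shown in this proposal, and InMemoryBulkStore with its bulk-load flag is a hypothetical substitute for a store that would re-open RocksDB with different settings.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Local stand-ins so the sketch compiles without the Kafka libraries.
final class TopicPartition {
    final String topic;
    final int partition;

    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

interface StateRestoreCallback {
    void restore(byte[] key, byte[] value);
}

interface StateRestoreListener {
    void onRestoreStart(TopicPartition topicPartition, String storeName, long startingOffset, long endingOffset);
    void onBatchRestored(TopicPartition topicPartition, String storeName, long batchEndOffset, long numRestored);
    void onRestoreEnd(TopicPartition topicPartition, String storeName, long totalRestored);
}

// Mirrors the no-op base class described in this proposal.
abstract class AbstractNotifyingRestoreCallback implements StateRestoreCallback, StateRestoreListener {
    @Override
    public void onRestoreStart(TopicPartition topicPartition, String storeName, long startingOffset, long endingOffset) { }

    @Override
    public void onBatchRestored(TopicPartition topicPartition, String storeName, long batchEndOffset, long numRestored) { }

    @Override
    public void onRestoreEnd(TopicPartition topicPartition, String storeName, long totalRestored) { }
}

// Hypothetical store; the flag stands in for switching to bulk-load settings.
final class InMemoryBulkStore {
    private final Map<String, String> data = new HashMap<>();
    private boolean bulkLoadMode;

    void setBulkLoadMode(boolean enabled) {
        this.bulkLoadMode = enabled;
    }

    boolean isBulkLoadMode() {
        return bulkLoadMode;
    }

    void put(byte[] key, byte[] value) {
        data.put(new String(key, StandardCharsets.UTF_8), new String(value, StandardCharsets.UTF_8));
    }

    int size() {
        return data.size();
    }
}

// Callback that both restores records and reacts to restoration lifecycle events.
final class BulkLoadingRestoreCallback extends AbstractNotifyingRestoreCallback {
    private final InMemoryBulkStore store;

    BulkLoadingRestoreCallback(InMemoryBulkStore store) {
        this.store = store;
    }

    @Override
    public void restore(byte[] key, byte[] value) {
        store.put(key, value);
    }

    @Override
    public void onRestoreStart(TopicPartition topicPartition, String storeName, long startingOffset, long endingOffset) {
        store.setBulkLoadMode(true);   // prepare the store for bulk loading
    }

    @Override
    public void onRestoreEnd(TopicPartition topicPartition, String storeName, long totalRestored) {
        store.setBulkLoadMode(false);  // return to normal operation
    }
}
```

A callback like this would still be handed to ProcessorContext.register when the store is initialized. Overriding only the lifecycle methods that matter keeps the store code focused on its own state management.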
...