...
This proposal aims to make this recovery more efficient for certain persistent stores that can be optimized for bulk writes by adding a new interface with "restoreAll" functionality.
Additionally, this proposal will add an interface used as an event listener that provides the following:
- Notification when the restoration (bulk or not) process starts.
- Intermediate notification as batches are restored with number of records and last offset restored.
- Notification when the restoration (again bulk or not) process ends.
The proposed listener interface will be available for two use cases:
- External or user notification of state restoration progress for monitoring purposes when the application is fully online. This will require adding a setter method on the KafkaStreams instance, described in the next section.
- Internal or per-state-store notification, so the state store can perform any required resource management at the beginning or end of the restoration. Closing and re-opening a RocksDB database to use bulk loading configurations is one intended result of providing this listener.
We'll outline these use cases in more detail below.
NOTE: This work builds on ideas originally proposed in a Jira issue.
...
- The BatchingStateRestoreCallback interface.
- The StateRestoreListener interface.
- The StateRestoreNotification interface.
...
```java
import java.util.Collection;
import org.apache.kafka.streams.KeyValue;

public interface BatchingStateRestoreCallback extends StateRestoreCallback {
    void restoreAll(Collection<KeyValue<byte[], byte[]>> records);
}
```
This interface will allow state stores to implement a bulk loading approach during the restore phase. The StateRestoreCallback interface is being kept as is for backwards compatibility.
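A minimal sketch of a store using the batch path follows. To stay self-contained it declares simplified stand-ins for KeyValue, StateRestoreCallback and BatchingStateRestoreCallback instead of importing Kafka; InMemoryBulkStore and its fields are hypothetical, and a real persistent store would replace the map updates with its own bulk-write mechanism.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for org.apache.kafka.streams.KeyValue (assumption)
final class KeyValue<K, V> {
    final K key;
    final V value;

    KeyValue(K key, V value) {
        this.key = key;
        this.value = value;
    }
}

// Stand-ins mirroring the callback interfaces described in this proposal
interface StateRestoreCallback {
    void restore(byte[] key, byte[] value);
}

interface BatchingStateRestoreCallback extends StateRestoreCallback {
    void restoreAll(Collection<KeyValue<byte[], byte[]>> records);
}

// Hypothetical store that applies each restored batch in a single call
class InMemoryBulkStore implements BatchingStateRestoreCallback {
    final Map<String, byte[]> data = new HashMap<>();
    int batchesApplied = 0;

    // Record-at-a-time entry point, delegating to the batch path
    @Override
    public void restore(byte[] key, byte[] value) {
        List<KeyValue<byte[], byte[]>> single = new ArrayList<>();
        single.add(new KeyValue<>(key, value));
        restoreAll(single);
    }

    // Batch entry point: a persistent store would issue one bulk write here
    @Override
    public void restoreAll(Collection<KeyValue<byte[], byte[]>> records) {
        for (KeyValue<byte[], byte[]> kv : records) {
            String k = new String(kv.key, StandardCharsets.UTF_8);
            if (kv.value == null) {
                data.remove(k); // a null value is treated as a delete (tombstone)
            } else {
                data.put(k, kv.value);
            }
        }
        batchesApplied++;
    }
}
```

Delegating restore to restoreAll is one possible design choice; it keeps a single code path for both entry points.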
```java
public interface StateRestoreNotification {
    void restoreStart();
    void restoreEnd();
}
```
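The sketch below illustrates how a store might react to the two notification methods. ReconfigurableStore, its Mode enum and the reopen helper are hypothetical; where the sketch only records a mode change, a real store would close and re-open its database (for example RocksDB) with different options.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in mirroring the StateRestoreNotification interface above
interface StateRestoreNotification {
    void restoreStart();
    void restoreEnd();
}

// Hypothetical store that swaps configuration around the restore phase
class ReconfigurableStore implements StateRestoreNotification {
    enum Mode { NORMAL, BULK_LOAD }

    Mode mode = Mode.NORMAL;
    final List<Mode> reopenHistory = new ArrayList<>();

    // Hypothetical helper: a real store would close its database here and
    // re-open it with options suited to the requested mode
    private void reopen(Mode newMode) {
        mode = newMode;
        reopenHistory.add(newMode);
    }

    @Override
    public void restoreStart() {
        reopen(Mode.BULK_LOAD); // favor write throughput while restoring
    }

    @Override
    public void restoreEnd() {
        reopen(Mode.NORMAL); // back to settings for regular put/get traffic
    }
}
```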
...
StateRestoreListener
...
For example, we may need to open a state store with settings optimized for bulk loading, then close and re-open it at the end of the restoration phase with settings for normal put/get operations.
```java
public interface StateRestoreListener {
    void onRestoreStart(String storeName, long startingOffset, long endOffset);
    void onBatchRestored(String storeName, long batchEndOffset, long numRestored);
    void onRestoreEnd(String storeName, long totalRestored);
}
```
The onBatchRestored method is called after the records retrieved from each poll() call have been restored. This is intended to give users a sense of the progress being made in the restore process.
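As an illustration of this progress reporting, the following hypothetical listener converts batch callbacks into a completion percentage. It declares a local stand-in for StateRestoreListener that takes storeName as a String, and it assumes endOffset is the offset at which restoration is complete; the exact offset semantics are an assumption here.

```java
// Stand-in mirroring the StateRestoreListener methods in this proposal
interface StateRestoreListener {
    void onRestoreStart(String storeName, long startingOffset, long endOffset);
    void onBatchRestored(String storeName, long batchEndOffset, long numRestored);
    void onRestoreEnd(String storeName, long totalRestored);
}

// Hypothetical listener that reports restore progress as a percentage
class PercentProgressListener implements StateRestoreListener {
    private long startOffset;
    private long endOffset;
    long restoredSoFar = 0;
    double lastPercent = 0.0;

    @Override
    public void onRestoreStart(String storeName, long startingOffset, long endOffset) {
        this.startOffset = startingOffset;
        this.endOffset = endOffset;
        restoredSoFar = 0;
        lastPercent = 0.0;
    }

    @Override
    public void onBatchRestored(String storeName, long batchEndOffset, long numRestored) {
        restoredSoFar += numRestored;
        long span = endOffset - startOffset;
        // Progress by offset position, since record counts and offsets can differ
        lastPercent = span <= 0 ? 100.0 : 100.0 * (batchEndOffset - startOffset) / span;
        System.out.printf("%s: %.1f%% (%d records)%n", storeName, lastPercent, restoredSoFar);
    }

    @Override
    public void onRestoreEnd(String storeName, long totalRestored) {
        lastPercent = 100.0;
        System.out.printf("%s: done, %d records restored%n", storeName, totalRestored);
    }
}
```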
...
```java
public void setStateRestoreListener(final StateRestoreListener stateRestoreListener)
```
StateRestoreListener Use Cases
The first use case is reporting restore progress to users. Here, users of a Kafka Streams application want to be notified of restoration progress, for example to report status back to a UI. The StateRestoreListener set via the KafkaStreams.setStateRestoreListener method will be a global listener: for reporting purposes, a single listener reports on the restoration status of all stores.
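A sketch of this global, reporting-oriented use case follows: a single hypothetical GlobalRestoreTracker receives callbacks for every store and exposes a summary a UI could poll. The stand-in interface mirrors the proposal's methods. ConcurrentHashMap is used on the assumption that one global listener may be invoked from several stream threads.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Stand-in mirroring the StateRestoreListener methods in this proposal
interface StateRestoreListener {
    void onRestoreStart(String storeName, long startingOffset, long endOffset);
    void onBatchRestored(String storeName, long batchEndOffset, long numRestored);
    void onRestoreEnd(String storeName, long totalRestored);
}

// Hypothetical global listener: one instance reports on every store
class GlobalRestoreTracker implements StateRestoreListener {
    // storeName -> records restored so far
    final Map<String, Long> restored = new ConcurrentHashMap<>();
    // storeName -> whether restoration has finished
    final Map<String, Boolean> done = new ConcurrentHashMap<>();

    @Override
    public void onRestoreStart(String storeName, long startingOffset, long endOffset) {
        restored.put(storeName, 0L);
        done.put(storeName, false);
    }

    @Override
    public void onBatchRestored(String storeName, long batchEndOffset, long numRestored) {
        restored.merge(storeName, numRestored, Long::sum);
    }

    @Override
    public void onRestoreEnd(String storeName, long totalRestored) {
        restored.put(storeName, totalRestored);
        done.put(storeName, true);
    }

    // Summary a UI could poll, e.g. "1/2 stores restored"
    String summary() {
        long finished = done.values().stream().filter(b -> b).count();
        return finished + "/" + done.size() + " stores restored";
    }
}
```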
The second use case is internal state store management, for example closing and re-opening a RocksDB instance with different configuration settings for bulk loading. Here, implementors of a custom store want notification of restoration start, progress and end for state management purposes, and the StateRestoreListener implementation is used internally by the given state store. In this use case users can specify a StateRestoreListener per store, but the intent is internal state management rather than reporting.
To use the listener functionality, users implement the StateRestoreListener interface in addition to the StateRestoreCallback or BatchingStateRestoreCallback interface when constructing their callbacks. The callback is still provided via the ProcessorContext.register method. The StateRestoreListener is set on the KafkaStreams object rather than through a configuration parameter because this gives implementors access to enclosing state when creating the listener for reporting purposes. The setter installs a "global" state restore listener; users who want a custom StateRestoreListener per state store implement StateRestoreListener on the restore callback they register, as described above.
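To make the registration model concrete, the sketch below shows one class implementing both BatchingStateRestoreCallback and StateRestoreListener, together with a hypothetical RestoreDriver that mimics how a restore loop could detect the listener with instanceof and dispatch to it. The driver is not Kafka Streams' actual restoration code, all interfaces are local stand-ins, and in a real store the callback would be passed to ProcessorContext.register instead.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Simplified stand-ins for the Kafka types (assumptions, for self-containment)
final class KeyValue<K, V> {
    final K key;
    final V value;
    KeyValue(K key, V value) { this.key = key; this.value = value; }
}

interface StateRestoreCallback { void restore(byte[] key, byte[] value); }

interface BatchingStateRestoreCallback extends StateRestoreCallback {
    void restoreAll(Collection<KeyValue<byte[], byte[]>> records);
}

interface StateRestoreListener {
    void onRestoreStart(String storeName, long startingOffset, long endOffset);
    void onBatchRestored(String storeName, long batchEndOffset, long numRestored);
    void onRestoreEnd(String storeName, long totalRestored);
}

// Hypothetical store callback that also listens to its own restoration
class BulkLoadingCallback implements BatchingStateRestoreCallback, StateRestoreListener {
    final List<String> events = new ArrayList<>();
    int records = 0;

    @Override public void restore(byte[] key, byte[] value) { records++; }
    @Override public void restoreAll(Collection<KeyValue<byte[], byte[]>> batch) { records += batch.size(); }

    // e.g. switch to bulk-load settings here
    @Override public void onRestoreStart(String store, long start, long end) { events.add("start"); }
    @Override public void onBatchRestored(String store, long batchEnd, long n) { events.add("batch"); }
    // e.g. re-open with normal settings here
    @Override public void onRestoreEnd(String store, long total) { events.add("end"); }
}

// Hypothetical restore loop showing only the dispatch logic the text implies
final class RestoreDriver {
    static void restore(String store, StateRestoreCallback cb,
                        List<List<KeyValue<byte[], byte[]>>> batches, long startOffset) {
        StateRestoreListener listener =
                cb instanceof StateRestoreListener ? (StateRestoreListener) cb : null;
        long total = 0;
        for (List<KeyValue<byte[], byte[]>> b : batches) total += b.size();
        long endOffset = startOffset + total; // assumes contiguous offsets
        if (listener != null) listener.onRestoreStart(store, startOffset, endOffset);
        long offset = startOffset;
        for (List<KeyValue<byte[], byte[]>> batch : batches) {
            if (cb instanceof BatchingStateRestoreCallback) {
                ((BatchingStateRestoreCallback) cb).restoreAll(batch); // bulk path
            } else {
                for (KeyValue<byte[], byte[]> kv : batch) cb.restore(kv.key, kv.value);
            }
            offset += batch.size();
            if (listener != null) listener.onBatchRestored(store, offset, batch.size());
        }
        if (listener != null) listener.onRestoreEnd(store, total);
    }
}
```

In this sketch the listener is discovered from the registered callback itself, so the signature of the registration method does not need to change.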
Compatibility, Deprecation, and Migration Plan
- Since BatchingStateRestoreCallback extends StateRestoreCallback, there should be no impact on classes already implementing StateRestoreCallback.
- The StateRestoreNotification and StateRestoreListener interfaces are additions to the code base, so no impact is expected.
- Adding a setter method on the KafkaStreams object has no impact on existing code.
- Abstract classes implementing the different callback approaches and the StateRestoreListener interface with no-op methods are provided.
- Since the
...