Current state: Under Discussion
Discussion thread: here
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Currently, when restoring a state store in a Kafka Streams application, we put one key-value pair at a time into the store.
This proposal aims to make restoration more efficient for persistent stores that can be optimized for bulk writes, by adding a new interface with "restoreAll" functionality.
The proposal will also add an additional interface used as an event listener to do the following:
- Notification when the restoration (bulk or not) process begins and ends.
- Intermediate notification as each batch is restored, including the number of records and the last offset restored.
The proposed listener interface will work for existing and custom state stores.
NOTE: This work builds on ideas originally proposed in
and will be incorporated into this KIP.
Public Interfaces
This KIP will introduce the following interfaces:
- The BatchingStateRestoreCallback interface.
- The StateRestoreListener interface.
- The StateRestoreNotification interface.
public interface BatchingStateRestoreCallback extends StateRestoreCallback {
    void restoreAll(Collection<KeyValue<byte[], byte[]>> records);
}
This interface allows state stores to implement a bulk-loading approach during the restore phase. The StateRestoreCallback interface is being kept as is for backwards compatibility.
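A minimal sketch of a store implementing the proposed callback follows. To stay self-contained it declares local stand-ins for KeyValue, StateRestoreCallback and BatchingStateRestoreCallback that mirror the signatures above rather than importing Kafka's classes. InMemoryBulkStore and its bulkWrites counter are hypothetical, and treating a null value as a delete (tombstone) is an assumption about changelog semantics.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Map;
import java.util.TreeMap;

// Local stand-ins mirroring the proposed interfaces (not Kafka's actual classes).
final class KeyValue<K, V> {
    final K key;
    final V value;

    KeyValue(final K key, final V value) {
        this.key = key;
        this.value = value;
    }
}

interface StateRestoreCallback {
    void restore(byte[] key, byte[] value);
}

interface BatchingStateRestoreCallback extends StateRestoreCallback {
    void restoreAll(Collection<KeyValue<byte[], byte[]>> records);
}

// Hypothetical store used only for illustration.
class InMemoryBulkStore implements BatchingStateRestoreCallback {
    final Map<String, byte[]> data = new TreeMap<>();
    int bulkWrites = 0; // how many batches were applied via restoreAll

    @Override
    public void restore(final byte[] key, final byte[] value) {
        // Per-record path, still required by StateRestoreCallback.
        apply(key, value);
    }

    @Override
    public void restoreAll(final Collection<KeyValue<byte[], byte[]>> records) {
        // A persistent store would hand the whole collection to its native
        // batch-write API here; this sketch applies it in a single pass.
        for (final KeyValue<byte[], byte[]> record : records) {
            apply(record.key, record.value);
        }
        bulkWrites++;
    }

    private void apply(final byte[] key, final byte[] value) {
        final String k = new String(key, StandardCharsets.UTF_8);
        if (value == null) {
            data.remove(k); // assumed tombstone: a null value deletes the key
        } else {
            data.put(k, value);
        }
    }
}
```

The per-record restore method remains, so the store can still be driven by code that only knows about StateRestoreCallback.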
public interface StateRestoreNotification {
    void restoreStart();
    void restoreEnd();
}
While the restoreStart and restoreEnd methods have names similar to methods in the StateRestoreListener interface, their sole purpose is to let a state store prepare for, and clean up after, the restoration phase.
For example, a store may need to be opened with settings optimized for bulk loading, then closed and re-opened at the end of the restoration phase with settings for normal put/get operations.
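The prepare/clean-up cycle described above can be sketched as follows. StateRestoreNotification is a local stand-in for the proposed interface; ReopeningStore and StoreMode are hypothetical, and BULK_LOAD stands for whatever bulk-oriented options a concrete store offers (a RocksDB-backed store, for instance, might use RocksDB's Options.prepareForBulkLoad()).

```java
// Local stand-in mirroring the proposed interface (not Kafka's actual class).
interface StateRestoreNotification {
    void restoreStart();

    void restoreEnd();
}

// Hypothetical open modes; a real store would map these to its own options.
enum StoreMode { NORMAL, BULK_LOAD }

// Hypothetical store that reopens itself around the restoration phase.
class ReopeningStore implements StateRestoreNotification {
    StoreMode mode;
    int opens = 0; // counts open() calls, to make the reopen visible

    ReopeningStore() {
        open(StoreMode.NORMAL);
    }

    private void open(final StoreMode newMode) {
        mode = newMode;
        opens++;
    }

    private void close() {
        // A real store would flush and release its resources here.
    }

    @Override
    public void restoreStart() {
        close();
        open(StoreMode.BULK_LOAD); // settings tuned for bulk writes
    }

    @Override
    public void restoreEnd() {
        close();
        open(StoreMode.NORMAL); // back to settings for put/get traffic
    }
}
```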
public interface StateRestoreListener {
    void onRestoreStart(String storeName, long startingOffset, long endOffset);
    void onBatchRestored(String storeName, long batchEndOffset, long numRestored);
    void onRestoreEnd(String storeName, long totalRestored);
}
The onBatchRestored method is called after the records retrieved by each poll() call have been restored. This gives users a sense of the progress being made in the restore process.
The number of times onBatchRestored is called is roughly (total records in the changelog / MAX_POLL_RECORDS), rounded up; it can be higher, since a single poll() may return fewer than MAX_POLL_RECORDS records.
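As a rough illustration of the callback order and the batch-count estimate, the sketch below drives a progress-tracking listener through a simulated restore. StateRestoreListener is a local stand-in for the proposed interface; ProgressListener and RestoreSimulation are hypothetical, and the simulation assumes endOffset is exclusive and that changelog offsets are contiguous, which a compacted changelog need not guarantee.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in mirroring the proposed interface (not Kafka's actual class).
interface StateRestoreListener {
    void onRestoreStart(String storeName, long startingOffset, long endOffset);

    void onBatchRestored(String storeName, long batchEndOffset, long numRestored);

    void onRestoreEnd(String storeName, long totalRestored);
}

// Hypothetical listener that tracks and logs restore progress.
class ProgressListener implements StateRestoreListener {
    long startingOffset;
    long endOffset;
    long restored;
    long lastBatchEndOffset = -1;
    int batches;
    final List<String> log = new ArrayList<>();

    @Override
    public void onRestoreStart(final String storeName, final long startingOffset, final long endOffset) {
        this.startingOffset = startingOffset;
        this.endOffset = endOffset;
        restored = 0;
        batches = 0;
        log.add(storeName + ": restoring offsets " + startingOffset + " to " + endOffset);
    }

    @Override
    public void onBatchRestored(final String storeName, final long batchEndOffset, final long numRestored) {
        restored += numRestored;
        batches++;
        lastBatchEndOffset = batchEndOffset;
        final long expected = endOffset - startingOffset; // assumes contiguous offsets
        final double percent = expected == 0 ? 100.0 : 100.0 * restored / expected;
        log.add(String.format("%s: batch %d ends at offset %d (%.1f%%)", storeName, batches, batchEndOffset, percent));
    }

    @Override
    public void onRestoreEnd(final String storeName, final long totalRestored) {
        log.add(storeName + ": restored " + totalRestored + " records in " + batches + " batches");
    }
}

// Hypothetical driver simulating the restore loop: each simulated poll()
// returns at most maxPollRecords records; endOffset is treated as exclusive.
final class RestoreSimulation {
    static long restore(final String store, final long startOffset, final long endOffset,
                        final int maxPollRecords, final StateRestoreListener listener) {
        listener.onRestoreStart(store, startOffset, endOffset);
        long offset = startOffset;
        long total = 0;
        while (offset < endOffset) {
            final long batch = Math.min(maxPollRecords, endOffset - offset);
            offset += batch;
            total += batch;
            listener.onBatchRestored(store, offset - 1, batch); // last offset in the batch
        }
        listener.onRestoreEnd(store, total);
        return total;
    }
}
```

With 2,500 records and at most 1,000 records per simulated poll, the listener sees three batches, matching the rounded-up estimate; a real consumer returning smaller polls would produce more.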
The changes also include adding a setter method on the KafkaStreams object:
public void setStateRestoreListener(final StateRestoreListener stateRestoreListener)
The reason for setting the StateRestoreListener on the KafkaStreams object, rather than through a configuration parameter, is that it gives implementors access to enclosing state when creating the listener, for example for reporting purposes. This method sets a "global" state restore listener; if users want to provide a custom StateRestoreListener per state store, the BatchingStateRestoreCallback.getStateRestoreListener method can be overridden.
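Because the listener is created in application code rather than instantiated from a configuration value, it can close over application state. In the sketch below, RestoreDashboard is hypothetical and StateRestoreListener is a local stand-in for the proposed interface. Thread-safe maps are used on the assumption that stores may be restored by several stream threads concurrently. The call to the proposed setStateRestoreListener setter appears only as a comment, since this sketch does not depend on Kafka.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Local stand-in mirroring the proposed interface (not Kafka's actual class).
interface StateRestoreListener {
    void onRestoreStart(String storeName, long startingOffset, long endOffset);

    void onBatchRestored(String storeName, long batchEndOffset, long numRestored);

    void onRestoreEnd(String storeName, long totalRestored);
}

// Hypothetical application component whose state the listener updates.
class RestoreDashboard {
    private final Map<String, Long> restoredByStore = new ConcurrentHashMap<>();
    private final Map<String, Boolean> completed = new ConcurrentHashMap<>();

    // Usage with the setter proposed above (not compiled here):
    //   streams.setStateRestoreListener(dashboard.listener());
    StateRestoreListener listener() {
        return new StateRestoreListener() {
            @Override
            public void onRestoreStart(final String storeName, final long startingOffset, final long endOffset) {
                restoredByStore.put(storeName, 0L);
                completed.put(storeName, false);
            }

            @Override
            public void onBatchRestored(final String storeName, final long batchEndOffset, final long numRestored) {
                restoredByStore.merge(storeName, numRestored, Long::sum); // running total per store
            }

            @Override
            public void onRestoreEnd(final String storeName, final long totalRestored) {
                restoredByStore.put(storeName, totalRestored);
                completed.put(storeName, true);
            }
        };
    }

    long restored(final String storeName) {
        return restoredByStore.getOrDefault(storeName, 0L);
    }

    boolean isComplete(final String storeName) {
        return completed.getOrDefault(storeName, false);
    }
}
```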
Compatibility, Deprecation, and Migration Plan
- Since the BatchingStateRestoreCallback extends the StateRestoreCallback, there should be no impact on classes already implementing StateRestoreCallback.
- The StateRestoreListener and StateRestoreNotification interfaces are additions to the code base, so no impact is expected.
- The addition of a setter method on the KafkaStreams object has no impact on existing code.
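The compatibility argument rests on the batching interface being a subtype of the existing one, so a restore loop can use restoreAll when it is available and fall back to per-record restore otherwise. The sketch below shows such a dispatch under that assumption; RestoreDispatcher is hypothetical, the interfaces are local stand-ins mirroring this KIP, and this is not necessarily how Kafka Streams implements it.

```java
import java.util.Collection;

// Local stand-ins mirroring the interfaces in this KIP (not Kafka's actual classes).
final class KeyValue<K, V> {
    final K key;
    final V value;

    KeyValue(final K key, final V value) {
        this.key = key;
        this.value = value;
    }
}

interface StateRestoreCallback {
    void restore(byte[] key, byte[] value);
}

interface BatchingStateRestoreCallback extends StateRestoreCallback {
    void restoreAll(Collection<KeyValue<byte[], byte[]>> records);
}

// Hypothetical helper: prefers the bulk path, falls back to per-record restore.
final class RestoreDispatcher {
    private RestoreDispatcher() {
    }

    static void restoreBatch(final StateRestoreCallback callback,
                             final Collection<KeyValue<byte[], byte[]>> records) {
        if (callback instanceof BatchingStateRestoreCallback) {
            // New stores: hand the whole batch over at once.
            ((BatchingStateRestoreCallback) callback).restoreAll(records);
        } else {
            // Existing stores keep working unchanged, one record at a time.
            for (final KeyValue<byte[], byte[]> record : records) {
                callback.restore(record.key, record.value);
            }
        }
    }
}
```

An existing callback, even one written as a lambda, needs no changes to be restored this way.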
Rejected Alternatives
N/A