Current state: Under Discussion
Discussion thread: here
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Currently, when restoring a state store in a Kafka Streams application, we put one key-value pair at a time into the store.
This proposal aims to make recovery more efficient for persistent stores that can be optimized for bulk writes, by adding a new interface with "restoreAll" functionality.
The proposal also adds an interface that acts as an event listener for the following:
- Notification when the bulk restoration process begins and ends.
- Intermediate notification as each batch is restored, reporting the number of records and the last offset restored.
The proposed listener interface will work for existing and custom state stores.
NOTE: This work is building off ideas originally proposed in
and will be incorporated into this KIP.
Public Interfaces
This KIP will introduce the following interfaces:
- The BatchingStateRestoreCallback interface.
- The StateRestoreListener interface.
The interfaces will look like:
public interface BatchingStateRestoreCallback extends StateRestoreCallback {

    void restoreAll(Collection<KeyValue<byte[], byte[]>> records);
}

public interface StateRestoreListener {

    void onRestoreStart(TopicPartition topicPartition, long startingOffset);

    void onBatchRestored(TopicPartition topicPartition, long batchEndOffset, long numRestored);

    void onRestoreEnd(TopicPartition topicPartition, long endOffset, long totalRestored);
}
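As a rough illustration of how a store might implement the batching callback, the sketch below writes a whole batch in one operation and routes the single-record restore through the same path. It is only a sketch under stated assumptions: KeyValue, StateRestoreCallback and BatchingStateRestoreCallback are redeclared locally as stand-ins so the example compiles without Kafka on the classpath, the restore(byte[], byte[]) signature is assumed from the existing callback, and BulkLoadingStore is a hypothetical in-memory store, not part of this proposal.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class BatchRestoreSketch {

    // Local stand-in for the Kafka Streams KeyValue type (assumption).
    static final class KeyValue<K, V> {
        final K key;
        final V value;

        KeyValue(K key, V value) {
            this.key = key;
            this.value = value;
        }
    }

    // Local stand-in for the existing single-record callback (assumed signature).
    interface StateRestoreCallback {
        void restore(byte[] key, byte[] value);
    }

    // Mirrors the interface proposed in this KIP.
    interface BatchingStateRestoreCallback extends StateRestoreCallback {
        void restoreAll(Collection<KeyValue<byte[], byte[]>> records);
    }

    // Hypothetical store that counts how many bulk writes reach the backing map.
    static final class BulkLoadingStore implements BatchingStateRestoreCallback {
        private final Map<String, byte[]> data = new TreeMap<>();
        private int writeCalls = 0;

        @Override
        public void restore(byte[] key, byte[] value) {
            // A single record is treated as a batch of one.
            KeyValue<byte[], byte[]> record = new KeyValue<>(key, value);
            restoreAll(Collections.singletonList(record));
        }

        @Override
        public void restoreAll(Collection<KeyValue<byte[], byte[]>> records) {
            // Stage the batch first, then apply it to the store in one operation.
            Map<String, byte[]> batch = new TreeMap<>();
            for (KeyValue<byte[], byte[]> record : records) {
                batch.put(new String(record.key, StandardCharsets.UTF_8), record.value);
            }
            data.putAll(batch);
            writeCalls++;
        }

        int size() {
            return data.size();
        }

        int writeCalls() {
            return writeCalls;
        }
    }

    public static void main(String[] args) {
        BulkLoadingStore store = new BulkLoadingStore();
        List<KeyValue<byte[], byte[]>> batch = new ArrayList<>();
        for (String key : new String[] {"a", "b", "c"}) {
            byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
            batch.add(new KeyValue<>(bytes, bytes));
        }
        store.restoreAll(batch);
        System.out.println("records: " + store.size() + ", bulk writes: " + store.writeCalls());
    }
}
```

A real persistent store would replace the putAll call with whatever bulk-write primitive its storage engine offers; the map here only makes the one-write-per-batch behavior visible.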
The changes also include adding a setter method on the KafkaStreams object:
public void setStateRestoreListener(final StateRestoreListener stateRestoreListener)
The StateRestoreListener is set on the KafkaStreams object rather than through a configuration parameter so that implementors have access to enclosing state when creating the listener, for reporting purposes.
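To make the "enclosing state" point concrete, the following sketch creates a listener as an anonymous class that updates progress maps owned by the surrounding object. It is an illustration only: TopicPartition and StateRestoreListener are redeclared locally as stand-ins (the listener mirrors the signatures proposed above), RestoreProgressSketch and its accessor methods are hypothetical, and the callbacks are invoked by hand in main rather than by a running KafkaStreams instance.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class RestoreProgressSketch {

    // Local stand-in for org.apache.kafka.common.TopicPartition (assumption).
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    // Mirrors the listener interface proposed in this KIP.
    interface StateRestoreListener {
        void onRestoreStart(TopicPartition topicPartition, long startingOffset);

        void onBatchRestored(TopicPartition topicPartition, long batchEndOffset, long numRestored);

        void onRestoreEnd(TopicPartition topicPartition, long endOffset, long totalRestored);
    }

    // Enclosing state that the listener reports into.
    private final Map<String, AtomicLong> restoredSoFar = new ConcurrentHashMap<>();
    private final Map<String, Long> completedTotals = new ConcurrentHashMap<>();

    // The anonymous class can reach the fields above because it is created here,
    // which a listener instantiated from a configuration entry could not do.
    StateRestoreListener newListener() {
        return new StateRestoreListener() {
            @Override
            public void onRestoreStart(TopicPartition topicPartition, long startingOffset) {
                restoredSoFar.put(topicPartition.toString(), new AtomicLong());
            }

            @Override
            public void onBatchRestored(TopicPartition topicPartition, long batchEndOffset, long numRestored) {
                restoredSoFar
                    .computeIfAbsent(topicPartition.toString(), key -> new AtomicLong())
                    .addAndGet(numRestored);
            }

            @Override
            public void onRestoreEnd(TopicPartition topicPartition, long endOffset, long totalRestored) {
                completedTotals.put(topicPartition.toString(), totalRestored);
            }
        };
    }

    long restoredSoFar(TopicPartition topicPartition) {
        AtomicLong count = restoredSoFar.get(topicPartition.toString());
        return count == null ? 0L : count.get();
    }

    boolean isComplete(TopicPartition topicPartition) {
        return completedTotals.containsKey(topicPartition.toString());
    }

    public static void main(String[] args) {
        RestoreProgressSketch progress = new RestoreProgressSketch();
        StateRestoreListener listener = progress.newListener();
        // With Kafka on the classpath, the proposed setter would receive this listener:
        // streams.setStateRestoreListener(listener);

        TopicPartition changelog = new TopicPartition("store-changelog", 0);
        listener.onRestoreStart(changelog, 0L);
        listener.onBatchRestored(changelog, 99L, 100L);
        listener.onBatchRestored(changelog, 149L, 50L);
        listener.onRestoreEnd(changelog, 150L, 150L);
        System.out.println(changelog + " restored " + progress.restoredSoFar(changelog) + " records");
    }
}
```

The concurrent maps are a defensive choice in case restoration callbacks arrive from more than one thread; whether that can happen depends on how the callbacks end up being invoked, which this proposal does not specify.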
Compatibility, Deprecation, and Migration Plan
- Since the BatchingStateRestoreCallback extends the StateRestoreCallback, there should be no impact to classes already implementing this interface.
- The StateRestoreListener is an addition to the code base, so no impact is expected either.
- The addition of a setter method on the KafkaStreams object has no impact on existing code.
Rejected Alternatives
N/A