Current state: Under Discussion
Discussion thread: here
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Currently, when restoring a state store in a Kafka Streams application, we put one key-value pair at a time into the store. This proposal aims to make this recovery more efficient by creating a new interface with "restoreAll" functionality, allowing the underlying state store implementation to perform bulk writes. The proposal will also add "beginRestore" and "endRestore" callback methods, which could be used for:
- Tracking when the bulk restoration process begins and ends.
- Keeping track of the number of records and last offset restored.
NOTE: This work is building off ideas originally proposed in
and will be incorporated into this KIP.
Public Interfaces
This KIP will introduce:
- The BatchingStateRestoreCallback interface.
- The StateRestoreContext interface.
The interfaces will look like:
public interface BatchingStateRestoreCallback extends StateRestoreCallback {
    void restoreAll(Collection<KeyValue<byte[], byte[]>> records);
    void beginRestore(StateRestoreContext restoreContext);
    void endRestore();
}

public interface StateRestoreContext {
    void setOffsetRestored(long offset);
    void setNumberRestored(int restoreCount);
    long offsetRestored();
    int numberRestored();
}
The idea behind StateRestoreContext is that applications can query it at the end of the restoration process to get the last offset restored and the total count of records restored.
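As a rough illustration of how a state store might use these interfaces, the sketch below implements the proposed callback for a simple in-memory map. Several things here are assumptions rather than part of the proposal: the names InMemoryBatchingStore, SimpleRestoreContext, and RestoreSketch are hypothetical; KeyValue and StateRestoreCallback are minimal stand-ins for the existing Kafka Streams types so that the example compiles on its own; and the call order in main (begin, bulk write, record progress in the context, end) is one plausible reading of the proposal, not a description of the actual restore loop.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RestoreSketch {
    public static void main(String[] args) {
        InMemoryBatchingStore store = new InMemoryBatchingStore();
        StateRestoreContext context = new SimpleRestoreContext();

        List<KeyValue<byte[], byte[]>> batch = new ArrayList<>();
        batch.add(new KeyValue<>(new byte[] {1}, new byte[] {10}));
        batch.add(new KeyValue<>(new byte[] {2}, new byte[] {20}));

        // Assumed call order for the restore driver; the KIP does not spell it out.
        store.beginRestore(context);
        store.restoreAll(batch);
        context.setOffsetRestored(41L); // illustrative changelog offset
        context.setNumberRestored(batch.size());
        store.endRestore();

        System.out.println("restored " + store.restoredCount() + " records, last offset " + store.lastOffset());
    }
}

// Stand-ins for the existing Kafka Streams types so the sketch compiles alone.
final class KeyValue<K, V> {
    final K key;
    final V value;
    KeyValue(K key, V value) { this.key = key; this.value = value; }
}

interface StateRestoreCallback {
    void restore(byte[] key, byte[] value);
}

// The two interfaces as proposed in this KIP.
interface StateRestoreContext {
    void setOffsetRestored(long offset);
    void setNumberRestored(int restoreCount);
    long offsetRestored();
    int numberRestored();
}

interface BatchingStateRestoreCallback extends StateRestoreCallback {
    void restoreAll(Collection<KeyValue<byte[], byte[]>> records);
    void beginRestore(StateRestoreContext restoreContext);
    void endRestore();
}

// Hypothetical context: a plain holder for the two tracked values.
class SimpleRestoreContext implements StateRestoreContext {
    private long offset = -1L;
    private int count = 0;
    public void setOffsetRestored(long offset) { this.offset = offset; }
    public void setNumberRestored(int restoreCount) { this.count = restoreCount; }
    public long offsetRestored() { return offset; }
    public int numberRestored() { return count; }
}

// Hypothetical store that applies each batch with one bulk write.
class InMemoryBatchingStore implements BatchingStateRestoreCallback {
    private final Map<ByteBuffer, byte[]> data = new HashMap<>();
    private StateRestoreContext context;
    private long lastOffset = -1L;
    private int restoredCount = 0;

    @Override
    public void beginRestore(StateRestoreContext restoreContext) { this.context = restoreContext; }

    @Override
    public void restoreAll(Collection<KeyValue<byte[], byte[]>> records) {
        Map<ByteBuffer, byte[]> batch = new HashMap<>();
        for (KeyValue<byte[], byte[]> kv : records) {
            batch.put(ByteBuffer.wrap(kv.key), kv.value); // later records win on duplicate keys
        }
        data.putAll(batch); // single bulk write per batch
    }

    @Override
    public void restore(byte[] key, byte[] value) { data.put(ByteBuffer.wrap(key), value); }

    @Override
    public void endRestore() {
        // Query the context once restoration is finished.
        lastOffset = context.offsetRestored();
        restoredCount = context.numberRestored();
    }

    long lastOffset() { return lastOffset; }
    int restoredCount() { return restoredCount; }
    int size() { return data.size(); }
}
```

A store backed by a real storage engine would presumably replace the putAll call with whatever bulk-write primitive that engine offers; the in-memory map is used here only to keep the example self-contained.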
Compatibility, Deprecation, and Migration Plan
- Since BatchingStateRestoreCallback extends StateRestoreCallback, there should be no impact on classes already implementing StateRestoreCallback.
- StateRestoreContext is an addition to the code base, so no impact is expected either.
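The compatibility argument can be sketched in code. In the example below, LegacyCallback represents a class written against the existing single-record interface; it compiles unchanged once the new interface exists. The restoreBatch helper and the CompatibilitySketch class are hypothetical and only illustrate one way a caller could choose between the bulk path and the per-record path; this is not the actual Kafka Streams dispatch logic. KeyValue and StateRestoreCallback are again minimal stand-ins for the real types.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

class CompatibilitySketch {
    // Hypothetical dispatch: use the bulk path when available, else fall back per record.
    static void restoreBatch(StateRestoreCallback callback,
                             Collection<KeyValue<byte[], byte[]>> records) {
        if (callback instanceof BatchingStateRestoreCallback) {
            ((BatchingStateRestoreCallback) callback).restoreAll(records);
        } else {
            for (KeyValue<byte[], byte[]> kv : records) {
                callback.restore(kv.key, kv.value);
            }
        }
    }

    public static void main(String[] args) {
        List<KeyValue<byte[], byte[]>> records = new ArrayList<>();
        records.add(new KeyValue<>(new byte[] {1}, new byte[] {10}));
        records.add(new KeyValue<>(new byte[] {2}, new byte[] {20}));

        LegacyCallback legacy = new LegacyCallback();
        restoreBatch(legacy, records);
        System.out.println("legacy restore() calls: " + legacy.calls);
    }
}

// Stand-ins for the existing Kafka Streams types so the sketch compiles alone.
final class KeyValue<K, V> {
    final K key;
    final V value;
    KeyValue(K key, V value) { this.key = key; this.value = value; }
}

interface StateRestoreCallback {
    void restore(byte[] key, byte[] value);
}

// The two interfaces as proposed in this KIP.
interface StateRestoreContext {
    void setOffsetRestored(long offset);
    void setNumberRestored(int restoreCount);
    long offsetRestored();
    int numberRestored();
}

interface BatchingStateRestoreCallback extends StateRestoreCallback {
    void restoreAll(Collection<KeyValue<byte[], byte[]>> records);
    void beginRestore(StateRestoreContext restoreContext);
    void endRestore();
}

// An implementation written before this KIP; it needs no changes.
class LegacyCallback implements StateRestoreCallback {
    int calls = 0;

    @Override
    public void restore(byte[] key, byte[] value) {
        calls++;
    }
}
```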
Rejected Alternatives
N/A