...

Currently, when restoring a state store in a Kafka Streams application, we put one key-value pair at a time into the store.

This proposal aims to make this recovery more efficient for certain persistent stores that can be optimized for bulk writes. It does so by adding a new interface with "restoreAll" functionality, allowing the underlying state store implementation to perform bulk writes.
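As a rough illustration of the difference, the sketch below contrasts per-record restoration with a bulk path. All types here (`SketchKeyValue`, `SketchStore`, `BulkRestoreSketch`) are hypothetical stand-ins written for this example, not Kafka Streams classes, and string keys are used instead of byte arrays for brevity. Only the `restoreAll` name comes from the proposal.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.TreeMap;

// Hypothetical stand-in for a key-value record; not the Kafka Streams KeyValue class.
final class SketchKeyValue {
    final String key;
    final String value;

    SketchKeyValue(String key, String value) {
        this.key = key;
        this.value = value;
    }
}

// Hypothetical store that counts write operations so the two paths can be compared.
final class SketchStore {
    private final TreeMap<String, String> data = new TreeMap<>();
    int writeOperations = 0;

    // One write operation per record.
    void put(String key, String value) {
        data.put(key, value);
        writeOperations++;
    }

    // One write operation for the whole batch.
    void putAll(Collection<SketchKeyValue> records) {
        for (SketchKeyValue kv : records) {
            data.put(kv.key, kv.value);
        }
        writeOperations++;
    }

    int size() {
        return data.size();
    }
}

final class BulkRestoreSketch {
    // Current behaviour as described above: one put per restored record.
    static void restoreOneByOne(SketchStore store, List<SketchKeyValue> records) {
        for (SketchKeyValue kv : records) {
            store.put(kv.key, kv.value);
        }
    }

    // Proposed behaviour: hand the whole batch to the store in a single call.
    static void restoreAll(SketchStore store, Collection<SketchKeyValue> records) {
        store.putAll(records);
    }

    // Builds n distinct sample records for the comparison.
    static List<SketchKeyValue> sampleRecords(int n) {
        List<SketchKeyValue> records = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            records.add(new SketchKeyValue("key-" + i, "value-" + i));
        }
        return records;
    }
}
```

In this toy model the bulk path issues a single write operation per batch, which is the kind of saving a store with an efficient batch-write path could exploit. Actual gains would depend on the store implementation.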

 

The proposal will also add "beginRestore" and "endRestore" callback methods, along with an additional interface used as an event listener, to provide the following:

  1. Notification when the bulk restoration process begins and ends.
  2. Intermediate notification as batches are restored, with the number of records and the last offset restored.

The proposed listener interface will work for existing and custom state stores.

 

NOTE: This work builds on ideas originally proposed in KAFKA-4322, which will be incorporated into this KIP.

Public Interfaces

This KIP will introduce the following interfaces:

  • The BatchingStateRestoreCallback interface.  
  • The StateRestoreListener interface.

The interfaces will look like:

Code Block
languagejava
public interface BatchingStateRestoreCallback extends StateRestoreCallback {

   void restoreAll(Collection<KeyValue<byte[], byte[]>> records);

   void beginRestore(StateRestoreContext restoreContext);

   void endRestore();

}
 
 
public interface StateRestoreListener {

    // Called when restoration of the given partition begins.
    void onRestoreStart(TopicPartition topicPartition, long startingOffset);

    // Called after each batch of records has been restored.
    void onBatchRestored(TopicPartition topicPartition, long batchEndOffset, long numRestored);

    // Called when restoration of the given partition has finished.
    void onRestoreEnd(TopicPartition topicPartition, long endOffset, long totalRestored);

}

The changes also include adding a setter method on the KafkaStreams object:

Code Block
languagejava
public void setStateRestoreListener(final StateRestoreListener stateRestoreListener)

The idea behind setting the StateRestoreListener on the KafkaStreams object, rather than through a configuration parameter, is that it allows implementors to have access to enclosing state when creating the listener for reporting purposes.
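To illustrate the point about enclosing state, here is a minimal sketch of a listener that reports into maps owned by the surrounding application class. `StateRestoreListener` is redeclared locally with the three callbacks named in this KIP so that the example compiles on its own. `SketchTopicPartition` and `RestoreProgressReporter` are hypothetical names. In a real application the listener would be handed to `setStateRestoreListener` on the KafkaStreams object rather than invoked directly.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for Kafka's TopicPartition class.
final class SketchTopicPartition {
    final String topic;
    final int partition;

    SketchTopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public String toString() {
        return topic + "-" + partition;
    }
}

// Local copy of the proposed listener shape, using the stand-in partition type.
interface StateRestoreListener {
    void onRestoreStart(SketchTopicPartition topicPartition, long startingOffset);

    void onBatchRestored(SketchTopicPartition topicPartition, long batchEndOffset, long numRestored);

    void onRestoreEnd(SketchTopicPartition topicPartition, long endOffset, long totalRestored);
}

// Hypothetical application class whose state the listener can reach.
final class RestoreProgressReporter {
    // Enclosing state: records restored so far, keyed by "topic-partition".
    final Map<String, Long> restoredSoFar = new ConcurrentHashMap<>();
    // Enclosing state: final totals reported when restoration ends.
    final Map<String, Long> completedTotals = new ConcurrentHashMap<>();

    // The anonymous listener closes over the reporter's maps.
    StateRestoreListener newListener() {
        return new StateRestoreListener() {
            @Override
            public void onRestoreStart(SketchTopicPartition tp, long startingOffset) {
                restoredSoFar.put(tp.toString(), 0L);
            }

            @Override
            public void onBatchRestored(SketchTopicPartition tp, long batchEndOffset, long numRestored) {
                // Assumption: numRestored is the size of this batch, not a running total.
                restoredSoFar.merge(tp.toString(), numRestored, Long::sum);
            }

            @Override
            public void onRestoreEnd(SketchTopicPartition tp, long endOffset, long totalRestored) {
                completedTotals.put(tp.toString(), totalRestored);
            }
        };
    }
}
```

Because the listener is created in code, it can close over whatever reporting objects the application already has, which would be awkward to arrange if the listener class were instantiated from a configuration value.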

Compatibility, Deprecation, and Migration Plan

    • Since the BatchingStateRestoreCallback extends the StateRestoreCallback, there should be no impact on classes already implementing this interface.
    • The StateRestoreListener is an addition to the code base, so no impact is expected either.
    • The addition of a setter method on the KafkaStreams object has no impact on existing code.


Rejected Alternatives

N/A

...