Current state: Under Discussion

 

Discussion thread: here

JIRA: (issue link unavailable; the Jira macro failed to render)

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, when restoring a state store in a Kafka Streams application, we put one key-value pair at a time into the store.  

This proposal aims to make this recovery more efficient for certain persistent stores that can be optimized for bulk writes by adding a new interface with "restoreAll" functionality.

 

The proposal will also add an interface that acts as an event listener and provides:

  1. Notification when the bulk restoration process begins and ends.
  2. Intermediate notifications as batches are restored, including the number of records restored and the last offset restored.

The proposed listener interface will work for existing and custom state stores.

 

NOTE: This work builds on ideas originally proposed in a related JIRA issue (the issue link failed to render), which will be incorporated into this KIP.

Public Interfaces

This KIP will introduce the following interfaces:

  • The BatchingStateRestoreCallback interface.  
  • The StateRestoreListener interface.

The interfaces will look like:

public interface BatchingStateRestoreCallback extends StateRestoreCallback {

    void restoreAll(Collection<KeyValue<byte[], byte[]>> records);

}

public interface StateRestoreListener {

    void onRestoreStart(TopicPartition topicPartition, long startingOffset);

    void onBatchRestored(TopicPartition topicPartition, long batchEndOffset, long numRestored);

    void onRestoreEnd(TopicPartition topicPartition, long endOffset, long totalRestored);

}
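To illustrate how a store might use the proposed batching callback, here is a minimal sketch. It is not the implementation planned for Kafka Streams. `KeyValue` and `StateRestoreCallback` are local stand-ins for the real Kafka types so the example compiles on its own, `InMemoryBulkStore` is a hypothetical store, and treating a null value as a delete is an assumption based on how changelog tombstones usually behave.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class BatchingRestoreSketch {

    /** Stand-in for org.apache.kafka.streams.KeyValue so the sketch compiles alone. */
    static final class KeyValue<K, V> {
        final K key;
        final V value;

        KeyValue(K key, V value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Stand-in for the existing single-record restore callback. */
    interface StateRestoreCallback {
        void restore(byte[] key, byte[] value);
    }

    /** The batching interface as proposed in this KIP. */
    interface BatchingStateRestoreCallback extends StateRestoreCallback {
        void restoreAll(Collection<KeyValue<byte[], byte[]>> records);
    }

    /** Hypothetical store; a HashMap stands in for a backend with a bulk-write path. */
    static final class InMemoryBulkStore implements BatchingStateRestoreCallback {
        private final Map<String, byte[]> data = new HashMap<>();
        private int bulkWrites = 0;

        @Override
        public void restore(byte[] key, byte[] value) {
            // The single-record path delegates to the batch path with a batch of one.
            restoreAll(Collections.singletonList(new KeyValue<>(key, value)));
        }

        @Override
        public void restoreAll(Collection<KeyValue<byte[], byte[]>> records) {
            // Apply records in order so a later record for the same key wins.
            for (KeyValue<byte[], byte[]> record : records) {
                String key = new String(record.key, StandardCharsets.UTF_8);
                if (record.value == null) {
                    data.remove(key); // assumption: null value is a tombstone (delete)
                } else {
                    data.put(key, record.value);
                }
            }
            // A real store would commit one write batch here; we only count batches.
            bulkWrites++;
        }

        int size() {
            return data.size();
        }

        int bulkWrites() {
            return bulkWrites;
        }
    }
}
```

Because the batching interface extends the single-record one, a store written this way still works with callers that only know about `restore`.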

The onBatchRestored method is called after records retrieved from each poll() call have been restored.  This should give users a sense of progress being made in the restore process.

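Because each poll() call returns at most MAX_POLL_RECORDS records, onBatchRestored is called at least ceil(total records in the changelog / MAX_POLL_RECORDS) times; the count is higher when polls return partial batches.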
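As a rough illustration of the call-count estimate, the sketch below computes the smallest number of onBatchRestored calls, assuming every poll() returns a full batch. The class and method names are hypothetical and not part of the proposal.

```java
public class BatchCountSketch {

    /**
     * Smallest number of onBatchRestored calls needed to restore totalRecords
     * when each poll() returns at most maxPollRecords records.
     * Assumption: every poll returns a full batch except possibly the last.
     */
    static long minBatchCallbacks(long totalRecords, int maxPollRecords) {
        if (totalRecords < 0 || maxPollRecords <= 0) {
            throw new IllegalArgumentException("totalRecords must be >= 0 and maxPollRecords > 0");
        }
        // Ceiling division written to avoid overflow for very large totals.
        long fullBatches = totalRecords / maxPollRecords;
        long remainder = totalRecords % maxPollRecords;
        return remainder == 0 ? fullBatches : fullBatches + 1;
    }
}
```

For example, 1,001 records with a batch limit of 500 need at least three callbacks: two full batches and one partial batch.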

 

The changes also include adding a setter method on the KafkaStreams object:

public void setStateRestoreListener(final StateRestoreListener stateRestoreListener)

The StateRestoreListener is set on the KafkaStreams object rather than through a configuration parameter so that implementors have access to enclosing state when creating the listener, for example for reporting purposes.
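The following sketch shows one way a listener could capture enclosing state for reporting. `TopicPartition` and `StateRestoreListener` are local stand-ins that mirror the proposed signatures so the example compiles without Kafka on the classpath, and `RestoreProgressReporter` is a hypothetical class. It also assumes that `numRestored` in onBatchRestored is the count for that batch only, which the proposal does not state explicitly.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class RestoreListenerSketch {

    /** Stand-in for org.apache.kafka.common.TopicPartition. */
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    /** The listener interface as proposed in this KIP. */
    interface StateRestoreListener {
        void onRestoreStart(TopicPartition topicPartition, long startingOffset);

        void onBatchRestored(TopicPartition topicPartition, long batchEndOffset, long numRestored);

        void onRestoreEnd(TopicPartition topicPartition, long endOffset, long totalRestored);
    }

    /** Hypothetical reporter whose state is shared with the listener it creates. */
    static final class RestoreProgressReporter {
        private final Map<String, Long> restoredSoFar = new ConcurrentHashMap<>();

        StateRestoreListener newListener() {
            // The anonymous class closes over restoredSoFar, the enclosing state.
            return new StateRestoreListener() {
                @Override
                public void onRestoreStart(TopicPartition tp, long startingOffset) {
                    restoredSoFar.put(tp.toString(), 0L);
                }

                @Override
                public void onBatchRestored(TopicPartition tp, long batchEndOffset, long numRestored) {
                    // Assumption: numRestored is per batch, so it is accumulated here.
                    restoredSoFar.merge(tp.toString(), numRestored, Long::sum);
                }

                @Override
                public void onRestoreEnd(TopicPartition tp, long endOffset, long totalRestored) {
                    restoredSoFar.put(tp.toString(), totalRestored);
                }
            };
        }

        long restored(TopicPartition tp) {
            return restoredSoFar.getOrDefault(tp.toString(), 0L);
        }
    }
}
```

With the proposed setter, such a listener would be registered on a KafkaStreams instance through setStateRestoreListener, passing the result of `newListener()`.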

Compatibility, Deprecation, and Migration Plan

    • Since BatchingStateRestoreCallback extends StateRestoreCallback, there should be no impact on classes that already implement StateRestoreCallback.
    • The StateRestoreListener is an addition to the code base, so no impact is expected either.
    • The addition of a setter method on the KafkaStreams object has no impact on existing code.


Rejected Alternatives

N/A