...

Currently, when GlobalStreamThread dies from a fatal error, an InvalidOffsetException is forwarded to the user to indicate that something went wrong. The durability of GlobalStreamThread was improved previously (in KAFKA-6121) by wiping all data that had already been processed and rebuilding the store entirely. However, users should also be able to choose their reset policy, which has not been possible so far.

A config, "global.consumer.auto.offset.reset", is already available and can be leveraged to reset offsets for global topics, so no new config needs to be added. However, there is currently no support for overriding the reset policy per topic for global topics. Adding it requires a KIP, because it changes the public interfaces described below.
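As an illustration of the existing config, the sketch below sets the key named above in a plain java.util.Properties object of the kind passed to a Streams application. The application id and bootstrap address are placeholder values, and whether GlobalStreamThread honors the value depends on the changes proposed in this KIP.

```java
import java.util.Properties;

public class GlobalResetConfigExample {

    // Key named in the text above: "auto.offset.reset" with the "global.consumer." prefix.
    static final String GLOBAL_RESET_KEY = "global.consumer.auto.offset.reset";

    // Builds a Streams-style Properties object.
    // The application id and bootstrap address are placeholders for this illustration.
    static Properties buildConfig() {
        final Properties props = new Properties();
        props.put("application.id", "global-store-demo");
        props.put("bootstrap.servers", "localhost:9092");
        // "earliest" requests a reset to the beginning of the global topic.
        props.put(GLOBAL_RESET_KEY, "earliest");
        return props;
    }

    public static void main(final String[] args) {
        System.out.println(GLOBAL_RESET_KEY + "=" + buildConfig().getProperty(GLOBAL_RESET_KEY));
    }
}
```

The key is written as a string literal to keep the example free of Kafka dependencies. In an application that has kafka-streams on the classpath, the same key can be built from the prefix helper in StreamsConfig.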

Public Interfaces

One or more new reset policies will be added to give the user some flexibility in determining the resilience of GlobalStreamThread. Note also that Topology.addGlobalStore does not currently let the user decide which policy to use, so this method may have to be extended.

Topology.java
public class Topology {

    // newly added
    public synchronized <K, V> Topology addGlobalStore(final AutoOffsetReset offsetReset,
                                                       final StoreBuilder<?> storeBuilder,
                                                       final String sourceName,
                                                       final TimestampExtractor timestampExtractor,
                                                       final Deserializer<K> keyDeserializer,
                                                       final Deserializer<V> valueDeserializer,
                                                       final String topic,
                                                       final String processorName,
                                                       final ProcessorSupplier<K, V> stateUpdateSupplier) {
        // signature only; the method body is not specified in this proposal
    }
}


Proposed Changes

  • Use the config global.consumer.auto.offset.reset to set the reset policy for GlobalStreamThread, and take action based on this config (cleaning up the stores and calling seekToBeginning/seekToEnd), as is done based on auto.offset.reset in StreamThread.
  • Add the interface mentioned above to allow users to set per-topic reset-policy overrides for global topics.
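The first bullet can be sketched roughly as follows. This is a minimal, self-contained illustration and not the GlobalStreamThread implementation: ResetPolicy, FakeConsumer and handleInvalidOffset are hypothetical names introduced here, the store is modelled as a plain map, and throwing IllegalStateException when no policy is set is an assumption standing in for the current behavior of forwarding the error to the user.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GlobalResetPolicySketch {

    // Hypothetical enum mirroring the usual auto.offset.reset values.
    enum ResetPolicy { EARLIEST, LATEST, NONE }

    // Hypothetical stand-in for the global consumer; it only records the seeks it receives.
    static class FakeConsumer {
        final List<String> calls = new ArrayList<>();

        void seekToBeginning(final String topic) {
            calls.add("seekToBeginning:" + topic);
        }

        void seekToEnd(final String topic) {
            calls.add("seekToEnd:" + topic);
        }
    }

    // Reacts to an invalid offset on a global topic according to the configured policy.
    static void handleInvalidOffset(final ResetPolicy policy,
                                    final String topic,
                                    final Map<String, String> store,
                                    final FakeConsumer consumer) {
        switch (policy) {
            case EARLIEST:
                store.clear();                   // clean up the store
                consumer.seekToBeginning(topic); // then rebuild from the start of the topic
                break;
            case LATEST:
                store.clear();                   // clean up the store
                consumer.seekToEnd(topic);       // then continue from the end of the topic
                break;
            default:
                // Assumption: with no policy, the error is surfaced to the caller.
                throw new IllegalStateException("No reset policy for global topic " + topic);
        }
    }

    public static void main(final String[] args) {
        final Map<String, String> store = new HashMap<>();
        store.put("key", "stale-value");
        final FakeConsumer consumer = new FakeConsumer();
        handleInvalidOffset(ResetPolicy.EARLIEST, "global-topic", store, consumer);
        System.out.println(consumer.calls + " storeEmpty=" + store.isEmpty());
    }
}
```

Clearing the store before seeking keeps the sketch in line with the rebuild-from-scratch approach mentioned for KAFKA-6121. A per-topic override would select the policy by topic before this method is called.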

Compatibility, Deprecation, and Migration Plan

...