...

  • Use CachingKeyValueStore to store keys and partition times. We probably want a cache backed by a store, and this class is a good fit for that approach.
  • ProcessorStateManager would probably hold these state stores. The CachingKeyValueStore would be constructed with a store from RocksDbKeyValueBytesStoreSupplier as its underlying store.
    • This indicates that some method might ultimately be added to ProcessorContext (whether as an internal or a public-facing API).
  • For suppressors, we could take the max of all stream times, which makes the logic equivalent to what it was before; otherwise we will need to resolve the low-traffic issue (depending on how seriously we rate it).
  • More than likely, we will retrieve a CachingKeyValueStore from ProcessorContext (after it has been registered as a store under the stream task).
    • Add some configuration permitting the creation of such a store.
    • Use this store as needed. Whether one store is enough or multiple is still up for debate.
    • Could effectively be used for key tracking and ultimately fixing the problem with grace periods.
  • The general approach is as follows:
    • Check a newly added configuration to see whether per-key stream time tracking is enabled.
    • We register a CachingKeyValueStore using ProcessorContext (the key is simply the record key, while the timestamp will be serialized into a byte[]).
    • For suppressors and the Kafka Streams processors that deal with grace periods, we will retrieve the store-backed cache upon initialization (using some specific name, of course).
    • Stream times will be recorded as needed.
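
The general approach above can be sketched in plain Java. This is only an illustration under stated assumptions, not the proposed implementation: the class PerKeyStreamTimeTracker, its method names, and the UNKNOWN sentinel are hypothetical, a HashMap stands in for the store-backed cache (CachingKeyValueStore), and the boolean constructor argument stands in for the new configuration. The boundary used in the grace period check is likewise an assumption of the sketch.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of per-key stream time tracking.
 * The HashMap is a stand-in for a store-backed cache; no Kafka classes are used.
 */
final class PerKeyStreamTimeTracker {
    /** Sentinel returned when no timestamp has been observed yet (assumed value). */
    static final long UNKNOWN = -1L;

    private final boolean perKeyTrackingEnabled;                // stand-in for the new config
    private final Map<String, byte[]> store = new HashMap<>();  // record key -> serialized timestamp
    private long maxStreamTime = UNKNOWN;                       // max over all keys

    PerKeyStreamTimeTracker(boolean perKeyTrackingEnabled) {
        this.perKeyTrackingEnabled = perKeyTrackingEnabled;
    }

    /** Serializes a timestamp into an 8-byte array, as the store holds byte[] values. */
    static byte[] serialize(long timestamp) {
        return ByteBuffer.allocate(Long.BYTES).putLong(timestamp).array();
    }

    static long deserialize(byte[] raw) {
        return ByteBuffer.wrap(raw).getLong();
    }

    /** Records a timestamp; stream time never moves backwards. */
    void observe(String key, long timestamp) {
        maxStreamTime = Math.max(maxStreamTime, timestamp);
        if (!perKeyTrackingEnabled) {
            return; // fall back to a single stream time shared by all keys
        }
        if (timestamp > streamTime(key)) {
            store.put(key, serialize(timestamp));
        }
    }

    /** Stream time for one key, or the shared maximum when per-key tracking is off. */
    long streamTime(String key) {
        if (!perKeyTrackingEnabled) {
            return maxStreamTime;
        }
        byte[] raw = store.get(key);
        return raw == null ? UNKNOWN : deserialize(raw);
    }

    /** Max of all stream times, the option mentioned above for suppressors. */
    long maxStreamTime() {
        return maxStreamTime;
    }

    /** Assumed rule: a window is closed once windowEnd + grace <= the key's stream time. */
    boolean isExpired(String key, long windowEndMs, long graceMs) {
        long time = streamTime(key);
        return time != UNKNOWN && windowEndMs + graceMs <= time;
    }
}
```

With per-key tracking enabled, a burst of records on key "b" no longer closes windows for a low-traffic key "a"; with it disabled, streamTime returns the shared maximum, which corresponds to the previous behavior.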

Proposed Changes


...