...
- Use CachingKeyValueStore to store keys and partition times. We probably want a cache backed by a store, and this class is a good candidate for that approach.
- ProcessorStateManager would probably hold these state stores. The CachingKeyValueStore would be constructed with a store from RocksDbKeyValueBytesStoreSupplier as its underlying store.
- This indicates that some method might ultimately be added to ProcessorContext (whether as an internal or a public-facing API).
- For suppressors, we could take the max of all stream times, which makes the logic equivalent to what it was before. Otherwise, we will need to resolve the low-traffic issue (depending on how seriously we rate it).
- More than likely, we will retrieve a CachingKeyValueStore from ProcessorContext (after it was initially registered as a store under the stream task).
- Add some configuration permitting the creation of such a store.
- Use this store as needed. Whether one store is enough or multiple is still up for debate.
- This could be used for per-key tracking and, ultimately, for fixing the problem with grace periods.
- The general approach is as follows:
- Check a newly added configuration to see whether per-key stream time tracking is enabled.
- We register a CachingKeyValueStore using ProcessorContext (the key is simply the record key, while the timestamp will be serialized into a byte[]).
- For suppressors and Kafka Streams processors relating to grace periods, we will retrieve the store-backed cache upon initialization (using some specific name).
- Stream times will be recorded as needed.
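The steps above can be illustrated with a minimal, self-contained sketch. It does not use the Kafka Streams API: a plain HashMap stands in for the store-backed cache, and the class name `PerKeyStreamTimeTracker`, the `perKeyTrackingEnabled` flag, and the `UNKNOWN` sentinel are all hypothetical names chosen for illustration. It only shows the intended behaviour: timestamps are serialized into a byte[], stream time per key never moves backwards, and the max over all keys gives the value a suppressor could use to keep its previous logic.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of per-key stream time tracking; not Kafka Streams code. */
final class PerKeyStreamTimeTracker {
    // Sentinel for "no timestamp seen yet" (an assumption of this sketch).
    static final long UNKNOWN = -1L;

    // Stand-in for the store-backed cache: record key -> timestamp as byte[].
    private final Map<String, byte[]> store = new HashMap<>();
    // Stand-in for the proposed configuration (name is an assumption).
    private final boolean perKeyTrackingEnabled;
    // Single stream time, used when per-key tracking is disabled.
    private long globalStreamTime = UNKNOWN;

    PerKeyStreamTimeTracker(boolean perKeyTrackingEnabled) {
        this.perKeyTrackingEnabled = perKeyTrackingEnabled;
    }

    // Serialize a timestamp into an 8-byte big-endian array.
    static byte[] serialize(long timestamp) {
        return ByteBuffer.allocate(Long.BYTES).putLong(timestamp).array();
    }

    static long deserialize(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    /** Records a timestamp for a key; stream time only ever advances. */
    void observe(String key, long timestamp) {
        globalStreamTime = Math.max(globalStreamTime, timestamp);
        if (!perKeyTrackingEnabled) {
            return;
        }
        byte[] previous = store.get(key);
        if (previous == null || deserialize(previous) < timestamp) {
            store.put(key, serialize(timestamp));
        }
    }

    /** Stream time for one key, or the single stream time if tracking is off. */
    long streamTime(String key) {
        if (!perKeyTrackingEnabled) {
            return globalStreamTime;
        }
        byte[] value = store.get(key);
        return value == null ? UNKNOWN : deserialize(value);
    }

    /** Max over all keys: equivalent to the single stream time used before. */
    long maxStreamTime() {
        if (!perKeyTrackingEnabled) {
            return globalStreamTime;
        }
        long max = UNKNOWN;
        for (byte[] value : store.values()) {
            max = Math.max(max, deserialize(value));
        }
        return max;
    }
}
```

In this sketch, a low-traffic key keeps its own (older) stream time while busy keys advance, which is the per-key behaviour the grace-period fix relies on; `maxStreamTime()` recovers the old single-clock view for suppressors.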
Proposed Changes
Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.
...