...

This tradeoff is not strictly necessary, though, because each key represents a logically independent sequence of events. Tracking by partition is merely convenient, not logically meaningful: the partitions are physically independent sequences of events, so it is convenient to track stream time at that granularity. It would be just as correct, and more useful for IoT-like use cases, to track time independently for each key.
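To make the distinction concrete, here is a minimal sketch (plain Java, not Kafka internals; the class and method names are illustrative) contrasting the two granularities, with stream time taken to be the maximum record timestamp observed so far:

```java
import java.util.HashMap;
import java.util.Map;

public class StreamTimeGranularity {

    // One clock for the whole partition: a late record for one key is judged
    // against timestamps produced by completely unrelated keys.
    private long partitionStreamTime = Long.MIN_VALUE;

    // One clock per key: each key's records are only measured against the
    // timestamps previously seen for that same key.
    private final Map<String, Long> perKeyStreamTime = new HashMap<>();

    public void observe(final String key, final long recordTimestamp) {
        partitionStreamTime = Math.max(partitionStreamTime, recordTimestamp);
        perKeyStreamTime.merge(key, recordTimestamp, Math::max);
    }
}
```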

Public Interfaces

A few overall points:

  • Use CachingKeyValueStore to store keys and their stream times. We want a cache backed by a store, and this class is a natural candidate for that approach.
  • For suppressors, we could take the maximum of all per-key stream times, which keeps the logic equivalent to what it was before; otherwise we will need to resolve the low-traffic issue (depending on how seriously we rate it).
  • Most likely, we will retrieve a CachingKeyValueStore from the ProcessorContext (after it has been registered as a store under the stream task).
    • Add a configuration permitting the creation of such a store.
    • Use this store as needed. Whether one store is enough or multiple are required is still up for debate.
    • This store could effectively be used for per-key tracking and, ultimately, for fixing the problem with grace periods.
  • The general approach is as follows (a sketch is given after this list):
    • Check a newly added configuration to determine whether per-key stream time tracking is permitted.
    • Register a CachingKeyValueStore via the ProcessorContext (the key is simply the record key, while the timestamp is serialized into a byte[]).
    • For suppressors and the Kafka Streams processors relating to grace periods, retrieve the cache-backed store upon initialization (under some specific, well-known name).
    • Record stream times as needed.
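Below is a minimal sketch of how such a store could be registered and consulted from a processor. The store name "stream-time-table" and the processor class are placeholders, and note that CachingKeyValueStore itself is an internal class; from the user-facing API, a cache-backed store is obtained by enabling caching on a regular key-value store builder:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class PerKeyStreamTimeSketch {

    // Placeholder store name; the KIP has not settled on one.
    static final String STORE_NAME = "stream-time-table";

    // A persistent, cache-backed key-value store mapping record key -> latest stream time.
    // withCachingEnabled() is the user-facing way to get the caching layer that
    // CachingKeyValueStore provides internally.
    static StoreBuilder<KeyValueStore<String, Long>> streamTimeStoreBuilder() {
        return Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(STORE_NAME),
                Serdes.String(),
                Serdes.Long())
            .withCachingEnabled();
    }

    // Sketch of a processor that advances stream time independently for each key.
    static class PerKeyStreamTimeProcessor implements Processor<String, String> {
        private ProcessorContext context;
        private KeyValueStore<String, Long> streamTimes;

        @Override
        @SuppressWarnings("unchecked")
        public void init(final ProcessorContext context) {
            this.context = context;
            this.streamTimes = (KeyValueStore<String, Long>) context.getStateStore(STORE_NAME);
        }

        @Override
        public void process(final String key, final String value) {
            final Long previous = streamTimes.get(key);
            final long observed = context.timestamp();
            // Stream time never moves backwards: keep the per-key maximum.
            streamTimes.put(key, previous == null ? observed : Math.max(previous, observed));
        }

        @Override
        public void close() { }
    }
}
```

The store builder would be attached to the topology (for example via Topology#addStateStore) so that the relevant processors can access it by name.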

...

For per-key stream time tracking to be scalable, a cache-backed store will need to be created. This requires user approval, so a configuration will be added: allow.stream.time.key.tracking, a boolean config that allows the creation of these stores.
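For illustration, opting in would look roughly like the following (the config name is the one proposed here and does not exist in released Kafka versions; the application id and bootstrap servers are placeholders):

```java
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class PerKeyTrackingConfigExample {

    public static Properties streamsProperties() {
        final Properties props = new Properties();

        // Standard Streams configs (placeholder values).
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "per-key-stream-time-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Proposed by this KIP: permit Streams to create the cache-backed
        // stores used for per-key stream time tracking (presumably false by default).
        props.put("allow.stream.time.key.tracking", "true");

        return props;
    }
}
```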

Proposed Changes

Fundamentally, a few configurations will be added, but no public APIs need to be modified; what we are aiming for is a behavioral change.

Here are the core steps in implementing this per-key stream time tracking system. Considering the number of keys that we will have to track, it is not uncommon to process millions of unique keys, so storing them all in memory is too costly. The best solution in this case is a cache backed by a state store, and Kafka's current CachingKeyValueStore fulfills that role well. Given that the user allows us to create these stores, tracking the timestamps should be relatively simple:

  1. When a topology is initialized, we will register one (or multiple) CachingKeyValueStore(s) in the StreamThread's ProcessorContext.
  2. Then, during processing, whenever finer-grained stream time tracking is needed, we will retrieve the store from the ProcessorContext (registered, perhaps, under a name such as "stream-time-table").
  3. We will then use the cache-backed store to update and keep track of time (a sketch of how a grace-period check could use it follows this list).
    1. Suppressors and any other processors involved in the calculation of grace periods would probably require such a store, should the user enable it (whether each processor has an independent store or a shared one is still up for discussion).
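As a rough illustration (class and method names are hypothetical, and this is not the actual suppression code), a grace-period check driven by per-key stream time could look like this:

```java
import java.time.Duration;

import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical helper: decide whether a record is still within the grace
// period using this key's own stream time rather than the partition's.
public class PerKeyGracePeriodCheck {

    private final KeyValueStore<String, Long> perKeyStreamTime;  // key -> latest stream time
    private final Duration gracePeriod;

    public PerKeyGracePeriodCheck(final KeyValueStore<String, Long> perKeyStreamTime,
                                  final Duration gracePeriod) {
        this.perKeyStreamTime = perKeyStreamTime;
        this.gracePeriod = gracePeriod;
    }

    // Advance the key's stream time and report whether a record belonging to a
    // window ending at windowEnd should still be accepted for that key.
    public boolean withinGracePeriod(final String key, final long recordTimestamp, final long windowEnd) {
        final Long previous = perKeyStreamTime.get(key);
        final long streamTime = previous == null
                ? recordTimestamp
                : Math.max(previous, recordTimestamp);
        perKeyStreamTime.put(key, streamTime);

        // The window closes for this key only once *this key's* stream time has
        // passed windowEnd + grace, so a low-traffic key is not closed early by
        // unrelated, fast-moving keys in the same partition.
        return streamTime <= windowEnd + gracePeriod.toMillis();
    }
}
```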

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

If a Kafka Streams instance is migrated or restarted, we should check whether a previous store was constructed in the same location. If so, we should try to at least partially restore the contents of that table to retain accuracy.
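If the store is a regular persistent, changelog-backed state store, Streams already restores it on restart or migration; as a sketch, a restore listener could be attached to observe how much of the stream-time table was recovered (the class name below is illustrative):

```java
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

public class StreamTimeRestoreListener implements StateRestoreListener {

    @Override
    public void onRestoreStart(final TopicPartition topicPartition, final String storeName,
                               final long startingOffset, final long endingOffset) {
        System.out.printf("Restoring %s for %s from offset %d to %d%n",
                storeName, topicPartition, startingOffset, endingOffset);
    }

    @Override
    public void onBatchRestored(final TopicPartition topicPartition, final String storeName,
                                final long batchEndOffset, final long numRestored) {
        // No-op; progress of the stream-time table restoration could be tracked here.
    }

    @Override
    public void onRestoreEnd(final TopicPartition topicPartition, final String storeName,
                             final long totalRestored) {
        System.out.printf("Restored %d records of %s%n", totalRestored, storeName);
    }

    // Usage: kafkaStreams.setGlobalStateRestoreListener(new StreamTimeRestoreListener());
}
```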

Rejected Alternatives

TBD after discussion; N/A for the moment.