
Current state: Under Discussion

Discussion thread: here 

JIRA: here 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


This tradeoff is not strictly necessary, though, because each key represents a logically independent sequence of events. Tracking by partition is merely convenient, not logically meaningful: the partitions are just physically independent sequences of events, so it is convenient to track stream time at that granularity. It would be just as correct, and more useful for IoT-like use cases, to track time independently for each key.

Public Interfaces

A few overall points:

  • Use CachingKeyValueStore to store keys and their stream times. We likely want a cache backed by a store, in which case this class is a good candidate for such an approach.
  • For suppressors, we could take the max over all per-key stream times, which renders the logic equivalent to what it was before; otherwise, we will need to resolve the low-traffic issue (depending on how seriously we rate it).
  • More than likely, we will retrieve a CachingKeyValueStore from the ProcessorContext (after it was initially registered as a store under the stream task).
    • Add some configuration permitting the creation of such a store.
    • Use this store as needed. Whether one store is enough or multiple is still up for debate.
    • Could effectively be used for key tracking and ultimately fixing the problem with grace periods.
  • The general approach is as follows:
    • Check a newly added configuration to determine whether per-key stream time tracking is permitted.
    • Register a CachingKeyValueStore through the ProcessorContext (the store key is simply the record key, while the timestamp is serialized into a byte[] value).
    • For suppressors and Kafka Streams processors involving grace periods, retrieve the store-backed cache upon initialization (under some specific, agreed-upon name).
    • Record stream times as needed.
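The store layout sketched in the points above can be illustrated as follows. This is a hypothetical sketch, not the actual implementation: the class name and helpers are invented, and a plain HashMap stands in for the CachingKeyValueStore; only the key/value layout (record key mapped to a timestamp serialized into a byte[]) comes from the proposal.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the proposed store layout: the record key maps to
// its stream time, serialized into a byte[] the way a byte-oriented state
// store would hold it. A HashMap stands in for the actual store.
class StreamTimeTable {

    private final Map<String, byte[]> store = new HashMap<>();

    // Serialize the timestamp into a byte[], as proposed for the store value.
    static byte[] serialize(long timestamp) {
        return ByteBuffer.allocate(Long.BYTES).putLong(timestamp).array();
    }

    static long deserialize(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    // Advance the stream time for this key only; out-of-order records
    // must never move a key's stream time backwards.
    void update(String key, long timestamp) {
        byte[] current = store.get(key);
        if (current == null || deserialize(current) < timestamp) {
            store.put(key, serialize(timestamp));
        }
    }

    // Stream time of a never-seen key is undefined; -1 marks that case here.
    long streamTime(String key) {
        byte[] current = store.get(key);
        return current == null ? -1L : deserialize(current);
    }
}
```

Note that each key's stream time advances independently: an update to one key never affects the stream time observed for another.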

Proposed Changes

For per-key stream time tracking to be scalable, a cache-backed store will need to be created. Since creating such stores has a cost, this should require user approval; a configuration will therefore be added – allow.stream.time.key.tracking, a boolean config which allows the creation of these stores.
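If the config takes this shape, opting in might look like the following. The key name is the one proposed above; the default of false is an assumption, on the reasoning that store creation should not happen without explicit approval.

```properties
# Hypothetical Streams config: opt in to per-key stream time tracking.
# Assumed to default to false, so no stores are created without approval.
allow.stream.time.key.tracking=true
```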

Fundamentally speaking, a few configurations will be added, but public-API-wise, nothing needs to be modified. What we are aiming for is an internal behavioral change.

Here are the core steps in implementing this per-key stream time tracking system. Considering the number of keys we will have to track, it is not uncommon to process millions of unique keys, so storing them all in memory is too costly. The best solution in this case is a cache backed by a state store, and Kafka's current CachingKeyValueStore fulfills that role perfectly. Given that the user allows us to create these stores, tracking the timestamps is relatively simple:

  1. When a topology is initialized, we will register one (or multiple) CachingKeyValueStore(s) in the StreamThread's ProcessorContext.
  2. Then, during processing, when finer-grained stream time tracking is needed, we will retrieve the store from the ProcessorContext (perhaps under the name "stream-time-table").
  3. We will use that store as a field which can be easily accessed for stream time updates and queries.
    1. Suppressors and any other processors involved in grace-period calculations would require a store, should the user choose it (whether each processor has an independent store or a shared one is still up for discussion).
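The grace-period check in step 3 can be sketched as follows. The class and method names are illustrative, not from the proposal, and a HashMap again stands in for the store-backed cache; the point is only that lateness is judged against the stream time of the record's own key rather than its partition.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: a grace-period check that judges lateness against the
// stream time of the record's own key, not its partition. A HashMap stands
// in for the store-backed cache.
class PerKeyGraceChecker {

    private final Map<String, Long> streamTimes = new HashMap<>();
    private final long graceMs;

    PerKeyGraceChecker(long graceMs) {
        this.graceMs = graceMs;
    }

    // Observe a record and report whether it should be dropped as expired.
    // The record advances its key's stream time if it is the newest seen,
    // and is late only if it falls behind that per-key stream time by more
    // than the grace period.
    boolean observeAndCheckLate(String key, long timestamp) {
        long streamTime = streamTimes.merge(key, timestamp, Math::max);
        return timestamp < streamTime - graceMs;
    }
}
```

Under this scheme, a burst of records on a high-traffic key cannot cause records on an unrelated low-traffic key to be considered late, which is exactly the behavioral change this proposal is after.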

The behavior the user should expect in this instance is that records will be evicted on a per-key rather than a per-partition basis. To illustrate, imagine we have the same list of records as the example provided in the Motivation section. We would not have to wait 24 hours to get the records, nor would keys be dropped in partition B, because they are no longer considered late (stream time is now per key, so the stream time against which a record's eviction is determined comes solely from records of the same key).

The metrics that are reported for this design have yet to be decided upon.

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

If a Kafka Streams instance is migrated or restarted, we should check whether a previous store was constructed in the same location. If so, we should try to at least partially restore the contents of that table to retain accuracy.

Rejected Alternatives

TBD after discussion. N/A for the moment.