...

Current state: Under Discussion

Discussion thread: here 

JIRA: here 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Fundamentally, a few configurations will be added, but nothing in the public API needs to be modified. What we are aiming for is an internal behavioral change.

The core steps for implementing per-key stream time tracking are outlined here. It is not uncommon to have millions of unique keys to process, so storing a timestamp for every key in memory is too costly. The best solution would be a cache backed by a state store, a role that Kafka's current CachingKeyValueStore fulfills well. Provided the user allows us to create these stores, tracking the timestamps should be relatively simple:

  1. When a topology is initialized, we will register one or more CachingKeyValueStores in the StreamThread's ProcessorContext.
  2. Then, during processing, if finer-grained stream time tracking is needed, we will get a store from the ProcessorContext (perhaps registered under the name "stream-time-table").
  3. We will use that store as a field that can be easily accessed for stream time updates and queries.
    1. Suppressors and any other processors that calculate grace periods would probably require a store, should the user choose to enable it. (Whether each processor has an independent store or they share one is still up for discussion.)
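
The bookkeeping described in the steps above can be sketched roughly as follows. This is only an illustration under stated assumptions: the class PerKeyStreamTime and its methods are hypothetical names, and a plain HashMap stands in for the cache-backed store that the proposal would obtain from the ProcessorContext. It is not the proposed implementation.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical per-key stream time tracker (illustrative only).
 * A HashMap stands in for the cache-backed state store.
 */
class PerKeyStreamTime {
    // Stand-in for a store registered under a name such as "stream-time-table".
    private final Map<String, Long> streamTimeByKey = new HashMap<>();
    private final long gracePeriodMs;

    PerKeyStreamTime(long gracePeriodMs) {
        this.gracePeriodMs = gracePeriodMs;
    }

    /** Advances the key's stream time to max(current, timestamp) and returns it. */
    long observe(String key, long timestampMs) {
        return streamTimeByKey.merge(key, timestampMs, Long::max);
    }

    /** Returns the stream time for the key, or -1 if the key has never been seen. */
    long streamTime(String key) {
        return streamTimeByKey.getOrDefault(key, -1L);
    }

    /** A record is late only relative to the stream time of its own key. */
    boolean isLate(String key, long timestampMs) {
        Long current = streamTimeByKey.get(key);
        return current != null && timestampMs < current - gracePeriodMs;
    }
}
```

In this sketch, a processor that calculates grace periods would call isLate before handling a record and observe afterwards, so a fast-moving key never advances the stream time seen by a slower key.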

The behavior the user should expect is that records are evicted on a per-key basis instead of a per-partition basis. To illustrate, imagine we have the same list of records as in the example provided in the Motivation section. We no longer have to wait 24 hours to get the records, and the keys in partition B are no longer dropped, because their records are not considered late: stream time is now per key, so whether a record is evicted is determined solely by records of the same key.
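
To make the difference concrete, here is a small self-contained comparison. The records, the grace period, and the LatenessComparison class are all made up for illustration; they are not the records from the Motivation section, and the logic is a simplified model of lateness checking rather than Kafka's actual code.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative comparison of per-partition and per-key lateness (hypothetical). */
class LatenessComparison {
    static final class Rec {
        final String key;
        final long ts;
        Rec(String key, long ts) { this.key = key; this.ts = ts; }
    }

    /** Counts records that are late relative to a single, partition-wide stream time. */
    static int droppedPerPartition(List<Rec> records, long graceMs) {
        long streamTime = -1L;
        int dropped = 0;
        for (Rec r : records) {
            streamTime = Math.max(streamTime, r.ts);
            if (r.ts < streamTime - graceMs) dropped++;
        }
        return dropped;
    }

    /** Counts records that are late relative to the stream time of their own key. */
    static int droppedPerKey(List<Rec> records, long graceMs) {
        Map<String, Long> streamTimeByKey = new HashMap<>();
        int dropped = 0;
        for (Rec r : records) {
            long streamTime = streamTimeByKey.merge(r.key, r.ts, Long::max);
            if (r.ts < streamTime - graceMs) dropped++;
        }
        return dropped;
    }

    /** Hypothetical records from one partition: key "b" runs far ahead of key "a". */
    static List<Rec> sample() {
        return Arrays.asList(
            new Rec("a", 0L),
            new Rec("b", 100_000L),
            new Rec("a", 1_000L),   // late per partition, on time for key "a"
            new Rec("b", 50_000L)); // late under both schemes
    }
}
```

With a grace period of 5,000 ms, the partition-wide check drops both of the last two records, while the per-key check drops only the record that is late relative to its own key.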

The metrics to be reported for this design have yet to be decided.

Compatibility, Deprecation, and Migration Plan

...