...

Current state: Under Discussion

Discussion thread: here 

JIRA: here 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

This tradeoff is not strictly necessary, though, because each key represents a logically independent sequence of events. Tracking by partition is convenient, but typically not logically meaningful: partitions are just physically independent sequences of events, which makes them an easy granularity at which to track stream time. It would be just as correct, and more useful for IoT-like use cases, to track time independently for each key.

Public Interfaces

A few overall points:

...

For per-key stream time tracking to scale, a cache-backed store will need to be created. Because this creates state stores on the user's behalf, it requires explicit user approval. A new boolean configuration, allow.stream.time.key.tracking, will therefore be added; setting it allows the creation of these stores.
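As a rough illustration, opting in could look like the sketch below. The config name is the one proposed above and does not exist in any released Kafka version; the `StreamTimeConfigSketch` class, its helper methods, and the default of `false` are assumptions made for the example.

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka: shows how the proposed flag might be read.
final class StreamTimeConfigSketch {
    // Config name proposed in this KIP.
    static final String ALLOW_KEY_TRACKING = "allow.stream.time.key.tracking";

    // Assumption: the feature stays off unless the user explicitly opts in.
    static boolean keyTrackingAllowed(Properties props) {
        return Boolean.parseBoolean(props.getProperty(ALLOW_KEY_TRACKING, "false"));
    }

    // Builds a Properties object with the flag enabled.
    static Properties optIn() {
        Properties props = new Properties();
        props.setProperty(ALLOW_KEY_TRACKING, "true");
        return props;
    }
}
```

With this shape, stores for per-key tracking would only be created when `keyTrackingAllowed` returns true.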

Proposed Changes

Fundamentally, a few configurations will be added, but nothing in the public API needs to be modified. What we are aiming for is an internal behavioral change.

Here are the core steps in implementing per-key stream time tracking. It is not uncommon to have millions of unique keys to track, so storing them all in memory is too costly. The best solution for this case would be a cache backed by a state store, in which case

...

  • Indicative that ultimately some method might be added to ProcessorContext (whether as internal or public-facing API)

...

, Kafka's current CachingKeyValueStore fulfills that role perfectly. Given that the user allows us to create these stores, tracking the timestamps should be relatively simple:

  1. When a topology is initialized, we will register one or more CachingKeyValueStores in StreamThread's ProcessorContext.
  2. During processing, if finer-grained stream time tracking is needed, we will get a store from the ProcessorContext (perhaps registered under the name "stream-time-table").
  3. We will hold that store in a field so that it can be easily accessed for stream time updates and queries.
    1. Suppressors and any other processors that calculate grace periods would probably require a store, should the user choose to enable it. (Whether each processor has an independent store or a shared one is still up for discussion.)
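The idea behind these steps can be sketched in isolation as a bounded in-memory cache in front of a backing store. This is only an illustration under stated assumptions: `PerKeyStreamTimeSketch` and all of its members are hypothetical, a plain `HashMap` stands in for the state store, and nothing here uses or mirrors the actual CachingKeyValueStore or ProcessorContext APIs.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: per-key stream time held in an LRU cache that spills to a store.
final class PerKeyStreamTimeSketch {
    static final long UNKNOWN = -1L; // returned when a key has never been observed

    // Stand-in for a persistent state store (assumption for this example).
    private final Map<String, Long> backingStore = new HashMap<>();
    private final LinkedHashMap<String, Long> cache;

    PerKeyStreamTimeSketch(final int maxCachedKeys) {
        // Access-ordered map; the eldest entry is written to the store before eviction.
        this.cache = new LinkedHashMap<String, Long>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Long> eldest) {
                if (size() > maxCachedKeys) {
                    backingStore.put(eldest.getKey(), eldest.getValue());
                    return true;
                }
                return false;
            }
        };
    }

    // Stream time for a key is the largest timestamp observed for that key.
    long streamTime(String key) {
        Long cached = cache.get(key);
        if (cached != null) {
            return cached;
        }
        Long stored = backingStore.get(key);
        return stored == null ? UNKNOWN : stored;
    }

    // Advances the key's stream time if the record is newer; returns the result.
    long observe(String key, long timestamp) {
        long updated = Math.max(streamTime(key), timestamp);
        cache.put(key, updated);
        return updated;
    }

    // Number of keys currently held in memory.
    int cachedKeys() {
        return cache.size();
    }
}
```

The cache bound keeps memory use fixed regardless of how many unique keys appear, while evicted keys remain queryable through the store.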

The behavior the user should expect is that records are evicted on a per-key basis instead of a per-partition basis. To illustrate, take the same list of records as in the example in the Motivation section. We do not have to wait 24 hours to get the records, nor are keys in partition B dropped, because they are no longer considered late: stream time is now tracked per key, so whether a record is evicted is determined solely by records of the same key.
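The difference between the two behaviors can be shown with a small, self-contained sketch. The `LatenessSketch` class, the `Rec` record shape, and the sample keys and timestamps used with it are all hypothetical; they are not the records from the Motivation section and this is not how Kafka Streams implements lateness checks.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch comparing shared (partition-style) and per-key stream time.
final class LatenessSketch {
    // Assumed record shape: a key and an event timestamp in milliseconds.
    static final class Rec {
        final String key;
        final long ts;

        Rec(String key, long ts) {
            this.key = key;
            this.ts = ts;
        }
    }

    // Returns the records that would be dropped as late.
    // perKey == false: one stream time is shared by every record.
    // perKey == true: stream time is tracked separately for each key.
    static List<Rec> dropped(List<Rec> records, long graceMs, boolean perKey) {
        Map<String, Long> streamTimes = new HashMap<>();
        List<Rec> late = new ArrayList<>();
        for (Rec r : records) {
            String slot = perKey ? r.key : "<shared>";
            Long streamTime = streamTimes.get(slot);
            // A record is late if it is older than stream time minus the grace period.
            if (streamTime != null && r.ts < streamTime - graceMs) {
                late.add(r);
            }
            // Stream time only ever moves forward.
            long next = streamTime == null ? r.ts : Math.max(streamTime, r.ts);
            streamTimes.put(slot, next);
        }
        return late;
    }
}
```

For example, given records (sensor-a, 10000), (sensor-b, 1000), (sensor-a, 11000) and a grace period of 5000 ms, the shared mode drops the sensor-b record because sensor-a has already advanced stream time, whereas the per-key mode drops nothing.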

The metrics to be reported for this design have yet to be decided upon.

Compatibility, Deprecation, and Migration Plan

If a Kafka Streams instance is migrated or restarted, we should check whether a previous store was constructed under the same location. If so, we should try to restore the contents of that table, at least partially, to retain accuracy.

Rejected Alternatives

TBD after discussion. N/A for the moment.