
Status

Current state: Under Discussion

Discussion thread: here 

JIRA: here 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, Streams uses a concept of "stream time", which is computed as the highest timestamp observed by stateful operators, per partition. This concept of time backs grace period, retention time, and suppression.

For use cases in which data is produced to topics in roughly chronological order (as in database change capture), this definition works well.

Some use cases follow a different pattern, though. For example, in IoT applications, it's common for sensors to buffer a large amount of data and then dump it all into the topic at once. See https://stackoverflow.com/questions/57082923/kafka-windowed-stream-make-grace-and-suppress-key-aware for a concrete example of the use case.

There have been cases where each sensor dumps 24 hours' worth of data into the topic at a time. As a result, when reading a single partition, the operators observe a long run of consecutive records for one key whose timestamps increase across a 24-hour period, followed by a run of consecutive records for another key whose timestamps increase across the same 24-hour period. With our current stream-time definition, the partition's stream time advances while reading the first key's data but then stops advancing while reading the second key's data, since the second batch of records all have timestamps in the "past".

For example:

A@t0 (stream time: 0)
A@t1 (stream time: 1)
A@t2 (stream time: 2)
A@t3 (stream time: 3)
B@t0 (stream time: 3)
B@t1 (stream time: 3)
B@t2 (stream time: 3)
B@t3 (stream time: 3)
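
The trace above can be reproduced with a minimal sketch. It is plain Java, not Kafka Streams code; the `Event` class and the `streamTimes` helper are hypothetical names introduced only for illustration. It assumes stream time is simply the running maximum of the timestamps observed on one partition.

```java
import java.util.ArrayList;
import java.util.List;

final class PartitionStreamTime {
    /** Hypothetical record shape: a key plus an event timestamp. */
    static final class Event {
        final String key;
        final long timestamp;

        Event(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    /** Returns the partition-level stream time after each event is observed. */
    static List<Long> streamTimes(List<Event> events) {
        List<Long> result = new ArrayList<>();
        long streamTime = Long.MIN_VALUE; // nothing observed yet
        for (Event e : events) {
            // Stream time only moves forward: it is the highest timestamp seen so far.
            streamTime = Math.max(streamTime, e.timestamp);
            result.add(streamTime);
        }
        return result;
    }

    /** The A@t0..t3, B@t0..t3 sequence from the example above. */
    static List<Event> exampleEvents() {
        List<Event> events = new ArrayList<>();
        for (String key : new String[] {"A", "B"}) {
            for (long t = 0; t <= 3; t++) {
                events.add(new Event(key, t));
            }
        }
        return events;
    }

    public static void main(String[] args) {
        // prints [0, 1, 2, 3, 3, 3, 3, 3]
        System.out.println(streamTimes(exampleEvents()));
    }
}
```

Because the key is never consulted, every record for B is measured against the stream time that A's records already established.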

This pattern forces an unfortunate compromise: users must either set the grace period to the maximum expected time skew (for example, 24 hours), or accept that Streams will drop the second key's data as late. With a 24-hour grace period, however, anyone who wants to use suppression for "final results" has to wait 24 hours for each result.

This tradeoff is not strictly necessary, though, because each key represents a logically independent sequence of events. Partitions are merely physically independent sequences of events, so tracking stream time per partition is convenient but typically not logically meaningful. It would be just as correct, and more useful for IoT-like use cases, to track time independently for each key.
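
To make the tradeoff concrete, the sketch below counts how many records would be treated as late under each tracking granularity. It is a simplification, not the Streams implementation: it assumes a record is late when its timestamp is older than the stream time minus the grace period, whereas the real windowed operators compare against window boundaries. `Event` and `countDropped` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class LatenessComparison {
    /** Hypothetical record shape: a key plus an event timestamp. */
    static final class Event {
        final String key;
        final long timestamp;

        Event(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    /**
     * Counts records considered late (simplified rule, an assumption of this
     * sketch): timestamp < streamTime - gracePeriod. With perKey == false all
     * records share one stream time, as in per-partition tracking.
     */
    static int countDropped(List<Event> events, long gracePeriod, boolean perKey) {
        Map<String, Long> streamTimes = new HashMap<>();
        int dropped = 0;
        for (Event e : events) {
            String scope = perKey ? e.key : ""; // one shared slot for the whole partition
            long previous = streamTimes.getOrDefault(scope, e.timestamp);
            long streamTime = Math.max(previous, e.timestamp);
            streamTimes.put(scope, streamTime);
            if (e.timestamp < streamTime - gracePeriod) {
                dropped++;
            }
        }
        return dropped;
    }

    /** Two keys, each with timestamps 0..3, written one key after the other. */
    static List<Event> exampleEvents() {
        List<Event> events = new ArrayList<>();
        for (String key : new String[] {"A", "B"}) {
            for (long t = 0; t <= 3; t++) {
                events.add(new Event(key, t));
            }
        }
        return events;
    }

    public static void main(String[] args) {
        List<Event> events = exampleEvents();
        System.out.println("partition-level dropped: " + countDropped(events, 1, false)); // 2
        System.out.println("per-key dropped: " + countDropped(events, 1, true));          // 0
        System.out.println("partition-level, grace 3: " + countDropped(events, 3, false)); // 0
    }
}
```

With a grace period of 1, partition-level tracking drops B@t0 and B@t1, while per-key tracking drops nothing. Raising the grace period to the full skew of 3 also avoids the drops, but at the cost of delaying final results by that same amount.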

Public Interfaces

A few overall points:

  • Use CachingKeyValueStore to store keys and their stream times. We probably want a cache backed by a store, and this class is a good candidate for that approach.
  • For suppressors, we could use the maximum of all per-key stream times, which makes the logic equivalent to the current behavior; otherwise, we will need to resolve the low-traffic issue (depending on how seriously we rate it).
  • Most likely, we will retrieve a CachingKeyValueStore from ProcessorContext (after it has been registered as a store under the stream task).
    • Add some configuration permitting the creation of such a store.
    • Use this store as needed. Whether one store is enough or multiple is still up for debate.
    • Could effectively be used for key tracking and ultimately fixing the problem with grace periods.
  • The general approach is as follows:
    • Check a newly added configuration to determine whether per-key stream time tracking is enabled.
    • Register a CachingKeyValueStore using ProcessorContext (the key is simply the record key, and the timestamp is serialized into a byte[]).
    • For suppressors and the Kafka Streams processors that deal with grace periods, retrieve the store-backed cache upon initialization (using a specific, well-known name).
    • Record stream times as needed.
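
The general approach can be sketched in plain Java as follows. This is only an illustration under stated assumptions, not the proposed implementation: a `HashMap` stands in for the store-backed cache, the `perKeyEnabled` flag stands in for the yet-to-be-named configuration, and `KeyedStreamTimeTracker` and its methods are hypothetical names. No Kafka Streams APIs are used.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

final class KeyedStreamTimeTracker {
    // Stand-in for the new configuration (hypothetical; no config name is proposed here).
    private final boolean perKeyEnabled;
    // Stand-in for the store-backed cache: record key -> serialized stream time.
    private final Map<String, byte[]> store = new HashMap<>();
    // Highest timestamp seen across all keys; -1 means "nothing observed" (sketch assumption).
    private long partitionStreamTime = -1L;

    KeyedStreamTimeTracker(boolean perKeyEnabled) {
        this.perKeyEnabled = perKeyEnabled;
    }

    /** Serializes a timestamp into the byte[] form held by the store. */
    static byte[] serialize(long timestamp) {
        return ByteBuffer.allocate(Long.BYTES).putLong(timestamp).array();
    }

    static long deserialize(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    /** Records the timestamp and returns the stream time that applies to this key. */
    long observe(String key, long timestamp) {
        partitionStreamTime = Math.max(partitionStreamTime, timestamp);
        if (!perKeyEnabled) {
            return partitionStreamTime; // existing behavior: one clock per partition
        }
        byte[] stored = store.get(key);
        long keyTime = stored == null
                ? timestamp
                : Math.max(deserialize(stored), timestamp);
        store.put(key, serialize(keyTime));
        return keyTime;
    }

    /** Maximum over all per-key stream times, which equals the partition stream time. */
    long maxStreamTime() {
        return partitionStreamTime;
    }

    public static void main(String[] args) {
        KeyedStreamTimeTracker tracker = new KeyedStreamTimeTracker(true);
        System.out.println(tracker.observe("A", 3)); // 3
        System.out.println(tracker.observe("B", 0)); // 0: B keeps its own clock
        System.out.println(tracker.maxStreamTime()); // 3
    }
}
```

A grace-period check would use the value returned by `observe`, while a suppressor that must preserve the current semantics could use `maxStreamTime()`, as suggested in the suppressor bullet above.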

Proposed Changes

Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
