Current state: Under Discussion
Discussion thread: KIP-424: Allow suppression of intermediate events based on wall clock time
JIRA: KAFKA-7748
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
This proposal would add a new method, untilWallClockTimeLimit
, to the Suppressed
interface:
/**
* Configure the suppression to wait {@code timeToWaitForMoreEvents} amount of wall clock time after receiving a record
* before emitting it further downstream. If another record for the same key arrives in the mean time, it replaces
* the first record in the buffer but does <em>not</em> re-start the timer.
*
* @param timeToWaitForMoreEvents The amount of wall clock time to wait, per record, for new events.
* @param bufferConfig A configuration specifying how much space to use for buffering intermediate results.
* @param <K> The key type for the KTable to apply this suppression to.
* @return a suppression configuration
*/
static <K> Suppressed<K> untilWallClockTimeLimit(final Duration timeToWaitForMoreEvents, final BufferConfig bufferConfig);
The intention is to configure an upper bound for how frequently streams will send updates for a particular windowed key downstream. It will function as a rate limiter for cases where the cache fills up, or there is no cache, to ensure streams will not flood downstream. It does require data to flow through at at least the rate specified in order to maintain that flow. For example, configuring a timeToWaitForMoreEvents
for a duration of 5 seconds would mean that you never receive more than one update downstream for a key within any 5 second period. However, checks against the wall clock are event driven: if no events are received in over 5 seconds, no events will be sent downstream.
The main code change here is in KTableSuppressProcessor.enforceConstraints()
, we will need a new predicate to determine when to emit. The InMemoryTimeOrderedKeyValueBuffer
may need to keep track of when we last evicted.
The commit.interval.ms
configuration variable provides a helpful upper bound in how often updates are emitted downstream. However, this requires that cache.max.bytes.buffering
is never full, which is difficult to manage for certain loads and defeats the purpose of having RocksDB as a state store.