...

This situation is even worse in cases where sensors wait 24 hours before dumping data into a stream. Realistically, a suppress processor could then go without receiving records for long periods of time (on the scale of hours). Because stream time only advances when new records arrive, it cannot advance during this period of inactivity, so no records are emitted. Most users wish to avoid this situation, so we should resolve it by periodically flushing records so that they do not get "stuck" in the buffer.
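To make the failure mode concrete, the following toy model (not Kafka Streams code; `StreamTimeSuppressDemo` and its methods are hypothetical names) buffers one value per key and emits it once stream time, taken here as the largest record timestamp seen so far, has moved at least the configured wait past the key's first arrival. Because stream time only advances inside `process()`, a record that arrives just before a 24-hour silence stays buffered until the next batch shows up.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model, NOT the Kafka Streams implementation: a suppression buffer whose
// only notion of time is stream time, i.e. the largest record timestamp seen so far.
public class StreamTimeSuppressDemo {

    static final class StreamTimeBuffer {
        private final long waitMs;
        private final Map<String, String> values = new LinkedHashMap<>();
        // Time at which each key first entered the buffer.
        private final Map<String, Long> bufferTimes = new LinkedHashMap<>();
        private long streamTimeMs = Long.MIN_VALUE;

        StreamTimeBuffer(long waitMs) {
            this.waitMs = waitMs;
        }

        // Stream time can only move forward here, i.e. when a record arrives.
        List<String> process(String key, String value, long timestampMs) {
            streamTimeMs = Math.max(streamTimeMs, timestampMs);
            values.put(key, value);
            bufferTimes.putIfAbsent(key, timestampMs);
            return emitDue();
        }

        // Emit every key whose wait has elapsed according to stream time.
        private List<String> emitDue() {
            List<String> emitted = new ArrayList<>();
            Iterator<Map.Entry<String, Long>> it = bufferTimes.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Long> e = it.next();
                if (streamTimeMs >= e.getValue() + waitMs) {
                    emitted.add(e.getKey() + "=" + values.remove(e.getKey()));
                    it.remove();
                }
            }
            return emitted;
        }

        int buffered() {
            return bufferTimes.size();
        }
    }

    public static void main(String[] args) {
        long hour = 60L * 60 * 1000;
        StreamTimeBuffer buffer = new StreamTimeBuffer(5 * 60 * 1000L); // wait 5 minutes
        buffer.process("sensor-1", "21.5", 0L);
        // No further records for 24 hours: nothing calls process(), so stream time
        // stays at 0 and "sensor-1" remains buffered the whole time.
        System.out.println("buffered after silence: " + buffer.buffered());
        // The next batch finally advances stream time and releases the old record.
        System.out.println("emitted: " + buffer.process("sensor-2", "19.0", 24 * hour));
    }
}
```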

By extension, we could also implement a previously requested feature: allowing suppression operators to suppress records based on wall-clock time.

Public Interfaces

This KIP would add the following method to the Suppressed interface:

/**
* Configure the suppression to wait for {@code timeToWaitForMoreEvents} of wall-clock time after receiving a record
* before emitting it further downstream. If another record for the same key arrives in the meantime, it replaces
* the first record in the buffer but does <em>not</em> restart the timer.
*
* @param timeToWaitForMoreEvents The amount of wall-clock time to wait, per record, for new events.
* @param bufferConfig A configuration specifying how much space to use for buffering intermediate results.
* @param <K> The key type for the KTable to apply this suppression to.
* @return a suppression configuration
*/
static <K> Suppressed<K> untilWallClockTimeLimit(final Duration timeToWaitForMoreEvents, final BufferConfig bufferConfig);
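A minimal sketch of the semantics described in the Javadoc above, assuming a hypothetical `onWallClockTick` callback that is invoked periodically with the current wall-clock time (how that callback would be driven is left open here). The buffer keeps only the latest value per key, while the timer keeps counting from the key's first arrival, so a replacement does not delay emission. All names are hypothetical; this is not the proposed implementation.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of the untilWallClockTimeLimit semantics. Not Kafka code.
public class WallClockSuppressSketch {

    private final long waitMs;
    private final Map<String, String> latestValue = new LinkedHashMap<>();
    private final Map<String, Long> firstArrivalMs = new LinkedHashMap<>();

    public WallClockSuppressSketch(long waitMs) {
        this.waitMs = waitMs;
    }

    // A newer record for the same key replaces the buffered value,
    // but the timer keeps counting from the key's first arrival.
    public void onRecord(String key, String value, long wallClockNowMs) {
        latestValue.put(key, value);
        firstArrivalMs.putIfAbsent(key, wallClockNowMs);
    }

    // Called periodically with the current wall-clock time; emits every key whose
    // wait has elapsed, even if no new records have arrived in the meantime.
    public List<String> onWallClockTick(long wallClockNowMs) {
        List<String> emitted = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = firstArrivalMs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (wallClockNowMs - e.getValue() >= waitMs) {
                emitted.add(e.getKey() + "=" + latestValue.remove(e.getKey()));
                it.remove();
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        WallClockSuppressSketch s = new WallClockSuppressSketch(1_000L);
        s.onRecord("a", "v1", 0L);
        s.onRecord("a", "v2", 800L);                   // replaces v1, does not restart the timer
        System.out.println(s.onWallClockTick(999L));   // [] : only 999 ms since first arrival
        System.out.println(s.onWallClockTick(1_000L)); // [a=v2]
    }
}
```

Passing the current time in as a parameter, rather than reading a clock inside the buffer, keeps the sketch deterministic and easy to test.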


Proposed Changes

Overall, we should automate the flushing of records that have remained in the buffer for too long. Many Kafka users prefer not to call a method manually to trigger such an operation. Given this, the best approach is to add a config, suppressed.record.retention.ms, which the user sets to indicate the maximum amount of time a record may remain in the suppression buffer.
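Assuming the config is read from the regular Kafka Streams properties, a user might set it as follows. `suppressed.record.retention.ms` is only a proposal in this KIP and has no `StreamsConfig` constant, so the sketch uses the raw string key; the application id and bootstrap servers are placeholder values.

```java
import java.time.Duration;
import java.util.Properties;

// Sketch of how a user might set the config proposed in this KIP.
// "suppressed.record.retention.ms" is proposed, not an existing StreamsConfig
// constant, so it is written as a plain string key.
public class SuppressRetentionConfigSketch {

    static final String SUPPRESSED_RECORD_RETENTION_MS = "suppressed.record.retention.ms";

    static Properties streamsProps(Duration maxTimeInBuffer) {
        Properties props = new Properties();
        props.put("application.id", "sensor-aggregation"); // placeholder values
        props.put("bootstrap.servers", "localhost:9092");
        // Upper bound on how long any record may sit in a suppression buffer.
        props.put(SUPPRESSED_RECORD_RETENTION_MS, Long.toString(maxTimeInBuffer.toMillis()));
        return props;
    }

    public static void main(String[] args) {
        Properties props = streamsProps(Duration.ofHours(1));
        System.out.println(props.getProperty(SUPPRESSED_RECORD_RETENTION_MS)); // 3600000
    }
}
```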

...

In this manner, we could then expire records that have exceeded suppressed.record.retention.ms and evict them from the buffer as needed.
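The expiry step could look roughly like the sketch below, which assumes the processor has some (approximate) current time `nowMs` that never decreases, and treats a record as expired once it has been buffered for strictly longer than the retention. The class and method names are hypothetical. Keeping keys in first-insertion order lets the scan stop at the first record that is still within the retention.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of expiring records that have exceeded suppressed.record.retention.ms.
// Hypothetical names; "nowMs" stands for the processor's approximate notion of time.
public class RetentionEvictionSketch {

    // Insertion-ordered: re-putting an existing key keeps its original position,
    // so iteration order is the order in which keys first entered the buffer.
    private final Map<String, Long> insertedAtMs = new LinkedHashMap<>();
    private final Map<String, String> values = new LinkedHashMap<>();
    private final long retentionMs;

    public RetentionEvictionSketch(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    public void put(String key, String value, long nowMs) {
        insertedAtMs.putIfAbsent(key, nowMs);
        values.put(key, value);
    }

    // Evicts (and returns, for forwarding downstream) every record older than the
    // retention. Entries are ordered by insertion time, so the scan can stop at
    // the first record that is still young enough.
    public List<String> expire(long nowMs) {
        List<String> evicted = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = insertedAtMs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (nowMs - e.getValue() <= retentionMs) {
                break;
            }
            evicted.add(e.getKey() + "=" + values.remove(e.getKey()));
            it.remove();
        }
        return evicted;
    }

    public static void main(String[] args) {
        RetentionEvictionSketch buffer = new RetentionEvictionSketch(10_000L);
        buffer.put("a", "1", 0L);
        buffer.put("b", "2", 5_000L);
        buffer.put("a", "3", 6_000L);               // replaces a's value; a keeps its original age
        System.out.println(buffer.expire(10_000L)); // []
        System.out.println(buffer.expire(12_000L)); // [a=3]
    }
}
```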

Note that with this approach, the time that has passed in a suppress processor is only approximated, though reasonably well. If necessary, we could add another config to determine how often these tombstone records are sent. These records are never exposed to the user; they are completely hidden within a single processor.
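To illustrate why the elapsed time is only an approximation, suppose (as an assumption, not something the KIP specifies) that the internal tombstone records arrive exactly every `markerIntervalMs` starting at time 0, and that the processor learns the current time only when one arrives. A record is then evicted at the first marker strictly after its retention deadline, which means at most one marker interval late. The helper below is hypothetical and computes that eviction time in closed form.

```java
// Sketch only: assumes the internal tombstone records arrive exactly every
// markerIntervalMs, starting at time 0, and that the suppress processor learns
// the current time only when one of them arrives. Names are hypothetical.
public class ApproximateTimeSketch {

    // Wall-clock time at which a record inserted at insertedAtMs is evicted, given that
    // eviction requires the observed time to be strictly past insertedAtMs + retentionMs.
    static long evictionTime(long insertedAtMs, long retentionMs, long markerIntervalMs) {
        long deadlineMs = insertedAtMs + retentionMs;
        // Marker k arrives at k * markerIntervalMs; pick the smallest k past the deadline
        // (valid for deadlineMs >= 0).
        long k = deadlineMs / markerIntervalMs + 1;
        return k * markerIntervalMs;
    }

    public static void main(String[] args) {
        long retentionMs = 60_000L;
        long intervalMs = 15_000L;
        long insertedAtMs = 1_000L;
        long evictedAtMs = evictionTime(insertedAtMs, retentionMs, intervalMs);
        long lateByMs = evictedAtMs - (insertedAtMs + retentionMs);
        System.out.println("evicted at " + evictedAtMs + " ms, " + lateByMs + " ms late");
        // prints: evicted at 75000 ms, 14000 ms late
    }
}
```

Under these assumptions, the extra config mentioned above would trade accuracy for overhead: a shorter tombstone interval tightens the bound on how late a record is evicted, at the cost of more internal records.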

Rejected Alternatives

There has been some thought of adding a Time instance to suppress processors, but that goes against convention: processors should not keep track of time themselves, partly because doing so is expensive. Instead, timekeeping should be left to stream threads and other processes.