Status

Current state: Under Discussion

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, in Kafka Streams, if a suppression buffer sees low traffic, the records it receives may not advance stream time, which causes records to get stuck in the buffer. These records remain buffered until further records arrive and stream time advances. Therefore, we need a way to flush out these records so that the user can access them despite the low traffic. This is also a future step towards tracking stream time independently on a per-key basis rather than per partition.

This situation is in fact worse in certain cases, for example where sensors wait 24 hours before dumping data into a stream. Realistically, a suppress processor could receive no records for long periods of time (on the scale of hours), which means stream time cannot advance and no records are emitted during the period of inactivity. Most users wish to avoid this situation, so we should flush out records every so often to prevent them from getting "stuck" in the buffer.


Proposed Changes

All things considered, we should automate the flushing of records that have remained in the buffers for too long. Many Kafka users prefer not to have to manually call a method to trigger such an operation. In light of this, the best approach would be to add a config, suppressed.record.retention.ms, which the user would define to indicate how long a record may remain in the suppression buffer.
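If adopted, the new config could be set like any other Streams config. A minimal sketch, noting that the name suppressed.record.retention.ms is the one proposed here and does not yet exist in Kafka:

```java
import java.util.Properties;

public class SuppressRetentionConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Hypothetical config from this proposal: flush any record that has
        // sat in a suppression buffer for longer than 5 minutes.
        props.put("suppressed.record.retention.ms", "300000");
        System.out.println(props.getProperty("suppressed.record.retention.ms"));
    }
}
```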

However, most if not all processors in Kafka at the moment do not have a Time instance as a field (or an internal clock). Thus, they cannot track time (not stream time, but actual wall-clock time). This unfortunately means that either a StreamThread somehow triggers the flush (in which case we would have to check whether the processor is really of type KTableSuppressProcessor, and things get messy), or we add a local Time instance to KTableSuppressProcessor along with an additional check for record expiration. There is a way around this, though: periodically, the StreamThread could insert a record (similar to a tombstone, i.e. a null record) which would be given to the processor to process.

If none of these approaches is ideal, then we might have to let the user trigger a manual flush. (In retrospect, if users want to use suppression buffer flushing for testing, then this additional API would be preferred.)

With the null-record approach, we do not have to check whether a processor is explicitly a suppress processor (which means checking if types match). Instead, what we could do is add a method to InternalProcessorContext called insertNullRecord which by default does nothing (hence it is empty).

Specifically, for suppress processors, we could override this method and call the actual process() method with a null key and value (with time.milliseconds() being the stream time). When the suppress processor receives this record, it will retain a timestamp field which stores the last time a null record was inserted. In this manner, we are able to track roughly how much time has passed within a suppress processor without having to insert an actual Time instance as a field.

We could then expire records which have exceeded suppressed.record.retention.ms and evict them as we see fit.
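The mechanism above can be sketched end to end. All names here (Context, SuppressProcessor, the buffer layout) are stand-ins for illustration under the assumptions of this proposal, not actual Kafka Streams code:

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

public class NullRecordFlushSketch {

    // Stand-in for InternalProcessorContext: the hook is a no-op by default,
    // so ordinary processors are unaffected.
    interface Context {
        default void insertNullRecord(long streamTimeMs) { /* no-op */ }
    }

    // Stand-in for a suppress processor: buffer maps record key to the
    // stream time at which the record was buffered.
    static class SuppressProcessor implements Context {
        final Map<String, Long> buffer = new LinkedHashMap<>();
        final long retentionMs;       // hypothetical suppressed.record.retention.ms
        long lastNullRecordMs = -1L;  // timestamp of the last null record seen

        SuppressProcessor(long retentionMs) { this.retentionMs = retentionMs; }

        void process(String key, long timestampMs) { buffer.put(key, timestampMs); }

        // Suppress processors override the hook: remember when the null
        // record arrived, then evict anything buffered past the retention.
        @Override
        public void insertNullRecord(long streamTimeMs) {
            lastNullRecordMs = streamTimeMs;
            Iterator<Map.Entry<String, Long>> it = buffer.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Long> e = it.next();
                if (streamTimeMs - e.getValue() > retentionMs) {
                    System.out.println("evicting " + e.getKey());
                    it.remove();
                }
            }
        }
    }

    public static void main(String[] args) {
        SuppressProcessor p = new SuppressProcessor(1000L);
        p.process("a", 0L);
        p.process("b", 900L);
        // The StreamThread periodically injects the null record; here "a" has
        // been buffered for 1500 ms (> retention), "b" for only 600 ms.
        p.insertNullRecord(1500L);
        System.out.println(p.buffer.keySet()); // prints [b]
    }
}
```

The key point of the sketch is that eviction is driven entirely by the injected null record's timestamp, so no Time instance ever lives inside the processor itself.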

Rejected Alternatives

There have been some thoughts of adding a Time instance to suppress processors, but that is against convention, since processors should not keep track of time; that should be left to stream threads and other processes. TBD upon further discussion.