You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 4 Next »

Status

Current state: Under Discussion

Discussion thread: here 

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, in Kafka Streams, if a suppression buffer has low traffic for the number of records received (i.e. the buffer could not advance the stream time which would cause records to get stuck in the buffers). Consequently, records would remain in these buffers unless further records were received and the stream time be advanced. Therefore, we need a way to flush out these records such the user can access them despite the low traffic constraint. This is also a future step towards tracking stream time independently on a per key basis rather than per partition.

This situation is in fact worse for certain cases where sensors would wait for 24 hours before dumping data into a stream. So realistically, we could end up in a situation where suppress processors does not receive records for long periods of time (on the scale of hours) and that means that stream time cannot be advanced, thus no records are ever emitted during this time of inactivity. For most users, they wish to avoid this situation, so we should work to resolve this by flushing out records every so often to prevent them from getting "stuck" in the buffer.


Proposed Changes

All things considered, we should automate the flushing of records that have remained in the buffers for too long. For many Kafka users, they prefer not to have to manually call a method which would trigger the such an operation. In light of such an opinion, the best approach would be to add a config suppressed.record.retention.ms  which would be defined by the user to indicate how long they wish a record to remain in the suppression buffer.

However, most if not all processors in Kafka as of the moment does not have a Time instance as a field in its implementations (or an internal clock).  Thus, they cannot track time (not stream time but actual computer clock time). There is a way around this though. Periodically, what we could let the StreamThread  do is insert a record (similar to a tombstone, kind of like a null record) which would be given to the processor to process. We don't have to check if a processor is explicitly a suppress processor (which means checking if types match). Instead, what we could do is add a method to InternalProcessorContext called insertNullRecord  which by default does nothing (hence it is empty).

Specifically, for suppress processors, we could override this method and call the actual process()  method with a null key and value (with time.milliseconds()  being the stream time). When the suppress processor receives this record, it will retain a timestamp field which stores when the last time a null record was inserted. In this manner, we are able to track roughly how much time has passed within a suppress processor without having to insert an actual Time instance as field.

In this manner, we could then expire records which have exceeded suppressed.record.retention.ms  and evict the records as we see fit. 

Rejected Alternatives

There has been some thoughts of adding a Time  instance to suppress processors, but that is against convention since processors should not keep track of time. Instead, that should be left to stream threads and other processes.

  • No labels