...

All things considered, we should automate the flushing of records that have remained in the buffers for too long. Many Kafka users would rather not have to manually call a method that triggers such an operation. In light of this, the best approach would be to add a config, suppressed.record.retention.ms, which the user would set to indicate how long they wish a record to remain in the suppression buffer.
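To illustrate the intended semantics of the proposed config, here is a minimal Java sketch. It assumes the value is read from a plain Properties object and that each buffered record is tagged with the wall-clock time at which it entered the buffer. SuppressedRecordRetention, RetentionBuffer and flushExpired are hypothetical names, not part of the Kafka Streams API, and suppressed.record.retention.ms is only the config proposed here.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper: reads the config proposed above (a proposal, not an existing setting).
final class SuppressedRecordRetention {
    static final String CONFIG = "suppressed.record.retention.ms";

    // Returns the configured retention in milliseconds, or defaultMs if the key is unset.
    static long fromProperties(Properties props, long defaultMs) {
        String value = props.getProperty(CONFIG);
        return value == null ? defaultMs : Long.parseLong(value.trim());
    }
}

// Hypothetical, simplified suppression buffer that remembers when each record was buffered.
final class RetentionBuffer<K, V> {
    private static final class Buffered<T> {
        final T value;
        final long bufferedAtMs; // wall-clock time at which the record entered the buffer

        Buffered(T value, long bufferedAtMs) {
            this.value = value;
            this.bufferedAtMs = bufferedAtMs;
        }
    }

    private final long retentionMs;
    private final Map<K, Buffered<V>> buffer = new LinkedHashMap<>();

    RetentionBuffer(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    // Buffers (or replaces) the record for key, restarting its retention clock.
    void put(K key, V value, long nowMs) {
        buffer.put(key, new Buffered<>(value, nowMs));
    }

    // Removes and returns every record that has been buffered for at least retentionMs.
    List<V> flushExpired(long nowMs) {
        List<V> flushed = new ArrayList<>();
        Iterator<Buffered<V>> it = buffer.values().iterator();
        while (it.hasNext()) {
            Buffered<V> entry = it.next();
            if (nowMs - entry.bufferedAtMs >= retentionMs) {
                flushed.add(entry.value);
                it.remove();
            }
        }
        return flushed;
    }

    int size() {
        return buffer.size();
    }
}
```

With a retention of 1000 ms, a record buffered at t=0 is flushed by a check at t=1000, while one buffered at t=600 stays until a later check.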

However, most if not all processors in Kafka currently do not hold a Time instance (or any other internal clock) as a field. Thus, they cannot track wall-clock time, that is, how much real time has passed, as opposed to stream time. There is a way around this, though. The StreamThread could periodically insert a record (similar to a tombstone, essentially a null record) and hand it to each processor to process. We don't need to check whether a processor is explicitly a suppress processor (which would mean checking whether the types match). Instead, we could add a method called insertNullRecord to InternalProcessorContext which by default does nothing (its body is empty).
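The "no-op by default" hook and the periodic injection can be sketched with a Java default method. This is a minimal illustration under assumptions: NullRecordHook is a hypothetical, heavily simplified stand-in for the proposed addition (not the actual InternalProcessorContext), PeriodicNullRecordInjector is a hypothetical piece of the StreamThread run loop, and passing the wall-clock reading as a parameter is an assumption made so that processors never need their own clock.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the proposed hook; not a real Kafka Streams interface.
interface NullRecordHook {
    // Default: do nothing, so ordinary processors need no changes and no type checks.
    default void insertNullRecord(long wallClockMs) { }
}

// Example of a processor that opts in by overriding the hook.
final class CountingNullRecordHook implements NullRecordHook {
    int received = 0;
    long lastWallClockMs = -1L;

    @Override
    public void insertNullRecord(long wallClockMs) {
        received++;
        lastWallClockMs = wallClockMs;
    }
}

// Hypothetical piece of the thread's run loop that injects null records at a fixed interval.
final class PeriodicNullRecordInjector {
    private final List<NullRecordHook> hooks = new ArrayList<>();
    private final long intervalMs;
    private long lastInjectionMs;

    PeriodicNullRecordInjector(long intervalMs, long startMs) {
        this.intervalMs = intervalMs;
        this.lastInjectionMs = startMs;
    }

    void register(NullRecordHook hook) {
        hooks.add(hook);
    }

    // Called on every loop iteration with the thread's own clock reading;
    // returns true if null records were injected on this call.
    boolean maybeInject(long nowMs) {
        if (nowMs - lastInjectionMs < intervalMs) {
            return false;
        }
        for (NullRecordHook hook : hooks) {
            hook.insertNullRecord(nowMs); // no-op for processors that don't override it
        }
        lastInjectionMs = nowMs;
        return true;
    }
}
```

Every registered processor receives the call, but only those that override insertNullRecord react to it, which is what removes the need to check processor types.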

Specifically, for suppress processors, we could override this method and have insertNullRecord call the actual process() method with a null key and value, using time.milliseconds() as the record's timestamp (and thus as its stream time). When the suppress processor receives this record, it stores the time at which the last null record was inserted in a timestamp field. In this manner, we can track roughly how much time has passed within a suppress processor without adding a Time instance as a field.
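The suppress-side handling might look roughly like the following Java sketch. SuppressSketch is a hypothetical class, not the real suppress processor. It assumes that real records always have a non-null key, so a record with both key and value null can only be an injected null record, and it approximates each record's buffering time with the last null-record timestamp, since the processor has no clock of its own.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a suppress processor that learns wall-clock time only from null records.
final class SuppressSketch<K, V> {
    private static final long UNKNOWN = Long.MIN_VALUE; // no null record seen yet

    private final long retentionMs;
    private final Map<K, V> values = new LinkedHashMap<>();
    private final Map<K, Long> bufferedAtMs = new LinkedHashMap<>();
    private long lastNullRecordMs = UNKNOWN;
    final List<V> emitted = new ArrayList<>(); // stands in for forwarding downstream

    SuppressSketch(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    // Override of the hypothetical no-op hook: feed a null record into process().
    void insertNullRecord(long wallClockMs) {
        process(null, null, wallClockMs);
    }

    void process(K key, V value, long timestampMs) {
        if (key == null && value == null) {
            onNullRecord(timestampMs);
            return;
        }
        values.put(key, value);
        // Approximate the buffering time with the last null-record timestamp;
        // an update to an already-buffered key keeps its original stamp.
        bufferedAtMs.putIfAbsent(key, lastNullRecordMs);
    }

    long lastNullRecordMs() {
        return lastNullRecordMs;
    }

    private void onNullRecord(long nowMs) {
        lastNullRecordMs = nowMs;
        Iterator<Map.Entry<K, Long>> it = bufferedAtMs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<K, Long> entry = it.next();
            if (entry.getValue() == UNKNOWN) {
                entry.setValue(nowMs); // first null record this key has seen
            }
            if (nowMs - entry.getValue() >= retentionMs) {
                emitted.add(values.remove(entry.getKey()));
                it.remove();
            }
        }
    }
}
```

Because time only advances when a null record arrives, a buffered record may stay up to one injection interval longer than the configured retention; this is the "roughly" in the description above.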

...