Status

Current state: Under Discussion

Discussion thread: here 

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, in Kafka Streams, a suppression buffer that receives little traffic may be unable to advance stream time, which leaves records stuck in the buffer. These records remain buffered until further records arrive and advance stream time. We therefore need a way to flush these records so that the user can access them despite the low traffic. This is also a step towards tracking stream time independently per key rather than per partition.

Proposed Changes

All things considered, we should automate the flushing of records that have remained in the buffers for too long. Many Kafka users would prefer not to call a method manually to trigger such an operation. With that in mind, the best approach would be to add a config, suppressed.record.retention.ms, which the user sets to indicate how long a record may remain in the suppression buffer.
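As a rough illustration of how the proposed config might be supplied and read, the sketch below uses a plain java.util.Properties object. Only the key suppressed.record.retention.ms comes from this proposal; the class name, the helper method, and the fallback value are hypothetical, and no such config exists in Kafka Streams today.

```java
import java.util.Properties;

public class SuppressedRetentionConfigSketch {
    // Config key proposed in this KIP; not an existing Kafka Streams config.
    public static final String SUPPRESSED_RECORD_RETENTION_MS = "suppressed.record.retention.ms";

    // Hypothetical fallback meaning "never flush based on wall-clock time".
    public static final long NO_RETENTION_LIMIT = Long.MAX_VALUE;

    // Reads the retention from the given properties, falling back to "no limit" when unset.
    public static long retentionMs(Properties props) {
        String raw = props.getProperty(SUPPRESSED_RECORD_RETENTION_MS);
        return raw == null ? NO_RETENTION_LIMIT : Long.parseLong(raw.trim());
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // Let records sit in the suppression buffer for at most 30 seconds.
        props.setProperty(SUPPRESSED_RECORD_RETENTION_MS, "30000");
        System.out.println(retentionMs(props));
    }
}
```

Defaulting to "no limit" would keep today's behaviour for users who do not set the config, though the actual default is left open by this proposal.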

However, most if not all processors in Kafka currently do not hold a Time instance (or an internal clock) as a field in their implementations. Thus, they cannot track wall-clock time (as opposed to stream time). This means that either a StreamThread somehow triggers the flush (in which case we would have to check whether the processor is really of type KTableSuppressProcessor), or we add a local Time instance to KTableSuppressProcessor so that, when process() is called, we check whether the records in the buffer have expired.
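The second option (a local clock consulted on every process() call) could look roughly like the standalone sketch below. It is not the KTableSuppressProcessor implementation: the class, its String keys and values, and the LongSupplier standing in for Kafka's Time are all assumptions made to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;

public class WallClockSuppressBufferSketch {
    // Hypothetical holder for a buffered value and the wall-clock time it was first buffered.
    private static final class Buffered {
        final String value;
        final long bufferedAtMs;
        Buffered(String value, long bufferedAtMs) {
            this.value = value;
            this.bufferedAtMs = bufferedAtMs;
        }
    }

    private final long retentionMs;
    private final LongSupplier clock; // stand-in for a Time instance
    // Insertion-ordered, so the oldest buffered key is always at the head.
    private final Map<String, Buffered> buffer = new LinkedHashMap<>();

    public WallClockSuppressBufferSketch(long retentionMs, LongSupplier clock) {
        this.retentionMs = retentionMs;
        this.clock = clock;
    }

    // Buffers the record, then returns every "key=value" whose retention has expired.
    public List<String> process(String key, String value) {
        long now = clock.getAsLong();
        Buffered old = buffer.get(key);
        // An update keeps the original buffering time, so it cannot postpone the flush.
        long bufferedAt = old == null ? now : old.bufferedAtMs;
        buffer.put(key, new Buffered(value, bufferedAt));

        List<String> emitted = new ArrayList<>();
        Iterator<Map.Entry<String, Buffered>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Buffered> e = it.next();
            if (now - e.getValue().bufferedAtMs < retentionMs) {
                break; // everything after this entry was buffered later
            }
            emitted.add(e.getKey() + "=" + e.getValue().value);
            it.remove();
        }
        return emitted;
    }

    public int size() {
        return buffer.size();
    }

    public static void main(String[] args) {
        long[] now = {0L}; // manually advanced fake clock
        WallClockSuppressBufferSketch buf = new WallClockSuppressBufferSketch(100L, () -> now[0]);
        buf.process("a", "1");
        now[0] = 100L;
        System.out.println(buf.process("b", "2"));
    }
}
```

Note that expiry is still only checked when process() is invoked, so a processor that receives no records at all would not flush under this sketch; that case would need the StreamThread-driven option or a manual trigger.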

If neither of these approaches is ideal, then we might have to let the user trigger a manual flush. (If users want to use suppression buffer flushing for testing, this additional API would be preferable anyway.)

Rejected Alternatives

TBD upon further discussion.
