...
Currently punctuate is triggered by the advance of the task's timestamp, which is the minimum of the timestamps of all input partitions. By default this is the event-time taken from the messages, but a custom TimestampExtractor can be provided to use system-time instead of event-time. However, even in that case the triggering of punctuate is still driven by the arrival of messages on all partitions and not by the advance of the system-time itself. The effect is that if any one of the input partitions has messages arriving irregularly, punctuate will also be called at irregular intervals, and in the extreme case it will not be called at all if any one of the input partitions doesn't receive any messages.
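The stalling behaviour described above can be illustrated with a small toy model. This is only a sketch and not Kafka Streams code: the class `StreamTimePunctuationSketch`, its methods, and the partition names are hypothetical, and it assumes for simplicity that a partition's time is the largest timestamp seen so far on that partition and that a partition with no records yet has time -1.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of one task; hypothetical names, not the Kafka Streams implementation. */
public class StreamTimePunctuationSketch {
    private final Map<String, Long> partitionTime = new HashMap<>();
    private final List<Long> punctuations = new ArrayList<>();
    private final long intervalMs;
    private long nextPunctuateAt;

    public StreamTimePunctuationSketch(long intervalMs, String... partitions) {
        this.intervalMs = intervalMs;
        this.nextPunctuateAt = intervalMs;
        for (String p : partitions) {
            partitionTime.put(p, -1L); // assumption: -1 means "no record seen yet"
        }
    }

    /** Stream time: the minimum over all input partition times. */
    public long streamTime() {
        long min = Long.MAX_VALUE;
        for (long t : partitionTime.values()) {
            min = Math.min(min, t);
        }
        return min;
    }

    /** A record arrives; punctuate fires only if the stream time has advanced far enough. */
    public void onRecord(String partition, long timestamp) {
        partitionTime.merge(partition, timestamp, Math::max);
        long now = streamTime();
        while (now >= nextPunctuateAt) {
            punctuations.add(nextPunctuateAt); // stand-in for a punctuate(timestamp) call
            nextPunctuateAt += intervalMs;
        }
    }

    public List<Long> punctuations() {
        return punctuations;
    }

    public static void main(String[] args) {
        StreamTimePunctuationSketch task = new StreamTimePunctuationSketch(1000, "A", "B");
        task.onRecord("A", 1000);
        task.onRecord("A", 2000);
        task.onRecord("A", 3000);
        // Partition B is silent, so the stream time has not moved and nothing fired.
        System.out.println("after A only: " + task.punctuations());
        task.onRecord("B", 2500);
        // B catches up; the stream time jumps to 2500 and the overdue calls fire together.
        System.out.println("after B catches up: " + task.punctuations());
    }
}
```

In this model, three records on partition A trigger nothing while partition B is silent, and a single record on B then releases the overdue punctuations at once. This is the irregular triggering that motivates the proposal.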
Public Interfaces
See below
Terminology
Term | Description |
---|---|
Input Stream Partition Time | The value returned by the TimestampExtractor implementation in use. This can be the record timestamp, wall-clock time or any other notion of time defined by the user. However, as per the API doc, the extracted timestamp MUST represent the milliseconds since midnight, January 1, 1970 UTC. Please note that currently the TimestampExtractor is global to the KafkaStreams instance, but after KIP-123 the extractor will be per source, allowing multiple different extractors within a topology. |
Stream Time | The smallest input stream partition time among all input partitions of a task. |
Punctuate Time | This is the reference time used to trigger the Punctuate calls, currently the stream time. |
Punctuate's timestamp argument | Currently the stream time at the moment the method is called. |
Output Record Time | This is the record timestamp for records returned by Transformer.punctuate or generated from punctuate via ProcessorContext.forward. Currently the stream time. |
Proposed Changes
The proposal is to leave the current punctuate() method and its semantics as-is for backward compatibility, and to add a new overloaded variant that takes an enum value as a parameter:
...