...

Currently, punctuate is triggered by the advance of the task's timestamp, which is the minimum of the timestamps of all input partitions. By default this is the event time taken from the messages, but a custom TimestampExtractor can be provided to use system time instead of event time. However, even in that case punctuate is still triggered by the arrival of messages on all partitions, not by the advance of system time itself. As a result, if any one of the input partitions receives messages irregularly, punctuate will also be called at irregular intervals and, in the extreme case, not at all if one of the input partitions receives no messages.
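The failure mode above can be reproduced with a small simulation. This is a hypothetical sketch, not Kafka Streams code: StreamTimeSimulation and onRecord are illustrative names, each partition's time is simply the last extracted timestamp, and the first punctuation boundary is assumed to be one interval after stream time first becomes known. Because the check only runs when a record arrives, and stream time is the minimum over all partitions, records on one partition alone never trigger punctuate.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical simulation of stream-time-driven punctuation; not the Kafka Streams implementation.
class StreamTimeSimulation {
    private final long[] partitionTimes; // last extracted timestamp per partition, -1 = no records yet
    private final long interval;
    private long nextPunctuation = -1;   // assumed: set once stream time first becomes known
    final List<Long> punctuations = new ArrayList<>();

    StreamTimeSimulation(int partitions, long interval) {
        this.partitionTimes = new long[partitions];
        Arrays.fill(partitionTimes, -1L);
        this.interval = interval;
    }

    // Stream time: smallest partition time, or -1 if any partition has not received a record.
    long streamTime() {
        long min = Long.MAX_VALUE;
        for (long t : partitionTimes) {
            if (t < 0) {
                return -1L;
            }
            min = Math.min(min, t);
        }
        return min;
    }

    // Punctuation is only checked when a record arrives, i.e. the trigger is data-driven.
    void onRecord(int partition, long timestamp) {
        partitionTimes[partition] = timestamp;
        long now = streamTime();
        if (now < 0) {
            return; // some partition is still empty: stream time is unknown, nothing can fire
        }
        if (nextPunctuation < 0) {
            nextPunctuation = now + interval;
        }
        if (now >= nextPunctuation) {
            punctuations.add(now);
            nextPunctuation = now + interval;
        }
    }
}
```

With two partitions and a 100 ms interval, records on partition 0 at 0, 150 and 300 leave stream time at -1 and produce no punctuation; punctuate fires (at stream time 250) only once partition 1 has caught up.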

Public Interfaces

org.apache.kafka.streams.processor.Processor
org.apache.kafka.streams.processor.ProcessorContext
org.apache.kafka.streams.kstream.Transformer

 See below

Terminology

Term: Description

Stream partition time: The value returned by the TimestampExtractor implementation in use, or -1 if no messages have been received for that partition yet.

This can be the record timestamp, wall-clock time or any other notion of time defined by the user. However, as per the API doc, the extracted timestamp MUST represent the milliseconds since midnight, January 1, 1970 UTC. Please note that currently the TimestampExtractor is global to the KafkaStreams instance, but after KIP-123 the extractor will be per source, allowing multiple different extractors within a topology.

Stream time: Defined as the smallest among all of its input stream partition timestamps (-1 if any of the partitions has not received messages yet).
Punctuate time: Reference time used to trigger the punctuate calls; currently the stream time.
Punctuate's timestamp argument: Currently the stream time at the moment the method is called.
Punctuate's output record time: Record timestamp for records returned by Transformer.punctuate or generated from punctuate via ProcessorContext.forward. Currently the stream time.

...

Code Block
title: ProcessorContext
@Deprecated
void schedule(long interval); //current, stream-time semantics

Cancellable schedule(long interval, PunctuationType type, Punctuator callback); //new
// We could allow this to be called once for each value of PunctuationType to mix approaches.

...

Code Block
title: PunctuationType
enum PunctuationType {
  STREAM_TIME,
  SYSTEM_TIME,
}

And Cancellable is:

Code Block
title: Cancellable
interface Cancellable {
  void cancel();
}
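To illustrate the proposed contract, here is a minimal sketch of a registry that keeps schedules of both types and returns a Cancellable for each. PunctuationRegistry, advanceStreamTime and advanceSystemTime are hypothetical names; PunctuationType and Cancellable mirror the declarations above, and Punctuator is a local stand-in assumed to receive the punctuation timestamp. Each schedule's first deadline is assumed to be one interval after the first time of the matching type that the registry observes.

```java
import java.util.ArrayList;
import java.util.List;

enum PunctuationType { STREAM_TIME, SYSTEM_TIME }

interface Cancellable { void cancel(); }

// Assumed callback shape: receives the time at which the punctuation fires.
interface Punctuator { void punctuate(long timestamp); }

// Hypothetical registry illustrating the proposed schedule(interval, type, callback) contract.
class PunctuationRegistry {
    private static final class Schedule {
        final long interval;
        final PunctuationType type;
        final Punctuator callback;
        long next = -1;      // first deadline is fixed when the first time of this type is observed
        boolean cancelled;

        Schedule(long interval, PunctuationType type, Punctuator callback) {
            this.interval = interval;
            this.type = type;
            this.callback = callback;
        }
    }

    private final List<Schedule> schedules = new ArrayList<>();

    Cancellable schedule(long interval, PunctuationType type, Punctuator callback) {
        Schedule s = new Schedule(interval, type, callback);
        schedules.add(s);
        return () -> s.cancelled = true; // each schedule gets its own handle
    }

    // Driven by stream-time progress (i.e. by data arriving).
    void advanceStreamTime(long streamTime) { fire(PunctuationType.STREAM_TIME, streamTime); }

    // Driven by the wall clock, independently of data.
    void advanceSystemTime(long systemTime) { fire(PunctuationType.SYSTEM_TIME, systemTime); }

    private void fire(PunctuationType type, long now) {
        // Iterate over a snapshot so callbacks may schedule or cancel without breaking iteration.
        for (Schedule s : new ArrayList<>(schedules)) {
            if (s.cancelled || s.type != type || now < 0) {
                continue;
            }
            if (s.next < 0) {
                s.next = now + s.interval;
                continue;
            }
            if (now >= s.next) {
                s.callback.punctuate(now);
                s.next = now + s.interval;
            }
        }
        schedules.removeIf(s -> s.cancelled);
    }
}
```

Keeping the two types on separate trigger paths is what lets a SYSTEM_TIME schedule fire while stream time is stalled, and returning one handle per schedule lets several schedules coexist and be cancelled individually.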

The Cancellable return type is provided to cater for more complicated use cases, such as those described in the Punctuate Use Cases sub-page. For cases requiring stream-time based punctuation with a system-time upper bound (aka "hybrid" punctuation semantics), the following pattern can be used:

Code Block
ProcessorContext context;
long streamTimeInterval = ...;
long systemTimeUpperBound = ...; // e.g. systemTimeUpperBound = streamTimeInterval + some tolerance
Cancellable streamTimeSchedule;
Cancellable systemTimeSchedule;

public void init(ProcessorContext context) {
	this.context = context;
	// Arguments follow the proposed signature: schedule(interval, type, callback)
	streamTimeSchedule = context.schedule(streamTimeInterval, PunctuationType.STREAM_TIME, this::streamTimePunctuate);
	systemTimeSchedule = context.schedule(systemTimeUpperBound, PunctuationType.SYSTEM_TIME, this::systemTimePunctuate);
}

// Stream time advanced by streamTimeInterval: do the work and restart the system-time countdown.
public void streamTimePunctuate(long streamTime) {
	periodicBusiness(streamTime);

	systemTimeSchedule.cancel();
	systemTimeSchedule = context.schedule(systemTimeUpperBound, PunctuationType.SYSTEM_TIME, this::systemTimePunctuate);
}

// No stream-time punctuation within systemTimeUpperBound: do the work anyway and restart the stream-time schedule.
public void systemTimePunctuate(long systemTime) {
	periodicBusiness(context.timestamp());

	streamTimeSchedule.cancel();
	streamTimeSchedule = context.schedule(streamTimeInterval, PunctuationType.STREAM_TIME, this::streamTimePunctuate);
}

public void periodicBusiness(long streamTime) {
	// guard against streamTime == -1, easy enough.
	// if you need system time instead, just use System.currentTimeMillis()

	// do something businessy here
}

Other alternatives under discussion:

(A) Change the semantics of punctuate() to be purely "system-time driven" instead of "part time-driven, part data-driven". That is, triggering punctuate would no longer depend on whether new data arrives, and hence not on the timestamps of the arriving data either. Instead, it would be triggered only by system wall-clock time.
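Under alternative (A), the trigger would be evaluated against the wall clock on every iteration of the processing loop, whether or not the last poll returned records. A minimal sketch under stated assumptions: SystemTimePunctuationLoop and runOnce are hypothetical names, the clock is injected as a LongSupplier so the behaviour is deterministic (in practice it would be System::currentTimeMillis), and missed intervals are skipped rather than replayed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical sketch of purely system-time-driven punctuation (alternative A).
class SystemTimePunctuationLoop {
    private final LongSupplier clock;   // e.g. System::currentTimeMillis outside of tests
    private final long interval;
    private long nextPunctuation;
    final List<Long> punctuations = new ArrayList<>();

    SystemTimePunctuationLoop(LongSupplier clock, long interval) {
        this.clock = clock;
        this.interval = interval;
        this.nextPunctuation = clock.getAsLong() + interval;
    }

    // One iteration of the processing loop; recordCount may be zero and is ignored by the trigger.
    void runOnce(int recordCount) {
        // ... process recordCount records here ...
        long now = clock.getAsLong();
        if (now >= nextPunctuation) {
            punctuations.add(now);
            // Move to the first boundary after 'now', so missed intervals are skipped, not replayed.
            while (nextPunctuation <= now) {
                nextPunctuation += interval;
            }
        }
    }
}
```

Skipping missed boundaries is one design choice; replaying each missed interval would be the other, and which one is preferable depends on whether the callback cares about the count of intervals or only about running regularly.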

...

(E) Finally, the hybrid approach (this is convenient for the use cases in Punctuate Use Cases but difficult to reason about):

Code Block
title: ProcessorContext
/**
* Schedule punctuate at stream-time intervals with a system-time upper bound. 
* For pure system-time based punctuation streamTimeInterval can be set to -1 == infinite
* and systemTimeUpperBound to the desired interval
*/
void schedule(Punctuator callback, long streamTimeInterval, long systemTimeUpperBound);

/**
* Schedule punctuate at stream-time intervals without a system-time upper bound,
* i.e. pure stream-time based punctuation
*/
void schedule(Punctuator callback, long streamTimeInterval);
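One possible reading of the hybrid schedule in alternative (E), modelled as a hypothetical HybridSchedule class: punctuate fires when stream time has advanced by streamTimeInterval since the last punctuation, or when systemTimeUpperBound milliseconds of system time have passed without one, whichever comes first. Treating -1 as "disabled" for either bound (so the two-argument overload corresponds to systemTimeUpperBound = -1) and resetting both clocks after any firing are assumptions; they are exactly the kind of choices that make this variant harder to reason about.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the hybrid schedule in alternative (E); not a proposed implementation.
class HybridSchedule {
    private final long streamTimeInterval;    // -1 = no stream-time trigger
    private final long systemTimeUpperBound;  // -1 = no system-time upper bound
    private long lastStreamTime = -1;         // stream time at last punctuation (or first observation)
    private long lastSystemTime;              // system time at last punctuation (or creation)
    final List<String> fired = new ArrayList<>();

    HybridSchedule(long streamTimeInterval, long systemTimeUpperBound, long systemTimeNow) {
        this.streamTimeInterval = streamTimeInterval;
        this.systemTimeUpperBound = systemTimeUpperBound;
        this.lastSystemTime = systemTimeNow;
    }

    // Checked on every loop iteration with the current stream time (-1 if unknown) and system time.
    void check(long streamTime, long systemTime) {
        if (streamTime >= 0 && lastStreamTime < 0) {
            lastStreamTime = streamTime; // start counting once stream time becomes known
        }
        boolean streamDue = streamTimeInterval >= 0 && lastStreamTime >= 0
                && streamTime - lastStreamTime >= streamTimeInterval;
        boolean systemDue = systemTimeUpperBound >= 0
                && systemTime - lastSystemTime >= systemTimeUpperBound;
        if (streamDue || systemDue) {
            fired.add(streamDue ? "STREAM_TIME" : "SYSTEM_TIME");
            // Either trigger restarts both clocks, so the upper bound only bites during quiet periods.
            if (streamTime >= 0) {
                lastStreamTime = streamTime;
            }
            lastSystemTime = systemTime;
        }
    }
}
```

In this model a stalled partition delays the stream-time trigger but cannot suppress punctuation for longer than systemTimeUpperBound.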

...

  • It has been argued that this type of hybrid punctuation is more difficult to reason about than separate stream-time and system-time punctuations, and that the approach needs further thought
  • Some problems with this algorithm have been identified for edge-case scenarios (see discussion thread)
  • The various trade-offs of this approach may be better left to the users, as per the mantra "make simple things easy and complex things possible"

The current proposal leaves room for adding more PunctuationTypes in the future, so other approaches such as the hybrid one can be added later on, after further discussion and in a separate KIP.

However, hybrid semantics can be implemented on top of the two PunctuationType callbacks. This requires that the schedules be cancellable (or, alternatively, that a call to schedule() overwrite the previous PunctuationSchedule of the given type instead of just adding another PunctuationSchedule as it does today), as in the hybrid pattern shown earlier for the proposed API.

Here Cancellable is either an interface containing just a single void cancel() method, as shown in the Proposed Changes section, or one that also provides boolean isCancelled(). This gives users more flexibility to address the trade-offs inherent in this design in the way most appropriate to their use case.
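For the richer Cancellable variant, a minimal sketch of a handle a scheduler could return. ScheduleHandle is a hypothetical name; the AtomicBoolean is a defensive choice in case cancel() is called from a thread other than the one running punctuation, and a plain boolean field would suffice if it never is.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// The richer Cancellable variant: cancel() plus a status query.
interface Cancellable {
    void cancel();
    boolean isCancelled();
}

// Hypothetical handle; a scheduler could skip any schedule whose handle reports isCancelled().
final class ScheduleHandle implements Cancellable {
    private final AtomicBoolean cancelled = new AtomicBoolean(false);

    @Override
    public void cancel() {
        cancelled.set(true); // idempotent: repeated calls have no further effect
    }

    @Override
    public boolean isCancelled() {
        return cancelled.get();
    }
}
```

The status query mainly helps user code such as the hybrid pattern, which can check whether the schedule it holds is still live before cancelling and re-registering it.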

Compatibility, Deprecation, and Migration Plan

...