Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-5233

Released: 1.0.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

org.apache.kafka.streams.kstream.Transformer
org.apache.kafka.streams.kstream.ValueTransformer

 

Terminology

Stream partition time: The value returned by the TimestampExtractor implementation in use, or -1 if no messages have been received for that partition yet.

This can be the record timestamp, wall-clock time, or any other notion of time defined by the user. However, as per the API doc, the extracted timestamp MUST represent milliseconds since midnight, January 1, 1970 UTC. Note that the TimestampExtractor is currently global to the KafkaStreams instance, but after KIP-123 the extractor will be per source, allowing multiple different extractors within a topology.

Stream time: The smallest of all input stream partition times, or -1 if any of the partitions has not received messages yet.
Punctuate time: The reference time used to trigger punctuate() calls; currently the stream time.
Punctuate's timestamp argument: Currently the stream time at the moment the method is called.
Punctuate's output record time: The record timestamp for records returned by Transformer.punctuate or forwarded from punctuate via ProcessorContext.forward; currently the stream time.
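The stream time definition above is a min-with-sentinel rule, sketched below. `StreamTimeSketch` and `streamTime` are hypothetical names used only for illustration; they are not Kafka Streams API, and partitions are identified by plain strings as a simplifying assumption.

```java
import java.util.Map;

/**
 * Hypothetical sketch of the "stream time" definition: the smallest of all
 * input stream partition times, or -1 if any partition has no message yet.
 * Not part of the Kafka Streams API.
 */
final class StreamTimeSketch {
    static final long UNKNOWN = -1L;

    // partitionTimes: partition name -> stream partition time (-1 if no message yet)
    static long streamTime(Map<String, Long> partitionTimes) {
        if (partitionTimes.isEmpty()) {
            return UNKNOWN; // no input partitions, so no notion of time yet
        }
        long min = Long.MAX_VALUE;
        for (long t : partitionTimes.values()) {
            if (t == UNKNOWN) {
                return UNKNOWN; // a single silent partition holds stream time back
            }
            min = Math.min(min, t);
        }
        return min;
    }

    public static void main(String[] args) {
        System.out.println(streamTime(Map.of("topicA-0", 1_000L, "topicB-0", 750L))); // 750
        System.out.println(streamTime(Map.of("topicA-0", 1_000L, "topicB-0", -1L)));  // -1
    }
}
```

Under this definition, one input partition that never receives data keeps stream time at -1, so a punctuation driven purely by stream time would never fire for that task.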

...

The proposal is to deprecate the current punctuate() method on Processor, Transformer and ValueTransformer interfaces:

Processor<K,V>
@Deprecated
void punctuate(long timestamp); // current

...

enum PunctuationType {
  STREAM_TIME,
  WALL_CLOCK_TIME,
}

And Cancellable is:

...
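The practical difference between the two PunctuationType values is which clock a schedule watches: stream time only advances as records are processed, while wall-clock time advances whether or not records arrive. The toy model below illustrates this. `PunctuationSketch`, `Schedule` and `maybePunctuate` are hypothetical names, not Kafka's internal scheduler, and the re-arming rule (fire once, then schedule the next trigger one interval after the current time, even if several intervals were skipped) is a simplifying assumption.

```java
/**
 * Hypothetical toy model of stream-time vs wall-clock punctuation.
 * Not Kafka's internal scheduler; re-arming semantics are simplified.
 */
final class PunctuationSketch {
    // Mirrors the proposed enum; nested here to keep the sketch self-contained.
    enum PunctuationType { STREAM_TIME, WALL_CLOCK_TIME }

    static final class Schedule {
        private final PunctuationType type;
        private final long interval;
        private long nextFireTime; // measured on this schedule's own clock

        // start: the current value of this schedule's clock when it is registered
        Schedule(PunctuationType type, long interval, long start) {
            this.type = type;
            this.interval = interval;
            this.nextFireTime = start + interval;
        }

        // Called on each iteration of a (hypothetical) processing loop.
        // Returns true if the punctuator callback would run now.
        boolean maybePunctuate(long streamTime, long wallClockTime) {
            long now = (type == PunctuationType.STREAM_TIME) ? streamTime : wallClockTime;
            if (now < nextFireTime) {
                return false;
            }
            // Simplifying assumption: fire once and re-arm relative to "now".
            nextFireTime = now + interval;
            return true;
        }
    }
}
```

For example, a STREAM_TIME schedule with a 10 ms interval registered at stream time 0 fires when stream time reaches 10 and then stays silent while stream time is stuck at 10, no matter how far the wall clock moves; a WALL_CLOCK_TIME schedule fires once its interval of system time has elapsed even if stream time never changes.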

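import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;

// Members of a Processor implementation; the class declaration and process() are omitted.
ProcessorContext context;
long streamTimeInterval = ...;
long systemTimeUpperBound = ...; // e.g. systemTimeUpperBound = streamTimeInterval + some tolerance
Cancellable streamTimeSchedule;
Cancellable systemTimeSchedule;

public void init(ProcessorContext context) {
    this.context = context;
    // Regular punctuation driven by stream time (advances only as records are processed)
    streamTimeSchedule = context.schedule(streamTimeInterval, PunctuationType.STREAM_TIME, this::streamTimePunctuate);
    // Wall-clock safety net: fires if stream time stalls for longer than systemTimeUpperBound
    systemTimeSchedule = context.schedule(systemTimeUpperBound, PunctuationType.WALL_CLOCK_TIME, this::systemTimePunctuate);
}

public void streamTimePunctuate(long streamTime) {
    periodicBusiness(streamTime);

    // Stream time advanced: restart the wall-clock countdown
    systemTimeSchedule.cancel();
    systemTimeSchedule = context.schedule(systemTimeUpperBound, PunctuationType.WALL_CLOCK_TIME, this::systemTimePunctuate);
}

public void systemTimePunctuate(long systemTime) {
    // systemTime is wall-clock time; periodicBusiness receives context.timestamp() instead
    periodicBusiness(context.timestamp());

    // The wall clock fired first: re-register the stream-time schedule
    streamTimeSchedule.cancel();
    streamTimeSchedule = context.schedule(streamTimeInterval, PunctuationType.STREAM_TIME, this::streamTimePunctuate);
}

public void periodicBusiness(long streamTime) {
    // Guard against streamTime == -1 (some partition has not received messages yet); easy enough.
    // If you need system time instead, just use System.currentTimeMillis().

    // do something businessy here
}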
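The combined schedule above behaves like a single trigger: periodicBusiness runs when stream time has advanced by streamTimeInterval, or when systemTimeUpperBound of wall-clock time has passed, whichever comes first, and each run restarts both countdowns through the cancel-and-reschedule calls. The standalone model below sketches that behavior without a Kafka dependency. `HybridPunctuationSketch` and `observe` are hypothetical names, and measuring the stream-time countdown from the stream time at the last run is a simplifying assumption.

```java
/**
 * Hypothetical, Kafka-free model of the "stream time with a wall-clock upper
 * bound" pattern. observe() returns true when periodicBusiness would run.
 */
final class HybridPunctuationSketch {
    private final long streamTimeInterval;
    private final long systemTimeUpperBound;
    private long lastStreamTime; // stream time at the last run (or at init)
    private long lastWallClock;  // wall-clock time at the last run (or at init)

    HybridPunctuationSketch(long streamTimeInterval, long systemTimeUpperBound,
                            long initStreamTime, long initWallClock) {
        this.streamTimeInterval = streamTimeInterval;
        this.systemTimeUpperBound = systemTimeUpperBound;
        this.lastStreamTime = initStreamTime;
        this.lastWallClock = initWallClock;
    }

    boolean observe(long streamTime, long wallClock) {
        // Stream-time trigger: only meaningful once stream time is known (not -1)
        boolean streamDue = streamTime >= 0 && streamTime - lastStreamTime >= streamTimeInterval;
        // Wall-clock trigger: fires even if stream time has stalled
        boolean wallDue = wallClock - lastWallClock >= systemTimeUpperBound;
        if (!streamDue && !wallDue) {
            return false;
        }
        // Either trigger ran periodicBusiness: restart both countdowns,
        // as the cancel() + schedule() calls do in the example above
        lastStreamTime = streamTime;
        lastWallClock = wallClock;
        return true;
    }
}
```

With an interval of 10, an upper bound of 100 ms, and a start at stream time 0 and wall clock 1000, the model runs at stream time 10 (wall clock 1020), stays idle at wall clock 1100 because only 80 ms have passed since that run, and then runs at wall clock 1120 even though stream time is still stuck at 15.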

...

  • Processor.punctuate(long timestamp),
  • Transformer.punctuate(long timestamp),
  • ValueTransformer.punctuate(long timestamp),
  • ProcessorContext.schedule(long interval);

...