Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-5233

Released: 1.0.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

org.apache.kafka.streams.kstream.Transformer
org.apache.kafka.streams.kstream.ValueTransformer

 

Terminology

  • Stream partition time: The value returned by the TimestampExtractor implementation in use, or -1 if no messages have been received for that partition yet. This can be the record timestamp, wall-clock time or any other notion of time defined by the user. However, as per the API doc, the extracted timestamp MUST represent milliseconds since midnight, January 1, 1970 UTC. Note that the TimestampExtractor is currently global to the KafkaStreams instance; after KIP-123 the extractor will be per source, allowing multiple different extractors within a topology.
  • Stream time: Defined as the smallest of all input stream partition times (-1 if any of the partitions has not received messages yet).
  • Punctuate time: The reference time used to trigger punctuate calls; currently the stream time.
  • Punctuate's timestamp argument: Currently the stream time at the moment the method is called.
  • Punctuate's output record time: The record timestamp for records returned by Transformer.punctuate or generated from punctuate via ProcessorContext.forward; currently the stream time.
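The stream-time definition above can be sketched as a small model. This is a simplified illustration, not Kafka's internal implementation: the class name StreamTimeModel is hypothetical, partitions are plain integers, and each partition time is simply the last timestamp passed to onRecord, standing in for the value returned by the TimestampExtractor.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of "stream time" as defined above; not Kafka's internal code.
final class StreamTimeModel {
    // Partition id -> partition time; -1 means no message received yet.
    private final Map<Integer, Long> partitionTimes = new HashMap<>();

    StreamTimeModel(int numPartitions) {
        for (int p = 0; p < numPartitions; p++) {
            partitionTimes.put(p, -1L);
        }
    }

    // Stand-in for the TimestampExtractor result of a newly received message.
    void onRecord(int partition, long extractedTimestamp) {
        partitionTimes.put(partition, extractedTimestamp);
    }

    // Smallest partition time, or -1 if any partition has not received messages.
    long streamTime() {
        if (partitionTimes.isEmpty()) {
            return -1L;
        }
        long min = Long.MAX_VALUE;
        for (long t : partitionTimes.values()) {
            if (t == -1L) {
                return -1L;
            }
            min = Math.min(min, t);
        }
        return min;
    }
}
```

For example, with two partitions whose last timestamps are 1000 and 500, streamTime() returns 500: the slowest partition holds stream time, and therefore stream-time punctuation, back.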

...

The proposal is to deprecate the current punctuate() method on Processor, Transformer and ValueTransformer interfaces:

Code Block: Processor<K,V>
@Deprecated
void punctuate(long timestamp); // current

...

Code Block: PunctuationType
enum PunctuationType {
  STREAM_TIME,
  WALL_CLOCK_TIME
}

And Cancellable is 

...

Code Block
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;

// Hybrid punctuation: punctuate on stream time, but fall back to wall-clock time
// if stream time does not advance within systemTimeUpperBound milliseconds.
ProcessorContext context;
long streamTimeInterval = ...;
long systemTimeUpperBound = ...; // e.g. systemTimeUpperBound = streamTimeInterval + some tolerance
Cancellable streamTimeSchedule;
Cancellable systemTimeSchedule;

public void init(ProcessorContext context) {
	this.context = context;
	streamTimeSchedule = context.schedule(streamTimeInterval, PunctuationType.STREAM_TIME, this::streamTimePunctuate);
	systemTimeSchedule = context.schedule(systemTimeUpperBound, PunctuationType.WALL_CLOCK_TIME, this::systemTimePunctuate);
}

public void streamTimePunctuate(long streamTime) {
	periodicBusiness(streamTime);

	// Stream time is advancing: reset the wall-clock fallback.
	systemTimeSchedule.cancel();
	systemTimeSchedule = context.schedule(systemTimeUpperBound, PunctuationType.WALL_CLOCK_TIME, this::systemTimePunctuate);
}

public void systemTimePunctuate(long systemTime) {
	periodicBusiness(context.timestamp());

	// Stream time has stalled: restart the stream-time schedule.
	streamTimeSchedule.cancel();
	streamTimeSchedule = context.schedule(streamTimeInterval, PunctuationType.STREAM_TIME, this::streamTimePunctuate);
}

public void periodicBusiness(long streamTime) {
	// guard against streamTime == -1, easy enough.
	// if you need system time instead, just use System.currentTimeMillis()

	// do something businessy here
}
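To see how two cancellable schedules interact in the hybrid pattern, the pattern can be run against a toy scheduler. This is a sketch under stated assumptions. ToyScheduler, Schedule and HybridPunctuation are hypothetical stand-ins, not Kafka classes. Callbacks are plain LongConsumers in place of Punctuator, and both clocks start at 0. A schedule is first due one interval after the current time of its type, and after firing it is re-armed one interval after the firing time. The fired log stands in for periodicBusiness.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

enum PunctuationType { STREAM_TIME, WALL_CLOCK_TIME }

// Hypothetical stand-in for ProcessorContext.schedule(); both clocks are advanced by hand.
final class ToyScheduler {
    static final class Schedule {
        final long interval;
        final PunctuationType type;
        final LongConsumer callback;
        long nextTime;
        boolean cancelled;

        Schedule(long interval, PunctuationType type, LongConsumer callback, long now) {
            this.interval = interval;
            this.type = type;
            this.callback = callback;
            this.nextTime = now + interval; // first due one interval from now
        }

        void cancel() { cancelled = true; } // plays the role of Cancellable.cancel()
    }

    private final List<Schedule> schedules = new ArrayList<>();
    private long streamTime = 0L;
    private long wallClock = 0L;

    Schedule schedule(long interval, PunctuationType type, LongConsumer callback) {
        long now = type == PunctuationType.STREAM_TIME ? streamTime : wallClock;
        Schedule s = new Schedule(interval, type, callback, now);
        schedules.add(s);
        return s;
    }

    // Moves one clock forward and fires every due, non-cancelled schedule of that type.
    void advance(PunctuationType type, long now) {
        if (type == PunctuationType.STREAM_TIME) {
            streamTime = now;
        } else {
            wallClock = now;
        }
        // Iterate over a snapshot so callbacks may cancel and add schedules.
        for (Schedule s : new ArrayList<>(schedules)) {
            if (!s.cancelled && s.type == type && now >= s.nextTime) {
                s.nextTime = now + s.interval; // re-arm relative to the firing time
                s.callback.accept(now);
            }
        }
        schedules.removeIf(s -> s.cancelled);
    }
}

// The hybrid pattern, with a log standing in for periodicBusiness().
final class HybridPunctuation {
    private final ToyScheduler context;
    private final long streamTimeInterval;
    private final long systemTimeUpperBound;
    private ToyScheduler.Schedule streamTimeSchedule;
    private ToyScheduler.Schedule systemTimeSchedule;
    final List<String> fired = new ArrayList<>();

    HybridPunctuation(ToyScheduler context, long streamTimeInterval, long systemTimeUpperBound) {
        this.context = context;
        this.streamTimeInterval = streamTimeInterval;
        this.systemTimeUpperBound = systemTimeUpperBound;
        streamTimeSchedule = context.schedule(streamTimeInterval, PunctuationType.STREAM_TIME, this::streamTimePunctuate);
        systemTimeSchedule = context.schedule(systemTimeUpperBound, PunctuationType.WALL_CLOCK_TIME, this::systemTimePunctuate);
    }

    void streamTimePunctuate(long streamTime) {
        fired.add("stream@" + streamTime);
        systemTimeSchedule.cancel(); // stream time advanced: push the fallback out
        systemTimeSchedule = context.schedule(systemTimeUpperBound, PunctuationType.WALL_CLOCK_TIME, this::systemTimePunctuate);
    }

    void systemTimePunctuate(long wallClockTime) {
        fired.add("wall@" + wallClockTime);
        streamTimeSchedule.cancel(); // stream time stalled: restart from current stream time
        streamTimeSchedule = context.schedule(streamTimeInterval, PunctuationType.STREAM_TIME, this::streamTimePunctuate);
    }
}
```

With a stream-time interval of 100 and an upper bound of 150, the wall-clock fallback is re-armed to 240 after stream time reaches 100 (at wall-clock 90). The fallback then fires at the first wall-clock advance to or past 240 while stream time stays stalled at 100. Each stream-time punctuation pushes the fallback out again.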

Other alternatives under discussion:

Compatibility, Deprecation, and Migration Plan

The following methods will be deprecated:

  • Processor.punctuate(long timestamp)
  • Transformer.punctuate(long timestamp)
  • ValueTransformer.punctuate(long timestamp)
  • ProcessorContext.schedule(long interval)

The deprecated methods shall remain for some time alongside the newly added ones to allow for a smooth migration.

The semantics of the deprecated methods shall remain unchanged.

A call to the deprecated ProcessorContext.schedule(interval) from within a Processor will be equivalent to:

Code Block
context.schedule(interval, PunctuationType.STREAM_TIME, this::punctuate);

A call to the deprecated ProcessorContext.schedule(interval) from within a Transformer will be equivalent to:

Code Block
context.schedule(interval, PunctuationType.STREAM_TIME, timestamp -> {
	KeyValue<K,V> pair = punctuate(timestamp);
	// forward only if the deprecated punctuate() produced a record
	if (pair != null) {
		context.forward(pair.key, pair.value);
	}
});
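The Transformer equivalence above amounts to wrapping the deprecated punctuate(long) in a callback that forwards any non-null result. Below is a self-contained sketch of that adapter. KeyValue, LegacyTransformer and PunctuateAdapter are simplified hypothetical stand-ins (not the Kafka types). LongConsumer plays the role of Punctuator, and a BiConsumer plays the role of context.forward.

```java
import java.util.function.BiConsumer;
import java.util.function.LongConsumer;

// Minimal stand-in for org.apache.kafka.streams.KeyValue.
final class KeyValue<K, V> {
    final K key;
    final V value;

    KeyValue(K key, V value) {
        this.key = key;
        this.value = value;
    }
}

// Stand-in for a Transformer's deprecated punctuate(long), which may return null.
interface LegacyTransformer<K, V> {
    KeyValue<K, V> punctuate(long timestamp);
}

final class PunctuateAdapter {
    private PunctuateAdapter() {}

    // Builds the callback that the deprecated schedule(interval) is equivalent to.
    static <K, V> LongConsumer adapt(LegacyTransformer<K, V> transformer, BiConsumer<K, V> forward) {
        return timestamp -> {
            KeyValue<K, V> pair = transformer.punctuate(timestamp);
            if (pair != null) { // null means "nothing to emit" for this punctuation
                forward.accept(pair.key, pair.value);
            }
        };
    }
}
```

The null check matters because a Transformer's punctuate may legitimately return null, and forwarding it would emit an empty record.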

 

Test Plan

Stream time, system time and a mix of both PunctuationTypes should be tested. Existing tests for punctuation can be reused to guide the test cases for stream time only; tests for system time and for mixed stream and system time will have to be developed.
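One way to keep the new system-time tests deterministic is to inject the clock instead of calling System.currentTimeMillis() directly, so that a test can advance time by hand. The sketch below assumes a hypothetical WallClockPunctuation helper driven by a LongSupplier and a hypothetical ManualClock. It is not Kafka's test utility, only an illustration of the technique.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical wall-clock punctuation driven by an injectable clock.
final class WallClockPunctuation {
    private final LongSupplier clock;
    private final long interval;
    private long nextTime;
    final List<Long> firedAt = new ArrayList<>();

    WallClockPunctuation(LongSupplier clock, long interval) {
        this.clock = clock;
        this.interval = interval;
        this.nextTime = clock.getAsLong() + interval;
    }

    // Called from the processing loop whether or not new records arrived.
    void maybePunctuate() {
        long now = clock.getAsLong();
        if (now >= nextTime) {
            firedAt.add(now);
            nextTime = now + interval;
        }
    }
}

// Manually advanced clock for tests; no sleeping required.
final class ManualClock implements LongSupplier {
    private long now;

    ManualClock(long start) { now = start; }

    void advance(long millis) { now += millis; }

    @Override
    public long getAsLong() { return now; }
}
```

Because nothing sleeps, the same approach extends to the mixed stream-time and system-time cases, where stream time is driven by the records fed into the test.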

Rejected Alternatives

(A) Change the semantics of punctuate() to be purely "system time driven" instead of "part time driven, part data-driven". That is, triggering of the punctuate function would no longer depend on whether new data arrives, and hence not on the timestamps of the arriving data either. Instead, it would be triggered only by system wall-clock time.

...

  • The above approach changes the semantics of the punctuate method and therefore is not backward-compatible.
  • It is not clear whether performing data-driven periodic operations from the process() method, without the intricate calculation of the minimum timestamp per input partition, is sufficient to cater for all the use cases attainable with present-day stream-time based punctuate.

 

(B) An alternative could be to leave current semantics as the defaults for the punctuate method but allow a configuration switch between event and system time.

...

(E) Finally, the hybrid approach (this is convenient for the use cases in Punctuate Use Cases):

Code Block: ProcessorContext
/**
* Schedule punctuate at stream-time intervals with a system-time upper bound. 
* For pure system-time based punctuation streamTimeInterval can be set to -1 == infinite
* and systemTimeUpperBound to the desired interval
*/
schedule(Punctuator callback, long streamTimeInterval, long systemTimeUpperBound); 

/**
* Schedule punctuate at stream-time intervals without a system-time upper bound,
* i.e. pure stream-time based punctuation
*/
schedule(Punctuator callback, long streamTimeInterval);
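The rejected hybrid signature is specified only by the comments above. One possible reading, assumed here purely for illustration, is the following decision rule. A punctuation fires when stream time has advanced by streamTimeInterval since the last punctuation, or when systemTimeUpperBound of wall-clock time has passed, whichever comes first. A streamTimeInterval of -1 means infinite, and both reference points reset on every punctuation. HybridRule and its fields are hypothetical.

```java
// Hypothetical reading of the rejected schedule(callback, streamTimeInterval, systemTimeUpperBound).
final class HybridRule {
    private final long streamTimeInterval;   // -1 == infinite: never triggers on stream time
    private final long systemTimeUpperBound; // assumed > 0 in this sketch
    private long lastStreamTime;
    private long lastSystemTime;

    HybridRule(long streamTimeInterval, long systemTimeUpperBound, long streamTime, long systemTime) {
        this.streamTimeInterval = streamTimeInterval;
        this.systemTimeUpperBound = systemTimeUpperBound;
        this.lastStreamTime = streamTime;
        this.lastSystemTime = systemTime;
    }

    // Returns true, and resets both reference points, if a punctuation should fire now.
    boolean shouldPunctuate(long streamTime, long systemTime) {
        boolean streamDue = streamTimeInterval != -1L
                && streamTime - lastStreamTime >= streamTimeInterval;
        boolean systemDue = systemTime - lastSystemTime >= systemTimeUpperBound;
        if (streamDue || systemDue) {
            lastStreamTime = streamTime;
            lastSystemTime = systemTime;
            return true;
        }
        return false;
    }
}
```

Even this small rule has to choose whether a system-time trigger should also reset the stream-time reference point. That is the kind of edge case that made the combined semantics harder to reason about than two separate schedules.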

...

  • It has been argued that this type of hybrid punctuation is more difficult to reason about than separate stream-time and system-time punctuations, and that the approach needs further thought.
  • Some problems with this algorithm have been identified for edge-case scenarios (see discussion thread).
  • The various trade-offs of this approach may be better left to the users, as per the mantra "make simple things easy and complex things possible".

...

However, hybrid semantics can be implemented on top of the two PunctuationType callbacks, as shown in the Proposed Changes section. This gives users more flexibility to address the trade-offs inherent in this design in whatever way is most appropriate to their use case.
