...

The current proposal leaves the door open to adding more PunctuationTypes in the future. After further discussion, other approaches, such as the hybrid one, can be added later in a separate KIP.

Hybrid semantics can be implemented on top of the two PunctuationType callbacks. However, this requires that the schedules be cancellable, as follows:

Code Block
ProcessorContext context;
long streamTimeInterval = ...;
long systemTimeUpperBound = ...; // e.g. systemTimeUpperBound = streamTimeInterval + some tolerance
Cancellable streamTimeSchedule;
Cancellable systemTimeSchedule;
long lastStreamTimePunctuation = -1;
 
public void init(ProcessorContext context){
	this.context = context;
	streamTimeSchedule = context.schedule(PunctuationType.STREAM_TIME, streamTimeInterval, this::streamTimePunctuate);
	systemTimeSchedule = context.schedule(PunctuationType.SYSTEM_TIME, systemTimeUpperBound, this::systemTimePunctuate);
}
 
public void streamTimePunctuate(long streamTime){
	periodicBusiness(streamTime);

	// stream time advanced, so restart the system-time upper bound
	systemTimeSchedule.cancel();
	systemTimeSchedule = context.schedule(PunctuationType.SYSTEM_TIME, systemTimeUpperBound, this::systemTimePunctuate);
}
 
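public void systemTimePunctuate(long systemTime){
	// no stream-time punctuation fired within the upper bound;
	// pass context.timestamp() in place of the stream time
	periodicBusiness(context.timestamp());

	// restart the stream-time schedule from this point
	streamTimeSchedule.cancel();
	streamTimeSchedule = context.schedule(PunctuationType.STREAM_TIME, streamTimeInterval, this::streamTimePunctuate);
}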
 
public void periodicBusiness(long streamTime){
	// guard against streamTime == -1 here
	// if you need system time instead, just use System.currentTimeMillis()

	// do something businessy here
}

Here, Cancellable is an interface that contains either just a single void cancel() method or, additionally, a boolean isCancelled() method, like here.
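As a rough illustration of the second variant, the sketch below shows what such an interface and a simple handle could look like. It is not the API proposed by this KIP: the CancellableTask class and its Runnable-based callback are hypothetical names introduced only for this example, and a real scheduler would also need to remove cancelled entries from its own bookkeeping.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Variant with both methods; the simpler variant would declare only cancel().
interface Cancellable {
    void cancel();
    boolean isCancelled();
}

// Hypothetical handle (an assumption for illustration, not part of the proposal):
// wraps a punctuation callback and skips it once the schedule has been cancelled.
final class CancellableTask implements Cancellable, Runnable {
    private final AtomicBoolean cancelled = new AtomicBoolean(false);
    private final Runnable action;

    CancellableTask(Runnable action) {
        this.action = action;
    }

    @Override
    public void cancel() {
        // Idempotent: cancelling twice has the same effect as cancelling once.
        cancelled.set(true);
    }

    @Override
    public boolean isCancelled() {
        return cancelled.get();
    }

    @Override
    public void run() {
        // Invoke the callback only while the schedule is still active.
        if (!cancelled.get()) {
            action.run();
        }
    }
}
```

With a handle of this shape, the cancel-then-reschedule pattern used in streamTimePunctuate and systemTimePunctuate amounts to marking the old handle as cancelled and replacing it with a fresh one.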

Compatibility, Deprecation, and Migration Plan

...