...
The current proposal leaves room for adding more PunctuationTypes in the future, so other approaches, such as the hybrid one, can be added later in a separate KIP after further discussion.
Hybrid semantics can be implemented on top of the two PunctuationType callbacks. However, this requires the schedules to be cancellable, as follows:
    ProcessorContext context;
    long streamTimeInterval = ...;
    long systemTimeUpperBound = ...; // e.g. systemTimeUpperBound = streamTimeInterval + some tolerance
    Cancellable streamTimeSchedule;
    Cancellable systemTimeSchedule;
    long lastStreamTimePunctuation = -1;

    public void init(ProcessorContext context) {
        this.context = context;
        streamTimeSchedule = context.schedule(PunctuationType.STREAM_TIME, streamTimeInterval, this::streamTimePunctuate);
        systemTimeSchedule = context.schedule(PunctuationType.SYSTEM_TIME, systemTimeUpperBound, this::systemTimePunctuate);
    }

    public void streamTimePunctuate(long streamTime) {
        periodicBusiness(streamTime);
        // stream time advanced, so restart the system-time upper bound
        systemTimeSchedule.cancel();
        systemTimeSchedule = context.schedule(PunctuationType.SYSTEM_TIME, systemTimeUpperBound, this::systemTimePunctuate);
    }

    public void systemTimePunctuate(long systemTime) {
        periodicBusiness(context.timestamp());
        // the upper bound was hit, so restart the stream-time interval
        streamTimeSchedule.cancel();
        streamTimeSchedule = context.schedule(PunctuationType.STREAM_TIME, streamTimeInterval, this::streamTimePunctuate);
    }

    public void periodicBusiness(long streamTime) {
        // guard against streamTime == -1, easy enough.
        // if you need system time instead, just use System.currentTimeMillis()
        // do something businessy here
    }
Here, Cancellable is an interface containing either just a single void cancel() method or, additionally, a boolean isCancelled() method.
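To make the cancel-and-reschedule idea concrete, the following is a minimal, self-contained sketch of the two-method variant of Cancellable together with a toy simulation of the hybrid behaviour. It does not use the Kafka Streams API: FakeSchedule, HybridPunctuationDemo and the advance method are hypothetical names introduced only for illustration, and the firing rule (a schedule is due once its interval has elapsed since it was created or last fired) is an assumption of this sketch rather than the semantics defined by the proposal.

```java
import java.util.ArrayList;
import java.util.List;

/** Possible shape of the handle discussed above (the two-method variant). */
interface Cancellable {
    void cancel();
    boolean isCancelled();
}

/** Hypothetical in-memory schedule, used only to simulate punctuation timing. */
final class FakeSchedule implements Cancellable {
    private final long interval;
    private long nextFire;
    private boolean cancelled;

    FakeSchedule(long now, long interval) {
        this.interval = interval;
        this.nextFire = now + interval;
    }

    /** Returns true if the schedule is due at 'now' and moves it to the next interval. */
    boolean pollDue(long now) {
        if (cancelled || now < nextFire) {
            return false;
        }
        nextFire = now + interval;
        return true;
    }

    @Override public void cancel() { cancelled = true; }
    @Override public boolean isCancelled() { return cancelled; }
}

/** Toy model of the hybrid semantics: whichever schedule fires resets the other one. */
final class HybridPunctuationDemo {
    final long streamTimeInterval;
    final long systemTimeUpperBound;
    final List<String> fired = new ArrayList<>();
    FakeSchedule streamTimeSchedule;
    FakeSchedule systemTimeSchedule;

    HybridPunctuationDemo(long streamNow, long systemNow,
                          long streamTimeInterval, long systemTimeUpperBound) {
        this.streamTimeInterval = streamTimeInterval;
        this.systemTimeUpperBound = systemTimeUpperBound;
        this.streamTimeSchedule = new FakeSchedule(streamNow, streamTimeInterval);
        this.systemTimeSchedule = new FakeSchedule(systemNow, systemTimeUpperBound);
    }

    /** Simulates one step of the (assumed) driver with the given stream and system times. */
    void advance(long streamNow, long systemNow) {
        if (streamTimeSchedule.pollDue(streamNow)) {
            fired.add("stream@" + streamNow);
            // stream time advanced: cancel and restart the system-time upper bound
            systemTimeSchedule.cancel();
            systemTimeSchedule = new FakeSchedule(systemNow, systemTimeUpperBound);
        } else if (systemTimeSchedule.pollDue(systemNow)) {
            fired.add("system@" + systemNow);
            // upper bound hit: cancel and restart the stream-time interval
            streamTimeSchedule.cancel();
            streamTimeSchedule = new FakeSchedule(streamNow, streamTimeInterval);
        }
    }

    public static void main(String[] args) {
        HybridPunctuationDemo demo = new HybridPunctuationDemo(0, 0, 10, 15);
        demo.advance(10, 5);  // stream interval reached
        demo.advance(12, 19); // nothing due
        demo.advance(12, 20); // stream time stalled, system bound reached
        demo.advance(21, 25); // nothing due
        demo.advance(22, 26); // stream interval reached again
        System.out.println(demo.fired); // prints [stream@10, system@20, stream@22]
    }
}
```

In this toy run the system-time schedule only fires while stream time is stalled, which is the intent of the hybrid approach. Replacing a cancelled handle with a fresh schedule, rather than mutating it, mirrors the reassignment of streamTimeSchedule and systemTimeSchedule in the snippet above.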
Compatibility, Deprecation, and Migration Plan
...