THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Hybrid semantics can be implemented on top of the 2 PunctuationType callbacks, however, this would require that the schedules are cancellable (or instead that a call to schedule() overwrites the previous PuncuationSchedule of the given type, instead of just adding another PuncuationSchedule as it does today) as follows:
Code Block |
---|
ProcessorContext context; long streamTimeInterval = ...; long systemTimeUpperBound = ...; //e.g. systemTimeUpperBound = streamTimeInterval + some tolerance Cancellable streamTimeSchedule; Cancellable systemTimeSchedule; long lastStreamTimePunctation = -1; public void init(ProcessorContext context){ this.context = context; streamTimeSchedule = context.schedule(PunctuationType.STREAM_TIME, streamTimeInterval, this::streamTimePunctuate); systemTimeSchedule = context.schedule(PunctuationType.SYSTEM_TIME, systemTimeUpperBound, this::systemTimePunctuate); } public void streamTimePunctuate(long streamTime){ periodicBusiness(streamTime); systemTimeSchedule.cancel(); systemTimeSchedule = context.schedule(PunctuationType.SYSTEM_TIME, systemTimeUpperBound, this::systemTimePunctuate); } public void systemTimePunctuate(long systemTime){ periodicBusiness(context.timestamp()); streamTimeSchedule.cancel(); streamTimeSchedule = context.schedule(PunctuationType.STREAM_TIME, streamTimeInterval, this::streamTimePunctuate); } public void periodicBusiness(long streamTime){ // guard against streamTime == -1, easy enough. // if you need system time instead, just use System.currentTimeMillis() // do something businessy here } |
...