THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||
---|---|---|
| ||
/**
 * Selects the kind of time a schedule is registered against when calling
 * {@code context.schedule(type, interval, callback)}.
 *
 * NOTE(review): the exact firing semantics of each constant are not visible
 * here; the descriptions below are inferred from the names and should be
 * confirmed against the scheduling implementation.
 */
enum PunctuationType {
    /** Presumably: the schedule advances with stream time. */
    STREAM_TIME,

    /**
     * Presumably: the schedule advances with system (wall-clock) time.
     *
     * NOTE(review): the name reads like SYSTEM_TIME and WALL_CLOCK_TIME fused
     * together, possibly a rename artifact. Confirm the intended name; it is
     * kept as-is because existing usages reference it.
     */
    SYSTEMWALL_CLOCK_TIME
}
And `Cancellable` is:
...
Code Block |
---|
// Example: run periodicBusiness() from two schedules, one registered with
// PunctuationType.STREAM_TIME and one with PunctuationType.SYSTEMWALL_CLOCK_TIME.
// Each callback does the periodic work and then cancels and re-registers the
// OTHER schedule.

ProcessorContext context;
long streamTimeInterval = ...;
// e.g. systemTimeUpperBound = streamTimeInterval + some tolerance
long systemTimeUpperBound = ...;

// Handles returned by context.schedule(...); kept so each schedule can be
// cancelled and replaced from the other schedule's callback.
Cancellable streamTimeSchedule;
Cancellable systemTimeSchedule;

/**
 * Stores the context and registers both schedules.
 *
 * @param context the processor context used for all schedule registrations
 */
public void init(ProcessorContext context) {
    this.context = context;
    streamTimeSchedule = scheduleStreamTime();
    systemTimeSchedule = scheduleSystemTime();
}

/**
 * Callback for the stream-time schedule: runs the periodic work with the
 * given stream time, then cancels and re-registers the system-time schedule.
 *
 * @param streamTime the time value handed to this callback
 */
public void streamTimePunctuate(long streamTime) {
    periodicBusiness(streamTime);
    systemTimeSchedule.cancel();
    systemTimeSchedule = scheduleSystemTime();
}

/**
 * Callback for the system-time schedule: runs the periodic work, then
 * cancels and re-registers the stream-time schedule.
 *
 * @param systemTime the time value handed to this callback; not used, since
 *                   the periodic work is given context.timestamp() instead
 */
public void systemTimePunctuate(long systemTime) {
    // periodicBusiness() takes a stream time, so pass context.timestamp()
    // rather than the system time this callback received.
    periodicBusiness(context.timestamp());
    streamTimeSchedule.cancel();
    streamTimeSchedule = scheduleStreamTime();
}

/**
 * The work that both schedules trigger. Left as a placeholder.
 *
 * @param streamTime the stream time to operate on; per the note below it may
 *                   be -1, which the implementation is expected to handle
 */
public void periodicBusiness(long streamTime) {
    // guard against streamTime == -1, easy enough.
    // if you need system time instead, just use System.currentTimeMillis()
    // do something businessy here
}

// Registers streamTimePunctuate on a STREAM_TIME schedule and returns its handle.
private Cancellable scheduleStreamTime() {
    return context.schedule(PunctuationType.STREAM_TIME, streamTimeInterval, this::streamTimePunctuate);
}

// Registers systemTimePunctuate on a SYSTEMWALL_CLOCK_TIME schedule and returns its handle.
private Cancellable scheduleSystemTime() {
    return context.schedule(PunctuationType.SYSTEMWALL_CLOCK_TIME, systemTimeUpperBound, this::systemTimePunctuate);
}
...