THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block |
---|
// Processor context; assigned in init() and used afterwards to register punctuation schedules
// and to read context.timestamp().
ProcessorContext context;
// Interval handed to every STREAM_TIME schedule registration (value elided in this example).
// NOTE(review): unit is presumably milliseconds - confirm against the schedule() API in use.
long streamTimeInterval = ...;
// Interval handed to every SYSTEM_TIME schedule registration. Because streamTimePunctuate()
// re-registers the SYSTEM_TIME schedule each time it runs, this acts as an upper bound on how
// long periodicBusiness() can go without running.
long systemTimeUpperBound = ...; //e.g. systemTimeUpperBound = streamTimeInterval + some tolerance
// Handle of the currently registered STREAM_TIME schedule; systemTimePunctuate() cancels and replaces it.
Cancellable streamTimeSchedule;
// Handle of the currently registered SYSTEM_TIME schedule; streamTimePunctuate() cancels and replaces it.
Cancellable systemTimeSchedule;
// NOTE(review): name is spelled "Punctation" (sic), and the field is neither read nor written by
// any of the methods shown in this example - confirm whether it is still needed.
long lastStreamTimePunctation = -1;
// Stores the processor context and registers the two punctuation schedules:
//   - a STREAM_TIME schedule firing streamTimePunctuate every streamTimeInterval, and
//   - a SYSTEM_TIME schedule firing systemTimePunctuate every systemTimeUpperBound.
// The returned handles are kept so that each callback can cancel and re-register the other one.
// NOTE(review): the released Kafka Streams API declares schedule(interval, type, callback) and
// names the wall-clock type WALL_CLOCK_TIME - confirm which API revision this example targets.
public void init(ProcessorContext context) {
    this.context = context;

    this.streamTimeSchedule = context.schedule(
            PunctuationType.STREAM_TIME,
            this.streamTimeInterval,
            this::streamTimePunctuate);

    this.systemTimeSchedule = context.schedule(
            PunctuationType.SYSTEM_TIME,
            this.systemTimeUpperBound,
            this::systemTimePunctuate);
}
// STREAM_TIME punctuation callback.
// Runs the periodic work with the supplied stream time, then cancels the pending SYSTEM_TIME
// schedule and registers a fresh one, so the system-time fallback is counted from this run.
//
// streamTime: timestamp passed in by the STREAM_TIME schedule; forwarded to periodicBusiness.
public void streamTimePunctuate(long streamTime) {
    periodicBusiness(streamTime);

    // Replace the fallback schedule only after the business logic has run.
    final Cancellable pendingSystemTimeSchedule = this.systemTimeSchedule;
    pendingSystemTimeSchedule.cancel();
    this.systemTimeSchedule = this.context.schedule(
            PunctuationType.SYSTEM_TIME,
            this.systemTimeUpperBound,
            this::systemTimePunctuate);
}
// SYSTEM_TIME punctuation callback.
// Runs the periodic work, then cancels the pending STREAM_TIME schedule and registers a fresh
// one, so the next stream-time punctuation is counted from this run.
//
// systemTime: timestamp passed in by the SYSTEM_TIME schedule; it is not used here. The value
//             handed to periodicBusiness is context.timestamp() instead.
// NOTE(review): this relies on context.timestamp() yielding a stream-time value (possibly -1)
// when called from a punctuation callback - confirm against the ProcessorContext documentation.
public void systemTimePunctuate(long systemTime) {
    final long contextTimestamp = this.context.timestamp();
    periodicBusiness(contextTimestamp);

    // Replace the stream-time schedule only after the business logic has run.
    final Cancellable pendingStreamTimeSchedule = this.streamTimeSchedule;
    pendingStreamTimeSchedule.cancel();
    this.streamTimeSchedule = this.context.schedule(
            PunctuationType.STREAM_TIME,
            this.streamTimeInterval,
            this::streamTimePunctuate);
}
// Placeholder for the periodic work shared by both punctuation callbacks.
// It is called from streamTimePunctuate with that callback's streamTime argument, and from
// systemTimePunctuate with context.timestamp(). The body is intentionally left empty in this example.
//
// streamTime: the timestamp supplied by whichever callback invoked this method.
public void periodicBusiness(long streamTime){
// TODO: guard against streamTime == -1 before using it (easy enough).
// NOTE(review): -1 presumably means no stream time is available yet - confirm.
// If the business logic needs system time instead, read it with System.currentTimeMillis().
// do something businessy here
} |
...