...
- Where should the code live in? Should it live with operators, state backends, or both? We may also need something in operators or state backends (e.g. operators/StreamingRuntimeContext/KeyedStateStore) to have access to TimerService
We may need to experiment and prototype before finalizing this decision. How to distinguish TTL timers from user timers so that user timers are always invoked before TTL timers?
Upon discussion, we believe that the user timer should always be invoked first. We need a good strategy to ensure that, when a user timer and a TTL timer are configured for the same time, user timer is always triggered first. Currently, timer service does not support timer tags. We have two kinds of timers now and we need a better way of maintaining both of them.
Option1: We can add a tag for each timer to InternalTimer
Option2: let InternalTimerService maintains user timers and TTL timers separately. Implementation classes of InternalTimerService should add two new sets of timers, e.g. processingTtlTimeTimersQueue and eventTtlTimeTimersQueue for HeapInternalTimerService. Upon onProcessingTime() and advanceWatermark(), they will first iterate through ProcessingTimeTimer and EventTimeTimer before iterating the ProcessingTtlTimeTimers and EventTtlTimeTimer.by adding to it the following new internal APIs:We'll also add the following new internal APIs to register TTL timers:
And implementation classes of InternalTimerService should add two new sets of timers, e.g. processingTtlTimeTimersQueue and eventTtlTimeTimersQueue for HeapInternalTimerService. Upon onProcessingTime() and advanceWatermark(), they will first iterate through ProcessingTimeTimer and EventTimeTimer before iterating the ProcessingTtlTimeTimers and EventTtlTimeTimer.Code Block language java @Internal public void registerTtlProcessingTimeTimer(N namespace, long time); @Internal public void registerTtlEventTimeTimer(N namespace, long time);
The biggest advantage, compared to option 1, is that it doesn't impact existing timer-related checkpoint/savepoint and migrations.
Test Plan
Normal unit, integration, and end-to-end tests
...