Status
Current state: "Under Discussion"
Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Discuss-Proposing-FLIP-25-Support-User-State-TTL-Natively-in-Flink-td20912.html#a22097
JIRA:
Released: -
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The size of user states in Flink can grow without bound, and users need a way to delete user states that are too old before they cause problems. Flink currently doesn’t support TTL natively for user states, so developers have to explicitly set timers to reap them.
To solve these problems, we want to support user state TTL natively in Flink with low memory overhead.
*Note that this doc only covers the MVP (or V1) of this feature. Further features and use cases can be added after the MVP.
Public Interfaces
- Add TtlUpdateEvent, TtlTimeCharacteristic, TtlPolicy
- Modify existing state descriptor interface to allow specifying TtlPolicy
Example:
```java
enum TtlUpdateEvent {
    // Required; the default option
    OnCreate,
    // Optional
    OnRead,
    OnWrite,
    OnReadWrite
}

enum TtlTimeCharacteristic {
    EVENT_TIME,
    PROCESSING_TIME
}

public class TtlPolicy {
    TtlUpdateEvent ttlEvent;
    int ttl;
    TtlTimeCharacteristic ttlTimeCharacteristic;
    // ...
}

// TTL in seconds
int ttl = 5;

// Default TTL, using TtlUpdateEvent.OnCreate
TtlPolicy tp1 = new TtlPolicy(ttl, TtlTimeCharacteristic.EVENT_TIME);

// Extended TTL, using TtlUpdateEvent.OnCreate and TtlUpdateEvent.OnReadWrite
TtlPolicy tp2 = new TtlPolicy(ttl, TtlTimeCharacteristic.PROCESSING_TIME, TtlUpdateEvent.OnReadWrite);

ValueState<T> s1 = getRuntimeContext().getState(new ValueStateDescriptor<T>("x", ser, tp1));
ValueState<T> s2 = getRuntimeContext().getState(new ValueStateDescriptor<T>("y", ser, tp2));
```
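The example above only names the proposed types. Below is a minimal, self-contained sketch of how they could be declared, assuming (as the example suggests) a two-argument constructor that defaults the update event to OnCreate. The refreshesOnRead()/refreshesOnWrite() helpers and the wrapper class TtlPolicySketch are hypothetical and not part of the proposal or of any released Flink API.

```java
import java.util.Objects;

// Hypothetical sketch of the proposed public types. Constructor shapes are
// inferred from the example above; this is not a released Flink API.
public class TtlPolicySketch {

    enum TtlUpdateEvent {
        OnCreate, OnRead, OnWrite, OnReadWrite;

        // Whether reading the state restarts its TTL countdown.
        boolean refreshesOnRead() {
            return this == OnRead || this == OnReadWrite;
        }

        // Whether writing the state restarts its TTL countdown.
        boolean refreshesOnWrite() {
            return this == OnWrite || this == OnReadWrite;
        }
    }

    enum TtlTimeCharacteristic { EVENT_TIME, PROCESSING_TIME }

    static final class TtlPolicy {
        final int ttl; // seconds, as in the example above
        final TtlTimeCharacteristic ttlTimeCharacteristic;
        final TtlUpdateEvent ttlEvent;

        // Two-argument form: TTL counts from creation only (OnCreate).
        TtlPolicy(int ttl, TtlTimeCharacteristic characteristic) {
            this(ttl, characteristic, TtlUpdateEvent.OnCreate);
        }

        TtlPolicy(int ttl, TtlTimeCharacteristic characteristic, TtlUpdateEvent event) {
            if (ttl <= 0) {
                throw new IllegalArgumentException("ttl must be positive");
            }
            this.ttl = ttl;
            this.ttlTimeCharacteristic = Objects.requireNonNull(characteristic);
            this.ttlEvent = Objects.requireNonNull(event);
        }
    }

    public static void main(String[] args) {
        TtlPolicy tp1 = new TtlPolicy(5, TtlTimeCharacteristic.EVENT_TIME);
        TtlPolicy tp2 = new TtlPolicy(5, TtlTimeCharacteristic.PROCESSING_TIME,
                TtlUpdateEvent.OnReadWrite);
        System.out.println(tp1.ttlEvent + " " + tp1.ttlEvent.refreshesOnRead()); // OnCreate false
        System.out.println(tp2.ttlEvent + " " + tp2.ttlEvent.refreshesOnRead()); // OnReadWrite true
    }
}
```

Keeping the refresh rules on the enum means the state wrapper only has to ask the policy whether a given access should touch the timestamp.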
Proposed Changes
Goal
We’ll deliver Flink state TTL with the following characteristics:
- TTL will be supported for both event time and processing time
- By default, TTL starts counting down when the entry is created. Users can specify a TTL update policy (see the example above) to decide whether a state’s TTL is refreshed upon read, write, or both. More on this later.
- TTL in Flink state is exact: user state will be cleaned up exactly when its TTL expires
TTL Policy
When does the TTL of a user state start counting? Put another way, does Flink support extending/refreshing the TTL of a user state?
There are mainly two situations here:
- TTL always starts when a user state is created. This is the fundamental behavior
- TTL for a user state can be extended/refreshed upon read and/or write.
Design:
For TTL policy situation 1 (TtlUpdateEvent.OnCreate; see the ‘TTL Policy’ section above):
- What we need is only one timer (a long) for each keyed state
- Steps
- When a keyed user state is created, operator/state backend registers a timer for it in TimerService
- When the registered timer is invoked, it deletes the user state
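The situation 1 steps can be sketched without any state backend, assuming a PriorityQueue as a stand-in for TimerService and plain long timestamps that may represent either event time (watermarks) or processing time. All class and method names here (OnCreateTtlSketch, update, advanceTo) are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical, backend-independent sketch of TTL situation 1 (OnCreate).
// A PriorityQueue stands in for TimerService; timestamps may be event time
// or processing time.
public class OnCreateTtlSketch {

    // A registered timer: fire at `timestamp` for the state under `key`.
    static final class Timer {
        final long timestamp;
        final String key;

        Timer(long timestamp, String key) {
            this.timestamp = timestamp;
            this.key = key;
        }
    }

    private final long ttl;
    private final Map<String, String> state = new HashMap<>();
    private final PriorityQueue<Timer> timers =
            new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));

    OnCreateTtlSketch(long ttl) {
        this.ttl = ttl;
    }

    // Creating the state registers exactly one timer at creation time + TTL;
    // later updates leave that timer untouched.
    void update(String key, String value, long now) {
        if (!state.containsKey(key)) {
            timers.add(new Timer(now + ttl, key));
        }
        state.put(key, value);
    }

    String value(String key) {
        return state.get(key);
    }

    // Advance time (watermark or wall clock); every due timer deletes its state.
    void advanceTo(long now) {
        while (!timers.isEmpty() && timers.peek().timestamp <= now) {
            state.remove(timers.poll().key);
        }
    }

    public static void main(String[] args) {
        OnCreateTtlSketch s = new OnCreateTtlSketch(16);
        s.update("K", "a", 0);
        s.update("K", "b", 10); // a write at 10 does not extend the TTL
        s.advanceTo(15);
        System.out.println(s.value("K")); // b
        s.advanceTo(16);
        System.out.println(s.value("K")); // null
    }
}
```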
For TTL policy situation 2 (TtlUpdateEvent.OnRead, OnWrite, or OnReadWrite; see the ‘TTL Policy’ section above):
- What we need is only one timer (a long) and a timestamp (also a long) for each keyed state
- Steps
- Use a new field lastModifiedTs to record when a user state’s TTL was last refreshed
- When a keyed user state is created, lastModifiedTs is set to the creation time, and the operator/state backend registers a timer for it at (creation time + TTL) in TimerService
- Whenever an access matching the configured policy happens (read for OnRead, write for OnWrite, either for OnReadWrite), update lastModifiedTs of the user state; no new timer is registered
- When the registered timer is invoked, it checks lastModifiedTs of the user state. If lastModifiedTs + TTL <= current time (event time or processing time, per the configured TtlTimeCharacteristic), delete the state; otherwise, register a new timer at (lastModifiedTs + TTL), while later accesses keep updating lastModifiedTs as above
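A similar sketch for situation 2, under the same assumptions (hypothetical names, a PriorityQueue in place of TimerService, long timestamps). Accesses only update lastModifiedTs; when the single pending timer fires, it deletes the state if lastModifiedTs + TTL has passed and otherwise re-registers itself at lastModifiedTs + TTL, so each key keeps at most one timer. The main method replays the TTL-16, OnReadWrite example that follows.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical sketch of TTL situation 2: one pending timer plus one
// lastModifiedTs per key. A PriorityQueue stands in for TimerService.
public class RefreshingTtlSketch {

    static final class Timer {
        final long timestamp;
        final String key;

        Timer(long timestamp, String key) {
            this.timestamp = timestamp;
            this.key = key;
        }
    }

    private final long ttl;
    private final boolean refreshOnRead;
    private final boolean refreshOnWrite;
    private final Map<String, String> values = new HashMap<>();
    private final Map<String, Long> lastModifiedTs = new HashMap<>();
    private final PriorityQueue<Timer> timers =
            new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));

    RefreshingTtlSketch(long ttl, boolean refreshOnRead, boolean refreshOnWrite) {
        this.ttl = ttl;
        this.refreshOnRead = refreshOnRead;
        this.refreshOnWrite = refreshOnWrite;
    }

    void write(String key, String value, long now) {
        if (!values.containsKey(key)) {
            lastModifiedTs.put(key, now);          // creation always starts the TTL
            timers.add(new Timer(now + ttl, key)); // the only timer for this key
        } else if (refreshOnWrite) {
            lastModifiedTs.put(key, now);          // refresh without a new timer
        }
        values.put(key, value);
    }

    String read(String key, long now) {
        if (refreshOnRead && values.containsKey(key)) {
            lastModifiedTs.put(key, now);
        }
        return values.get(key);
    }

    // Fire due timers: delete expired state, or re-register a single timer.
    void advanceTo(long now) {
        while (!timers.isEmpty() && timers.peek().timestamp <= now) {
            Timer t = timers.poll();
            long expiry = lastModifiedTs.get(t.key) + ttl;
            if (expiry <= now) {
                values.remove(t.key);
                lastModifiedTs.remove(t.key);
            } else {
                timers.add(new Timer(expiry, t.key)); // expiry > now, so no re-fire here
            }
        }
    }

    Long nextTimer() {
        return timers.isEmpty() ? null : timers.peek().timestamp;
    }

    public static void main(String[] args) {
        RefreshingTtlSketch a = new RefreshingTtlSketch(16, true, true); // OnReadWrite
        a.write("K", "v0", 0);  // lastModifiedTs = 0, timer at 16
        a.write("K", "v1", 2);  // lastModifiedTs = 2
        a.read("K", 15);        // lastModifiedTs = 15
        a.advanceTo(16);        // 15 + 16 > 16: re-register
        System.out.println(a.nextTimer()); // 31
        a.advanceTo(31);        // 15 + 16 <= 31: delete
        System.out.println(a.read("K", 31)); // null
    }
}
```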
Example for case 2:
For key K, we have ValueState A with TTL 16 and TTL policy OnReadWrite.
- At time 0, a keyed user state A is created with A.lastModifiedTs=0, a timer is registered for A at time 16
- At time 2, A’s value is updated, and thus update A.lastModifiedTs=2
- At time 15, A’s value is read, and thus update A.lastModifiedTs=15
- At time 16, the registered timer is invoked; it sees A.lastModifiedTs=15, so it does not delete A and instead registers a new timer at (15+16) = 31
Pros and Cons:
Pros:
- TTL implementation is independent of state backends, it only relies on the abstraction of TimerService
- Because the implementation is independent of state backends, migration and compatibility can reuse existing solutions
- This design will support both event time and processing time natively
- There is at most one timer for each keyed user state with TTL at any time
Cons:
- In TTL policy situation 2, we need to store a timer and a lastModifiedTs timestamp (two longs) for each keyed user state with TTL, which adds a small memory overhead.
Migration Plan and Compatibility of Checkpoints/Savepoints
To our current knowledge, we don’t need extra work for the migration plan and compatibility of checkpoints/savepoints, because we are reusing existing components, such as TimerService and state backends, and their behavior will remain the same as before.
The situation might change as the FLIP evolves.
Implementation Plan
The implementation details are evolving, and we’d like to get a clearer picture as we move on.
The basic idea is very simple - TimerService and user states need to have access to each other.
- TimerService needs to access user states so that triggered timers can clean up expired user states.
- User states need to access TimerService in order to register timers when the configured TtlUpdateEvents happen to them. We can add callback functions to all state implementations to ensure that we get notified on all state accesses.
Thus, the first step should be moving TimerService into the state backends to enable the above proposal.
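The callback idea can be illustrated with a wrapper that reports every read and write to a listener, which in a real implementation could update lastModifiedTs or register a TTL timer. This is a hypothetical sketch: NotifyingValueState and AccessListener are illustrative names, not Flink interfaces, and the current time is passed in explicitly instead of being obtained from TimerService.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: a state wrapper that notifies a listener on every
// access, so TTL bookkeeping can hook into reads and writes.
public class NotifyingStateSketch {

    enum Access { READ, WRITE }

    // Callback invoked on each state access; `timestamp` is the access time.
    interface AccessListener {
        void onAccess(Access access, long timestamp);
    }

    // Minimal stand-in for a keyed ValueState; not Flink's interface.
    static final class NotifyingValueState<T> {
        private T value;
        private final AccessListener listener;

        NotifyingValueState(AccessListener listener) {
            this.listener = listener;
        }

        T value(long now) {
            listener.onAccess(Access.READ, now);
            return value;
        }

        void update(T newValue, long now) {
            value = newValue;
            listener.onAccess(Access.WRITE, now);
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        NotifyingValueState<Integer> s =
                new NotifyingValueState<>((access, ts) -> log.add(access + "@" + ts));
        s.update(42, 2);
        s.value(15);
        System.out.println(log); // [WRITE@2, READ@15]
    }
}
```

Routing every access through one listener keeps the TTL logic out of the individual state types, which is the point of adding the callbacks.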
Currently, there are still two issues:
- Where should the code live? Should it live with operators, state backends, or both? We may also need something in operators or state backends (e.g. operators/StreamingRuntimeContext/KeyedStateStore) to have access to TimerService. We may need to experiment and prototype before finalizing this decision.
- How do we distinguish TTL timers from user timers so that user timers are always invoked before TTL timers?
After discussion, we believe that user timers should always be invoked first. We need a good strategy to ensure that, when a user timer and a TTL timer are registered for the same time, the user timer is always triggered first. Currently, the timer service does not support timer tags. We now have two kinds of timers, and we need a better way of maintaining both of them.
Option 1: Add a tag to each InternalTimer that marks it as a user timer or a TTL timer.
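Option 1 boils down to ordering timers by (timestamp, tag). A minimal sketch, assuming a single priority queue and a hypothetical TimerTag enum whose declaration order puts USER before TTL. Because the tag becomes part of every timer, it would presumably also have to be persisted with the timers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch of Option 1: tagged timers in one queue, ordered by
// timestamp and then by tag, so a user timer beats a TTL timer on ties.
public class TaggedTimerSketch {

    // Declaration order matters: USER.compareTo(TTL) < 0.
    enum TimerTag { USER, TTL }

    static final class TaggedTimer {
        final long timestamp;
        final TimerTag tag;
        final String name;

        TaggedTimer(long timestamp, TimerTag tag, String name) {
            this.timestamp = timestamp;
            this.tag = tag;
            this.name = name;
        }
    }

    static PriorityQueue<TaggedTimer> newQueue() {
        return new PriorityQueue<>((a, b) -> {
            int byTime = Long.compare(a.timestamp, b.timestamp);
            return byTime != 0 ? byTime : a.tag.compareTo(b.tag);
        });
    }

    // Pops every timer due at `now` and returns their names in firing order.
    static List<String> fireUpTo(PriorityQueue<TaggedTimer> queue, long now) {
        List<String> fired = new ArrayList<>();
        while (!queue.isEmpty() && queue.peek().timestamp <= now) {
            fired.add(queue.poll().name);
        }
        return fired;
    }

    public static void main(String[] args) {
        PriorityQueue<TaggedTimer> q = newQueue();
        q.add(new TaggedTimer(16, TimerTag.TTL, "ttl-A"));
        q.add(new TaggedTimer(16, TimerTag.USER, "user-onTimer"));
        q.add(new TaggedTimer(10, TimerTag.TTL, "ttl-B"));
        System.out.println(fireUpTo(q, 16)); // [ttl-B, user-onTimer, ttl-A]
    }
}
```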
Option 2: Let InternalTimerService maintain user timers and TTL timers separately. Implementation classes of InternalTimerService would add two new timer queues, e.g. processingTtlTimeTimersQueue and eventTtlTimeTimersQueue for HeapInternalTimerService. In onProcessingTime() and advanceWatermark(), they would first iterate through the user processing-time and event-time timers before iterating through the processing-time and event-time TTL timers. We'll also add the following new internal APIs to register TTL timers:
```java
@Internal
public void registerTtlProcessingTimeTimer(N namespace, long time);

@Internal
public void registerTtlEventTimeTimer(N namespace, long time);
```
The biggest advantage of this approach over Option 1 is that it doesn’t affect existing timer-related checkpoints/savepoints, restores, or migrations.
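Option 2 can be sketched with two independent queues, assuming event-time timers identified only by timestamp (namespaces and keys omitted). One design choice here goes slightly beyond the text: rather than draining every due user timer before any TTL timer, the sketch merges the two queues by timestamp and breaks ties in favor of the user queue, so a TTL timer at 10 still fires before a user timer at 16. The class and method names are hypothetical; only registerTtlEventTimeTimer mirrors the proposed API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch of Option 2: user and TTL timers live in separate
// queues, so the existing user-timer queue is left unchanged.
public class SeparateTtlQueueSketch {

    private final PriorityQueue<Long> userTimers = new PriorityQueue<>();
    private final PriorityQueue<Long> ttlTimers = new PriorityQueue<>();

    void registerEventTimeTimer(long time) {
        userTimers.add(time);
    }

    // Mirrors the proposed registerTtlEventTimeTimer(N namespace, long time),
    // with the namespace omitted for brevity.
    void registerTtlEventTimeTimer(long time) {
        ttlTimers.add(time);
    }

    // Fires all timers due at `watermark` in timestamp order; on equal
    // timestamps the user timer fires first.
    List<String> advanceWatermark(long watermark) {
        List<String> fired = new ArrayList<>();
        while (true) {
            Long u = userTimers.peek();
            Long t = ttlTimers.peek();
            boolean userDue = u != null && u <= watermark;
            boolean ttlDue = t != null && t <= watermark;
            if (!userDue && !ttlDue) {
                break;
            }
            if (userDue && (!ttlDue || u <= t)) {
                fired.add("user@" + userTimers.poll());
            } else {
                fired.add("ttl@" + ttlTimers.poll());
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        SeparateTtlQueueSketch s = new SeparateTtlQueueSketch();
        s.registerTtlEventTimeTimer(16);
        s.registerEventTimeTimer(16);
        s.registerTtlEventTimeTimer(10);
        System.out.println(s.advanceWatermark(16)); // [ttl@10, user@16, ttl@16]
    }
}
```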
Test Plan
Normal unit, integration, and end-to-end tests
Rejected Alternatives
We also considered using RocksDB’s built-in TTL, but it has a few major drawbacks:
- RocksDB’s built-in TTL only supports TTL on record creation; it cannot extend/refresh TTL upon read and/or write
- It only supports processing time, and doesn’t support event time.
- The TTL logic relies heavily on the state backend implementation. If we add more state backends and rely on their internal features for TTL, it will be hard to unify their behaviors, and it will introduce unnecessary system complexity
- Using an external feature to build Flink’s TTL makes migration and compatibility hard