
Status

Current state: "Under Discussion"

Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Discuss-Proposing-FLIP-25-Support-User-State-TTL-Natively-in-Flink-td20912.html#a22097

Released: -

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The size of user states in Flink can grow without bound, and users need a way to delete user states that are too old before they cause problems. Flink currently doesn’t support TTL natively for user states, so developers have to explicitly set timers to reap user states.

To solve all the above problems, we want to support user state TTL natively in Flink with low memory overhead.

*Note that this doc only covers the MVP (or V1) of this feature. More features and use cases can be added after the MVP.

Public Interfaces

  • Add TtlUpdateEvent, TtlTimeCharacteristic, TtlPolicy
  • Modify existing state descriptor interface to allow specifying TtlPolicy

Example:

enum TtlUpdateEvent {
  // required, default option
  OnCreate,
  // optional
  OnRead,
  OnWrite,
  OnReadWrite
}

enum TtlTimeCharacteristic {
  EVENT_TIME,
  PROCESSING_TIME
}

public class TtlPolicy {
  TtlUpdateEvent ttlEvent;
  int ttl;
  TtlTimeCharacteristic ttlTimeCharacteristic;

  // ...
}

// ttl in sec
int ttl = 5;

// default TTL, using TtlUpdateEvent.OnCreate
TtlPolicy tp1 = new TtlPolicy(ttl, TtlTimeCharacteristic.EVENT_TIME);
// extended TTL: starts on create and is refreshed on every read or write (TtlUpdateEvent.OnReadWrite)
TtlPolicy tp2 = new TtlPolicy(ttl, TtlTimeCharacteristic.PROCESSING_TIME, TtlUpdateEvent.OnReadWrite);

ValueState<T> s1 = getRuntimeContext().getState(new ValueStateDescriptor<T>("x", ser, tp1));
ValueState<T> s2 = getRuntimeContext().getState(new ValueStateDescriptor<T>("y", ser, tp2));
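
The TtlPolicy body is elided above ("// ..."). As a minimal sketch of what it might contain, assuming the two constructors used in the example, the version below defaults the update event to OnCreate. The field names, the accessors, the refreshesOnRead/refreshesOnWrite helpers and the positive-TTL check are illustrative assumptions, not part of the proposal. The two enums are repeated only so the block compiles on its own.

```java
import java.util.Objects;

enum TtlUpdateEvent { OnCreate, OnRead, OnWrite, OnReadWrite }

enum TtlTimeCharacteristic { EVENT_TIME, PROCESSING_TIME }

/** Immutable TTL policy; an illustrative sketch, not the final Flink API. */
final class TtlPolicy {
    private final int ttlSeconds;
    private final TtlTimeCharacteristic timeCharacteristic;
    private final TtlUpdateEvent updateEvent;

    /** Default policy: the TTL counts down from creation and is never refreshed. */
    TtlPolicy(int ttlSeconds, TtlTimeCharacteristic timeCharacteristic) {
        this(ttlSeconds, timeCharacteristic, TtlUpdateEvent.OnCreate);
    }

    TtlPolicy(int ttlSeconds, TtlTimeCharacteristic timeCharacteristic, TtlUpdateEvent updateEvent) {
        // Assumption: a non-positive TTL is rejected up front.
        if (ttlSeconds <= 0) {
            throw new IllegalArgumentException("ttl must be positive");
        }
        this.ttlSeconds = ttlSeconds;
        this.timeCharacteristic = Objects.requireNonNull(timeCharacteristic);
        this.updateEvent = Objects.requireNonNull(updateEvent);
    }

    int getTtlSeconds() { return ttlSeconds; }

    TtlTimeCharacteristic getTimeCharacteristic() { return timeCharacteristic; }

    TtlUpdateEvent getUpdateEvent() { return updateEvent; }

    /** True if a read should refresh the TTL (hypothetical helper). */
    boolean refreshesOnRead() {
        return updateEvent == TtlUpdateEvent.OnRead || updateEvent == TtlUpdateEvent.OnReadWrite;
    }

    /** True if a write should refresh the TTL (hypothetical helper). */
    boolean refreshesOnWrite() {
        return updateEvent == TtlUpdateEvent.OnWrite || updateEvent == TtlUpdateEvent.OnReadWrite;
    }
}
```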


Proposed Changes

Goal

We’ll deliver a Flink state TTL solution with the following characteristics:

  • The TTL policy in Flink state is exact TTL. User state will be cleaned up exactly when its TTL expires
  • TTL will be supported for both event time and processing time
  • By default, TTL starts to count down when the entry is created. Users can specify a TTL update policy (see the example above) to decide whether a state’s TTL is refreshed upon read, write, or both. More on this later.

TTL Policy

When does the TTL of a user state start counting? Put another way, does Flink support extending/refreshing the TTL of a user state?

There are mainly two situations here:

  1. TTL always starts when a user state is created. This is the fundamental behavior
  2. TTL for a user state can be extended/refreshed upon read and/or update.

Design:

On TTL Policy Situation 1 (TtlUpdateEvent.OnCreate; see the ‘TTL Policy’ section above),

  • What we need is only one timer (a long) for each keyed state
  • Steps
    1. When a keyed user state is created, operator/state backend registers a timer for it in TimerService
    2. When the registered timer is invoked, it deletes the user state
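
The two steps above can be sketched with a toy, single-threaded model. This is only an illustration under stated assumptions: OnCreateTtlStore and its methods are hypothetical names, the TreeMap stands in for TimerService, and time is passed in explicitly as a long instead of coming from event or processing time.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of "one timer per keyed state"; not Flink's TimerService. */
final class OnCreateTtlStore {
    private final long ttl;
    private final Map<String, String> state = new HashMap<>();
    // fire time -> keys whose state expires at that time
    private final TreeMap<Long, List<String>> timers = new TreeMap<>();

    OnCreateTtlStore(long ttl) {
        this.ttl = ttl;
    }

    /** Step 1: creating the state registers its single TTL timer; updates do not. */
    void put(String key, String value, long now) {
        boolean created = !state.containsKey(key);
        state.put(key, value);
        if (created) {
            timers.computeIfAbsent(now + ttl, t -> new ArrayList<>()).add(key);
        }
    }

    String get(String key) {
        return state.get(key);
    }

    /** Step 2: every timer due at or before now fires and deletes its state. */
    void advanceTo(long now) {
        while (!timers.isEmpty() && timers.firstKey() <= now) {
            for (String key : timers.pollFirstEntry().getValue()) {
                state.remove(key);
            }
        }
    }
}
```

With a TTL of 5, a state created at time 0 and updated at time 3 is still readable at time 4 and is gone once time reaches 5, because the update does not move the timer.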


On TTL Policy Situation 2 (TtlUpdateEvent.OnRead, OnWrite, or OnReadWrite; see the ‘TTL Policy’ section above)

  • What we need is only one timer (a long) and a timestamp (also a long) for each keyed state
  • Steps
    1. Use a new field lastModifiedTs to remember when a user state is last modified
    2. When a keyed user state is created, lastModifiedTs is set to creation time, and operator/state backend registers a timer for each keyed user state in TimerService
    3. Whenever actions of corresponding policies are taken, update lastModifiedTs of the user state
    4. When the registered timer is invoked, it checks lastModifiedTs of the user state. If lastModifiedTs + TTL <= current time (for either event time or processing time), delete the state; otherwise, register a new timer at (lastModifiedTs + TTL) and continue from (3)

Example for case 2:

For key K, we have ValueState A with TTL 16, with TTL policy onReadWrite.

  • At time 0, a keyed user state A is created with A.lastModifiedTs=0, a timer is registered for A at time 16
  • At time 2, A’s value is updated, and thus update A.lastModifiedTs=2
  • At time 15, A’s value is read, and thus update A.lastModifiedTs=15
  • At time 16, the registered timer is invoked. It sees A.lastModifiedTs=15, so A has not expired yet (15+16 = 31 > 16); it keeps A and registers a new timer at time 31
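
The example above can be replayed with a small sketch of Situation 2 under the OnReadWrite policy. It is a toy model under stated assumptions, not a proposed implementation: RefreshingTtlStore and its methods are hypothetical, the TreeMap stands in for TimerService, and the expiry check is written as lastModifiedTs + TTL <= current time, which is the reading that matches the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of one timer plus one lastModifiedTs per keyed state (OnReadWrite). */
final class RefreshingTtlStore {
    private final long ttl;
    private final Map<String, String> values = new HashMap<>();
    private final Map<String, Long> lastModifiedTs = new HashMap<>();
    // fire time -> keys with a timer at that time (at most one timer per key)
    private final TreeMap<Long, List<String>> timers = new TreeMap<>();

    RefreshingTtlStore(long ttl) {
        this.ttl = ttl;
    }

    /** Creation registers the timer; every write refreshes lastModifiedTs. */
    void write(String key, String value, long now) {
        boolean created = !values.containsKey(key);
        values.put(key, value);
        lastModifiedTs.put(key, now);
        if (created) {
            registerTimer(key, now + ttl);
        }
    }

    /** Every read of an existing state refreshes lastModifiedTs. */
    String read(String key, long now) {
        if (values.containsKey(key)) {
            lastModifiedTs.put(key, now);
        }
        return values.get(key);
    }

    /** Existence check that does not count as a read. */
    boolean contains(String key) {
        return values.containsKey(key);
    }

    /** Earliest pending timer, or null if none is registered. */
    Long nextTimerTime() {
        return timers.isEmpty() ? null : timers.firstKey();
    }

    /** Fires due timers: delete expired state, otherwise re-register at lastModifiedTs + ttl. */
    void advanceTo(long now) {
        while (!timers.isEmpty() && timers.firstKey() <= now) {
            for (String key : timers.pollFirstEntry().getValue()) {
                long expiry = lastModifiedTs.get(key) + ttl;
                if (expiry <= now) {
                    values.remove(key);
                    lastModifiedTs.remove(key);
                } else {
                    registerTimer(key, expiry);
                }
            }
        }
    }

    private void registerTimer(String key, long time) {
        timers.computeIfAbsent(time, t -> new ArrayList<>()).add(key);
    }
}
```

Replaying the example with a TTL of 16 (write at 0, write at 2, read at 15, advance to 16) leaves the state in place with a single pending timer at 31. If nothing touches the state after that, advancing to 31 deletes it.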

Pros and Cons:

Pros:

 

  1. The TTL implementation is independent of state backends; it only relies on the abstraction of TimerService
  2. Because of (1), migration and compatibility can reuse the existing solutions
  3. This design will support both event time and processing time natively
  4. At any time, it has at most one timer for each keyed user state with TTL

 

Cons:

  1. In TTL policy situation 2, we need to store a timestamp (basically a long) in addition to the timer for each keyed user state with TTL. This adds a small memory overhead.

Migration Plan and Compatibility of Checkpoints/Savepoints

To the best of our current knowledge, we don’t need extra work on a migration plan or on compatibility of checkpoints/savepoints, because we are reusing existing components such as TimerService and state backends, and their behaviors will remain the same as before.

The situation might change as the FLIP evolves.

Implementation Plan

The implementation details are still not very clear at this moment, and we’d like to get a clearer picture as we move on.

However, the basic idea is very simple - TimerService and user states need to have access to each other.

  • TimerService needs to access user states so triggered timer can clean up expired user states.
  • User states need to access TimerService to register timers when a relevant TtlUpdateEvent happens to a user state. We can add callback functions to all state implementations to ensure that we get notified on all state accesses.

Thus the initial step should be moving TimerService to state backends to facilitate the above proposal.

Currently, there are still two issues:

  1. Where should the code live?
    1. Should it live with operators, state backends, or both?
    2. We may also need something in operators or state backends (e.g. operators/StreamingRuntimeContext/KeyedStateStore) to have access to TimerService
  2. How to distinguish TTL timers from user timers so that they don't interfere with each other.
    1. We need a good strategy to ensure that TTL timers do not interfere with user-configured timers. For example, what happens if a user configures a timer for the same time as a TTL timer, and which one should be invoked first? The main problem right now is that we have to distinguish between user and TTL timers. Currently, the timer service does not support timer tags (or another method) to distinguish timers.
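
To make the timer-tag idea concrete, here is a hedged sketch of one possible shape. Nothing in it exists in Flink's timer service today: TimerTag, TaggedTimer and TaggedTimerQueue are hypothetical names. Firing user timers before TTL timers at the same timestamp is an arbitrary choice made only for illustration, since the ordering question above is still open.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical tag; declaration order doubles as the tie-break order. */
enum TimerTag { USER, TTL }

final class TaggedTimer {
    final long time;
    final TimerTag tag;
    final String key;

    TaggedTimer(long time, TimerTag tag, String key) {
        this.time = time;
        this.tag = tag;
        this.key = key;
    }
}

/** Toy timer queue that orders by time, then by tag, so ties are deterministic. */
final class TaggedTimerQueue {
    private final PriorityQueue<TaggedTimer> queue = new PriorityQueue<>(
            Comparator.comparingLong((TaggedTimer t) -> t.time)
                    .thenComparingInt(t -> t.tag.ordinal()));

    void register(long time, TimerTag tag, String key) {
        queue.add(new TaggedTimer(time, tag, key));
    }

    /** Removes and returns all timers due at or before now, in firing order. */
    List<TaggedTimer> pollDue(long now) {
        List<TaggedTimer> due = new ArrayList<>();
        while (!queue.isEmpty() && queue.peek().time <= now) {
            due.add(queue.poll());
        }
        return due;
    }
}
```

With this ordering, a user timer and a TTL timer registered for the same time on the same key fire user-first, so the user callback can still see the state before the TTL timer deletes it. Reversing the enum order would give the opposite behavior.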

 

 

 

Test Plan

Normal unit, integration, and end-to-end tests

Rejected Alternatives

We also considered a proposal that uses RocksDB’s built-in TTL, but it has a few major drawbacks:

  • RocksDB’s built-in TTL only supports TTL on record creation; it cannot extend/refresh TTL upon read and/or write
  • It only supports processing time, and doesn’t support event time.
  • The TTL logic relies heavily on the state backend implementation. If we add more state backends and rely on their internal features for TTL, it will be hard to unify their behaviors, and this will introduce unnecessary system complexity
  • Using an external feature to build Flink’s TTL makes migration and compatibility hard

 

 
