

Status

Current state"Under Discussion"

Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Discuss-Proposing-FLIP-25-Support-User-State-TTL-Natively-in-Flink-td20912.html#a22097

...

JIRA: FLINK-3089 (ASF JIRA)

Release: 1.6


Draft: https://docs.google.com/document/d/1SI_WoXAfOd4_NKpGyk4yh3mf59g12pSGNXRtNFi-tgM
Released: -

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Table of Contents

Motivation

Reasons to introduce Flink native state expiration:

  • The size of user state in Flink can grow infinitely

...

To solve all the above problems, we want to support user state TTL natively in Flink with low memory overhead.

*Note that this doc only covers the MVP (or V1) of this feature. More features and use cases can be added after the MVP.

Public Interfaces

...

Example:

Code Block
languagejava
enum TtlUpdateEvent {
  // required, default option
  OnCreate,
  // optional
  OnRead,
  OnWrite,
  OnReadWrite
}

enum TtlTimeCharacteristic {
  EVENT_TIME,
  PROCESSING_TIME
}

public class TtlPolicy {
  TtlUpdateEvent ttlEvent;
  int ttl;
  TtlTimeCharacteristic ttlTimeCharacteristic;

  // ...
}

// ttl in sec
int ttl = 5;

// default TTL, using TtlUpdateEvent.OnCreate
TtlPolicy tp1 = new TtlPolicy(ttl, TtlTimeCharacteristic.EVENT_TIME);
// extended TTL, using TtlUpdateEvent.OnCreate and TtlUpdateEvent.OnReadWrite
TtlPolicy tp2 = new TtlPolicy(ttl, TtlTimeCharacteristic.PROCESSING_TIME, TtlUpdateEvent.OnReadWrite);

ValueState s1 = getRuntimeContext().getState(new ValueStateDescriptor<T>("x", ser, tp1));
ValueState s2 = getRuntimeContext().getState(new ValueStateDescriptor<T>("y", ser, tp2));

...

Goal

We’ll deliver a solution of Flink state TTL of the following characteristics:

  • TTL will be supported for both event time and processing time
  • TTL starts to count down when the entry is created by default. Users can specify a TTL trigger policy (see the example above) to decide whether a state’s TTL will be refreshed upon read and/or write. More on this later.
  • User state will be kept for at least its TTL amount of time; the TTL policy in Flink state can be a relaxed or an exact TTL.

  • The bottom line is that it shouldn’t get any worse than how users would implement TTL themselves

  • This should be a v1 version of TTL; its goal is to build a minimum viable product with a generic architecture for all state backends, rather than a state-backend-specific architecture (not implementation). There will be space for improvements; please add the nice-to-have improvements to the section "Potential Improvements for Future Versions" at the end.

TTL Policy

How do we determine the start time of a user state’s TTL? Or, to rephrase it, does Flink support extending/refreshing the TTL of a user state?

There are mainly two situations here:

  1. TTL always starts when a user state is created. This is the fundamental behavior.
  2. TTL for a user state can be extended/refreshed upon read and/or write.

Design:

On TTL Policy Situation 1 (TtlUpdateEvent.OnCreate, see the ‘TTL Policy’ section above),

  • What we need is only one timer (a long) for each keyed state
  • Steps
    1. When a keyed user state is created, operator/state backend registers a timer for it in TimerService
    2. When the registered timer is invoked, it deletes the user state

...

  • What we need is only one timer (a long) and a timestamp (also a long) for each keyed state
  • Steps
    1. Use a new field lastModifiedTs to remember when a user state is last modified
    2. When a keyed user state is created, lastModifiedTs is set to the creation time, and the operator/state backend registers a timer for it in TimerService
    3. Whenever an action covered by the configured policy is taken, update lastModifiedTs of the user state
    4. When the registered timer is invoked, it checks lastModifiedTs of the user state. If (lastModifiedTs + TTL) <= current time (in either event or processing time, as configured), delete the state; otherwise, register a new timer at (lastModifiedTs + TTL) and repeat this check when it fires

Example for case 2:

For key K, we have ValueState A with TTL 16 and TTL policy OnReadWrite.

  • At time 0, a keyed user state A is created with A.lastModifiedTs=0, a timer is registered for A at time 16
  • At time 2, A’s value is updated, and thus update A.lastModifiedTs=2
  • At time 15, A’s value is read, and thus update A.lastModifiedTs=15
  • At time 16, the registered timer is invoked, it sees A.lastModifiedTs=15, so it does nothing but registers a new timer at (15+16) = 31
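
To make the timer-based design concrete, here is a minimal sketch of the timer callback for situation 2. The interfaces TtlState and SimpleTimerService below are illustrative stand-ins for the real operator/state-backend internals, not existing Flink APIs.

Code Block
languagejava
// Illustrative sketch only: the timer fires at (lastModifiedTs + ttl) and either
// deletes the state or postpones expiration if the state was touched in the meantime.
interface TtlState {
  long getLastModifiedTs();
  void clear();
}

interface SimpleTimerService {
  void registerTimer(long time);
}

class TtlTimerCallback {
  private final long ttl; // TTL in the configured time unit (e.g. milliseconds)

  TtlTimerCallback(long ttl) {
    this.ttl = ttl;
  }

  // Invoked when the previously registered timer fires.
  void onTimer(long currentTime, TtlState state, SimpleTimerService timers) {
    long expirationTime = state.getLastModifiedTs() + ttl;
    if (expirationTime <= currentTime) {
      // the state has not been read/written since the timer was registered: delete it
      state.clear();
    } else {
      // the state was accessed in the meantime: register a new timer at the postponed expiration time
      timers.registerTimer(expirationTime);
    }
  }
}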

Pros and Cons:

Pros:

  1. TTL implementation is independent of state backends; it only relies on the abstraction of TimerService
  2. Because of (1), migration and compatibility can reuse existing solutions
  3. This design will support both event time and processing time natively
  4. At any time, there is at most one timer per keyed user state with TTL

Note: the main design decision is to base this feature on Flink's timer service, and exact TTL is a by-product. Not the other way around. At V2, we can relax the TTL if there's any major perf improvement, e.g. by coalescing timers.

Cons:

  1. In TTL policy situation 2, we need to store a timer and a last-modified timestamp (each basically a long) for each keyed user state with TTL. There will be a little memory overhead.

Migration Plan and Compatibility of Checkpoints/Savepoints

To our current knowledge, we don’t need extra work on the migration plan and compatibility of checkpoints/savepoints, because we are reusing existing wheels, like TimerService, state backends, etc., and their behaviors will remain the same as before.

The situation might change as the FLIP evolves.

Implementation Plan

The implementation details are evolving, and we’d like to get a clearer picture as we move on.

The basic idea is very simple - TimerService and user states need to have access to each other.

  • TimerService needs to access user states so that a triggered timer can clean up expired user states.
  • User states need to access TimerService in order to register timers when certain TtlUpdateEvents happen to user states. We can add callback functions to all state implementations to ensure that we get notified on all state accesses (see the illustrative sketch below).
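
One possible shape of such a callback hook is sketched below. It is purely illustrative and not an existing Flink interface; it just shows how state implementations could notify the TTL logic on every access so that timers can be registered or refreshed.

Code Block
languagejava
// Hypothetical callback invoked by state implementations on every state access.
@Internal
public interface StateAccessListener<N> {
  // namespace of the accessed state, the kind of access, and the access time
  void onStateAccessed(N namespace, TtlUpdateEvent event, long accessTime);
}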

Thus, the initial step should be moving TimerService to state backends to facilitate the above proposal. This is actually an ongoing effort that Stefan is pushing right now. I would wait till that effort is done before moving forward and implementing this FLIP.

Currently, there are still two issues:

...

  • The size of user state in Flink can grow infinitely for certain scenarios, and some use cases need to guarantee automatic cleanup of too old state
  • Developers currently have to make ad hoc implementations of TTL themselves, e.g. using the timer service, which might not be space efficient
  • Some legal regulations require data accessibility for a limited amount of time. In particular, recent changes in EU data privacy law force digital companies to treat personal data very carefully. This drives the priority of the first iteration steps focused on making expired state inaccessible.

This effort is to work out a unified approach for TTL semantics and make it reusable.

Proposed Change

API/Semantics

Users can associate or update a time-to-live (TTL) for a value or entry of keyed state during certain operations, depending on the configuration. The data automatically becomes unavailable after the TTL expires and is garbage collected sooner or later. The general idea is to provide

  • relaxed guarantee for state cleanup: state is persisted at least for TTL and then cleaned up based on the best effort: on access and in background
  • exact guarantee for state visibility: expired state is hidden/blocked by API methods even if it is still persisted
  • or relaxed guarantee for state visibility: expired state is returned by API methods if it is still available

Setup TTL

  • Create TTL configuration
  • Supply it in state descriptor

Configuration of TTL

TTL state cleanup:

  • relaxed (cleanup on access and in the background as next step)
  • (exact with timers as next step)

TTL state visibility for relaxed cleanup:

  • exact (expired is never returned)
  • relaxed (returned if still available)

Update type:

  • only on creation and write
  • on read and creation/write
  • (possibly only on creation; in the current design this might degrade write performance, because preserving the original timestamp requires first reading it out and then writing it back together with the updated user value as one db value)

Time characteristic:

  • processing time
  • event time

TTL behaviour

  • Read:
    • check TTL upon read 
    • discard if expired and issue cleanup delete/rewrite
    • depending on configuration, update if not expired
  • Write (and creation):
    • set/update TTL upon (re)writing new value
    • append: set TTL per added element

Entries in map and list state have separate independent TTLs and get filtered out on expiration while being read out.

Configuration

TTL can be configured and enabled in the abstract StateDescriptor and become available in all subclasses:

enum UpdateType { Disabled, OnCreateAndWrite, OnReadAndWrite }

enum StateVisibility { ReturnExpiredIfNotCleanedUp, NeverReturnExpired }

enum TtlTimeCharacteristic { ProcessingTime, EventTime }

class StateTtlConfig {
    UpdateType updateType;
    StateVisibility stateVisibility;
    TtlTimeCharacteristic timeCharacteristic;
    Time ttl;
}

abstract class StateDescriptor {
    void enableTimeToLive(StateTtlConfig ttlConfig) { … }
}

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(..)
    …
    .build();

XxxStateDescriptor stateDesc = new XxxStateDescriptor(...);
stateDesc.enableTimeToLive(ttlConfig);

State value with timestamp

The main idea is to wrap user state value with a class holding the value and the last access timestamp (maybe meta data in future) and use the new object as a value in the existing implementations:

class TtlValue<V> {
    V value;
    long lastAccessTimestamp;
}

Wrapping state factory

The original state factory provided in backends is wrapped with TtlStateFactory if TTL is enabled:

state = stateDesc.getTtlConfig().isEnabled() ? new TtlStateFactory(originalStateFactory,..).createState(...) : originalStateFactory.createState(...);

TtlStateFactory decorates the states produced by the original factory with TTL logic wrappers and adds TtlValue serialisation logic:

TtlStateFactory {
    KeyedStateFactory originalStateFactory;

    TtlTimeProvider timeProvider; // e.g. System.currentTimeMillis()

    <V> TtlValueState<V> createValueState(valueDesc) {
        serializer = new TtlValueSerializer(valueDesc.getSerializer());
        ttlValueDesc = new ValueDesc(valueDesc.name, serializer);
        originalStateWithTtl = originalStateFactory.createValueState(ttlValueDesc);
        return new TtlValueState(originalStateWithTtl, timeProvider);
    }

    // List, Map, ...
}

The TTL serializer should add the expiration timestamp to the serialized value.
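
A rough sketch of such a serializer is shown below. It is only illustrative: it writes the last access timestamp of TtlValue in front of the user value, and the ValueIo interface stands in for the wrapped user value serializer; the real implementation would be based on Flink's TypeSerializer, which has a larger contract.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Illustrative only: composes the user value serializer with the timestamp.
class TtlValueSerializerSketch<V> {

    // stand-in for the wrapped user value serializer
    interface ValueIo<V> {
        void write(V value, DataOutput out) throws IOException;
        V read(DataInput in) throws IOException;
    }

    private final ValueIo<V> userSerializer;

    TtlValueSerializerSketch(ValueIo<V> userSerializer) {
        this.userSerializer = userSerializer;
    }

    void serialize(TtlValue<V> record, DataOutput out) throws IOException {
        out.writeLong(record.lastAccessTimestamp); // timestamp stored next to the user value
        userSerializer.write(record.value, out);
    }

    TtlValue<V> deserialize(DataInput in) throws IOException {
        TtlValue<V> result = new TtlValue<>();
        result.lastAccessTimestamp = in.readLong();
        result.value = userSerializer.read(in);
        return result;
    }
}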

Wrapping state objects

TTL state decorators use original state with packed TTL and add TTL logic using time provider:

class TtlValueState<V> implements ValueState<V> {

    // List, Map, ....
    ValueState<TtlValue<V>> underlyingState;

    TtlTimeProvider timeProvider;


    V value() {
        TtlValue<V> valueWithTtl = underlyingState.value();
        // ttl logic here (e.g. check expiration, update timestamp)
        return valueWithTtl.getValue();
    }

    void update(V value) {

        ...

        underlyingState.update(valueWithTtl);

        ...

    }
}

Cleanup: issue delete/rewrite upon realising expiration during access/modification.
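
For illustration, the read path could combine the expiration check, the visibility decision and cleanup-on-access roughly as follows. StateTtlConfig, StateVisibility, UpdateType and TtlValue are the types from this document; the method itself, passing the TTL in milliseconds, and the way the current timestamp is obtained are hypothetical glue code.

// Illustrative read path: check expiration, clean up on access, honour the visibility setting.
static <V> V getWithTtl(ValueState<TtlValue<V>> underlyingState,
                        StateTtlConfig config,
                        long ttlMillis,
                        long currentTimestamp) throws Exception {
    TtlValue<V> stored = underlyingState.value();
    if (stored == null) {
        return null;
    }
    if (stored.lastAccessTimestamp + ttlMillis <= currentTimestamp) {
        // cleanup on access: delete the expired entry
        underlyingState.clear();
        // relaxed visibility may still return the value that was found; exact visibility hides it
        return config.stateVisibility == StateVisibility.ReturnExpiredIfNotCleanedUp
                ? stored.value
                : null;
    }
    if (config.updateType == UpdateType.OnReadAndWrite) {
        // refresh the last access timestamp on read
        stored.lastAccessTimestamp = currentTimestamp;
        underlyingState.update(stored);
    }
    return stored.value;
}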

Save-/checkpoint save/restore

Values wrapped with the timestamp are serialised and snapshotted the same way as without it, just using the enhanced TTL serializer.

The TTL config is just a way of interpreting the state value and does not add any stateful meta info. The TtlValueSerializer, saved as the state value serializer, already enforces a compatibility check, e.g. if TTL'ed state is restored with a disabled TTL config.

Additional Cleanup Strategies

Filter out expired entries during the full state scan that takes a full checkpoint. This approach does not reduce the size of the local state used in the running job, but it reduces the size of the taken snapshot. In case of a restore from such a checkpoint, the local state will not contain expired entries either.

The implementation is based on extending the backends to support custom state transformers. The backends call the transformer for each state entry during the full snapshot scan, and the transformer decides whether to keep, modify or drop the state entry. TTL has its own implementation of a state transformer which checks the timestamp and filters out expired entries.

public interface StateSnapshotTransformer<T> {

    @Nullable
    T filterOrTransform(T value);
}

To avoid concurrent usage of transformer objects (can be relevant for performance and reuse of serialization buffers), each snapshotting thread uses a factory to produce a thread-confined transformer.
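
A TTL transformer over the wrapped values could then look roughly like this. It is a sketch working on TtlValue objects; the real implementation may instead operate on serialized bytes, and how the current timestamp is captured is an assumption.

// Illustrative: drops expired entries from the snapshot by returning null.
class TtlStateSnapshotTransformer<V> implements StateSnapshotTransformer<TtlValue<V>> {

    private final long ttlMillis;
    private final long currentTimestamp; // captured when the snapshot is taken

    TtlStateSnapshotTransformer(long ttlMillis, long currentTimestamp) {
        this.ttlMillis = ttlMillis;
        this.currentTimestamp = currentTimestamp;
    }

    @Override
    @Nullable
    public TtlValue<V> filterOrTransform(TtlValue<V> value) {
        if (value == null) {
            return null;
        }
        // returning null filters the expired entry out of the snapshot
        return value.lastAccessTimestamp + ttlMillis <= currentTimestamp ? null : value;
    }
}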

Another strategy enables lazy background cleanup of state with TTL in the JVM heap backend. The idea is to keep a global lazy state iterator with loose consistency. Every time a state value for some key is accessed or a record is processed, the iterator is advanced, the TTL of the iterated state entries is checked, and the expired entries are cleaned up. When the iterator reaches the end of the state storage, it just starts over. This way the state with TTL is regularly cleaned up to prevent ever-growing memory consumption.
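
A rough sketch of this lazy iterator is shown below, with a plain java.util.Map standing in for the heap state table; the real state table, its iteration contract and the advance step size are assumptions.

import java.util.Iterator;
import java.util.Map;

// Illustrative incremental cleanup: advance a global iterator a few entries at a time
// and remove the expired ones; when the end is reached, start over.
class IncrementalTtlCleanup<K, V> {

    private final Map<K, TtlValue<V>> stateTable; // stand-in for the heap state table
    private final long ttlMillis;
    private Iterator<Map.Entry<K, TtlValue<V>>> iterator;

    IncrementalTtlCleanup(Map<K, TtlValue<V>> stateTable, long ttlMillis) {
        this.stateTable = stateTable;
        this.ttlMillis = ttlMillis;
        this.iterator = stateTable.entrySet().iterator();
    }

    // Called whenever a record is processed or state is accessed.
    void advance(long currentTimestamp, int entriesPerAdvance) {
        for (int i = 0; i < entriesPerAdvance; i++) {
            if (!iterator.hasNext()) {
                iterator = stateTable.entrySet().iterator(); // wrap around and start over
                return;
            }
            Map.Entry<K, TtlValue<V>> entry = iterator.next();
            if (entry.getValue().lastAccessTimestamp + ttlMillis <= currentTimestamp) {
                iterator.remove(); // expired entry is cleaned up
            }
        }
    }
}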

The caveat of this cleanup strategy is that if state is not accessed and no records are processed, then the accumulated expired state still occupies the storage, which should be a rather impractical case.

In case of the RocksDB backend, we can piggyback on compaction using a custom compaction filter which checks the last access timestamp and discards expired values. This requires contributing a C++ Flink-TTL-specific filter to RocksDB, like the one for Cassandra. At the moment RocksDB does not support compaction filter plugins (see the PR discussion); it is under development. Meanwhile, we can apply two strategies to enable this feature in Flink:

  • Release and maintain a temporary fork of RocksDB for Flink: FRocksDB and merge TTL filter into this fork (used in Flink 1.8)
  • Build the C++ TTL filter separately, pack this C++ lib into its own JNI java client jar and load it in Flink in addition to vanilla RocksDB (Flink RocksDB extensions, under development)

The second strategy is more flexible in the long run.

Event time is opted for in StateTtlConfig by setting TtlTimeCharacteristic.EventTime. To define how elements expire, we need to define which timestamp to save when the state entry is accessed/updated and which timestamp is used to check expiration. In case of processing time, the time semantics is straightforward: we always use the current processing time. The definition of event time semantics is a bit trickier. The proposal, based on the ML discussion thread, is the following at the moment:

Last access timestamp: event timestamp of the record currently being processed

Current timestamp to check expiration has two options:

  • Last emitted watermark
  • Current processing time

Therefore, TtlTimeProvider will need two methods: getAccessTimestamp and getCurrentTimestamp.

Moreover, to enable event time support, the event timestamp of the record and the updated watermark need to be passed to the state backend and shared with the TTL state wrappers and the additional cleanup strategies (snapshot transformers and compaction filter).
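
Sketched as code, the time provider could expose the two timestamps like this; the names and the processing-time variant are illustrative, following the description above.

// Two timestamps, per the proposal above: one saved on access, one used to check expiration.
interface TtlTimeProvider {
    // event timestamp of the record being processed (or current processing time)
    long getAccessTimestamp();

    // last emitted watermark or current processing time
    long getCurrentTimestamp();
}

// Processing-time variant: both timestamps are just the wall clock.
class ProcessingTimeProvider implements TtlTimeProvider {
    @Override
    public long getAccessTimestamp() {
        return System.currentTimeMillis();
    }

    @Override
    public long getCurrentTimestamp() {
        return System.currentTimeMillis();
    }
}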

Event time provider

A different implementation of TtlTimeProvider, which e.g. holds the current watermark, needs to be passed to the state backend at the moment of its creation in StreamTaskStateInitializerImpl. There are several ways to update the watermark in this implementation of TtlTimeProvider:

  • in InternalTimeServiceManager.advanceWatermark explicitly
  • InternalTimeServiceManager/InternalTimerServiceImpl could be refactored to use shared EventTimeService which holds current updatable watermark and wrapped by TtlTimeProvider

The TTL state wrapping factory should create TTL state wrappers and snapshot transformers with TtlTimeProvider selected by TtlTimeCharacteristic.

RocksDB TTL compaction filter

The RocksDB TTL compaction filter factory needs to get selected TtlTimeProvider when it gets configured. There are two ways:

  • make it volatile and settable in RocksDbTtlCompactFiltersManager.TimeProviderWrapper, and track it in RocksDbTtlCompactFiltersManager along with FlinkCompactionFilterFactory so that it can be configured later, before FlinkCompactionFilterFactory is configured.
  • Move FlinkCompactionFilter.TimeProvider from FlinkCompactionFilterFactory to ConfigHolder and set selected TtlTimeProvider with the Config.

The second option does not use a volatile variable and should be more performant, but it needs changes to the RocksDB java client and either releasing a new version of FRocksDB or the Flink RocksDB extensions.

Migration Plan and Compatibility

This feature introduces a new type of state which did not exist before. All current state types stay the same so it does not need specific migration. Adding TTL to or removing it from the existing state requires an explicit custom migration, basically transforming the stored state by adding or removing bytes of last access timestamp.

Future work

  • register processing/event timer per state entry for exact cleanup upon expiration callback, inject it into TTL state decorators (the conflicts and precedence with user timers should be addressed)
  • support queryable state with TTL
  • set TTL in state get/update methods and/or set current TTL in state object
  • state TTL migration: upon restoration add or drop TTL for the existing state which has or not had it before
  • support optional prolonging of state TTL in case of e.g. disaster recovery to prevent real time state expiration during downtime (Jira FLINK-9661)
  • probably out of scope: potentially introduce generalised meta info (including timestamp) associated with each state value

Rejected Alternatives

Previous version of FLIP-25

TtlDb

Embedded TTL per state name/column family

  • Only processing time
  • Get API can return expired entries w/o explicitly informing about it

Timer service or dedicated column family

design doc

The first iteration can be extended with this approach as well, see Future work. It can be used where deterministic cleanup with exact guarantees is required.

The tradeoff here is that, even after becoming part of state backends, the timer service still requires storing keys twice in RocksDB and inside the checkpoint: once associated with the state and once with its expiration timestamp, which results in bigger space consumption and extra overhead. The reason is that timers require another data layout sorted by timestamp.

Some lighter cleanup strategies can also be given a try based on the suggested first iteration, see Future work.

...

We'll also add the following new internal APIs to register TTL timers:
Code Block
languagejava
@Internal
public void registerTtlProcessingTimeTimer(N namespace, long time);

@Internal
public void registerTtlEventTimeTimer(N namespace, long time);

...

 

Test Plan

Normal unit, integration, and end-to-end tests

Potential Improvements for Future Versions

Rejected Alternatives

We have another proposal of using rocksdb’s built-in TTL - TtlDB (https://github.com/facebook/rocksdb/wiki/Time-to-Live). But it has a few major drawbacks:

  • TtlDB only supports TTL on record creation, not able to extend/refresh TTL upon read and/or write

  • TtlDB only supports one TTL for all records in all column families created in a single db opening. Flink currently stores one type of keyed user state per column family; besides, it’s inappropriate to open and close TtlDB too frequently.

  • TtlDB only supports processing time TTL, not event time.

  • The TTL logic would rely heavily on the state backend implementation. If we add more state backends and rely on their internal features for TTL, it is hard to unify their behaviors, and it will introduce unnecessary system complexity
  • Using an external feature to build Flink’s TTL makes migration and compatibility hard