Status

Current state: "Accepted"

Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Discuss-Proposing-FLIP-25-Support-User-State-TTL-Natively-in-Flink-td20912.html#a22097

Draft: https://docs.google.com/document/d/1SI_WoXAfOd4_NKpGyk4yh3mf59g12pSGNXRtNFi-tgM

JIRA: FLINK-3089

Released: 1.6

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Motivation

Reasons to introduce Flink native state expiration:

  • In certain scenarios the size of user state in Flink can grow indefinitely, and some use cases need a guarantee that state older than a given age is cleaned up automatically.
  • Developers currently have to implement TTL themselves in an ad hoc way, e.g. with the timer service, which might not be space efficient.
  • Some legal regulations allow data to be accessible only for a limited amount of time. In particular, recent changes in EU data privacy law force digital companies to treat personal data very carefully. This drives the priority of the first iteration, which focuses on making expired state inaccessible.

The goal of this effort is to work out a unified approach to TTL semantics and make it reusable.

Proposed Change

API/Semantics

Users can associate a time-to-live (TTL) with a value or an entry of keyed state, or update it, during certain operations, depending on the configuration. The data automatically becomes unavailable once the TTL expires and is garbage collected sooner or later. The general idea is to provide:

  • a relaxed guarantee for state cleanup: state is persisted for at least the TTL and is then cleaned up on a best-effort basis (on access and in the background)
  • an exact guarantee for state visibility: expired state is hidden/blocked by API methods even if it is still persisted
  • or a relaxed guarantee for state visibility: expired state is returned by API methods if it is still available

...

  • Create a TTL configuration
  • Supply it in the state descriptor

Configuration of TTL

TTL state cleanup:

  • relaxed (cleanup on access; cleanup in the background as a next step)
  • (exact cleanup with timers, as a next step)

TTL state visibility for relaxed cleanup:

  • exact (expired state is never returned)
  • relaxed (expired state is returned if still available)

Update type:

  • only on creation and write
  • on read and creation/write
  • (possibly only on creation; in the current design this might degrade write performance, because preserving the original timestamp requires first reading it out and then writing it back together with the updated user value as one DB value)

Time characteristic:

  • processing time
  • event time

...

  • Read:
    • check the TTL upon read
    • if expired, discard the value and issue a cleanup delete/rewrite
    • depending on the configuration, update the TTL if not expired
  • Write (and creation):
    • set/update the TTL upon (re)writing a new value
    • append: set the TTL per added element
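The read and write rules above can be illustrated with a small in-memory store. This is a minimal, self-contained sketch under stated assumptions, not the Flink implementation: `TtlStore`, the injected `LongSupplier` clock and the inclusive expiration boundary (`timestamp + ttl <= now`) are hypothetical choices, and only the "never return expired" visibility is modelled.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical sketch of the read/write TTL rules; names are illustrative, not Flink's.
public class TtlReadWriteSketch {

    enum UpdateType { OnCreateAndWrite, OnReadAndWrite }

    // Stored record: the user value plus the timestamp of its last TTL update.
    static final class TtlValue<V> {
        final V value;
        final long lastAccessTimestamp;

        TtlValue(V value, long lastAccessTimestamp) {
            this.value = value;
            this.lastAccessTimestamp = lastAccessTimestamp;
        }
    }

    // Keyed store that applies the TTL rules on every read and write.
    static final class TtlStore<K, V> {
        private final Map<K, TtlValue<V>> backing = new HashMap<>();
        private final long ttl;
        private final UpdateType updateType;
        private final LongSupplier clock; // time provider, e.g. System::currentTimeMillis

        TtlStore(long ttl, UpdateType updateType, LongSupplier clock) {
            this.ttl = ttl;
            this.updateType = updateType;
            this.clock = clock;
        }

        // Write (and creation): set or refresh the timestamp when a value is (re)written.
        void put(K key, V value) {
            backing.put(key, new TtlValue<>(value, clock.getAsLong()));
        }

        // Read: check the TTL, discard and delete expired entries, optionally refresh.
        V get(K key) {
            TtlValue<V> stored = backing.get(key);
            if (stored == null) {
                return null;
            }
            long now = clock.getAsLong();
            if (stored.lastAccessTimestamp + ttl <= now) {
                backing.remove(key); // cleanup on access
                return null;         // expired state is never returned
            }
            if (updateType == UpdateType.OnReadAndWrite) {
                backing.put(key, new TtlValue<>(stored.value, now)); // refresh on read
            }
            return stored.value;
        }

        int storedEntries() {
            return backing.size();
        }
    }

    public static void main(String[] args) {
        long[] now = {0L};
        TtlStore<String, Integer> store =
                new TtlStore<>(10, UpdateType.OnReadAndWrite, () -> now[0]);
        store.put("a", 1);                  // timestamp 0
        now[0] = 5;
        System.out.println(store.get("a")); // 1 (timestamp refreshed to 5)
        now[0] = 14;
        System.out.println(store.get("a")); // 1 (5 + 10 > 14; refreshed to 14)
        now[0] = 24;
        System.out.println(store.get("a")); // null (14 + 10 <= 24; entry deleted)
    }
}
```

With OnCreateAndWrite the read path never rewrites the entry, so reads stay cheaper but do not extend the lifetime of frequently read state.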

Entries in map and list state have separate, independent TTLs, and expired entries are filtered out when the state is read.
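Per-element TTLs for list state could look like the following sketch. The class name `TtlListSketch` is hypothetical, and the explicit `now` parameter stands in for a time provider. Each appended element keeps its own timestamp, and expired elements are removed from storage as a side effect of reading.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of list state with independent per-element TTLs.
public class TtlListSketch<E> {

    // Each element is stored together with the time it was appended.
    private static final class TtlElement<T> {
        final T value;
        final long timestamp;

        TtlElement(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final List<TtlElement<E>> elements = new ArrayList<>();
    private final long ttl;

    public TtlListSketch(long ttl) {
        this.ttl = ttl;
    }

    // Append: every added element gets its own timestamp.
    public void add(E value, long now) {
        elements.add(new TtlElement<>(value, now));
    }

    // Read: drop expired elements from storage and return only the live ones.
    public List<E> get(long now) {
        elements.removeIf(e -> e.timestamp + ttl <= now);
        List<E> live = new ArrayList<>();
        for (TtlElement<E> e : elements) {
            live.add(e.value);
        }
        return live;
    }

    public int storedSize() {
        return elements.size();
    }
}
```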

Configuration

TTL can be configured and enabled in the abstract StateDescriptor, which makes it available in all subclasses:

enum UpdateType { Disabled, OnCreateAndWrite, OnReadAndWrite }
enum StateVisibility { ReturnExpiredIfNotCleanedUp, NeverReturnExpired }
enum TtlTimeCharacteristic { ProcessingTime, EventTime }

class StateTtlConfig {
    UpdateType updateType;
    StateVisibility stateVisibility;
    TtlTimeCharacteristic timeCharacteristic;
    Time ttl;
}

abstract class StateDescriptor {
    void enableTimeToLive(StateTtlConfig ttlConfig) { … }
}

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(..)
    …
    .build();
XxxStateDescriptor stateDesc = new XxxStateDescriptor(...);
stateDesc.enableTimeToLive(ttlConfig);
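The builder usage above can be mirrored by a self-contained sketch of the configuration object. It is an illustration under assumptions, not Flink's `StateTtlConfig`: the class name `TtlConfigSketch` is hypothetical, it uses `java.time.Duration` instead of Flink's `Time`, the defaults are arbitrary, and treating `UpdateType.Disabled` as "TTL disabled" for an `isEnabled()` check is an assumption.

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical builder sketch for the proposed TTL configuration (not Flink's class).
public final class TtlConfigSketch {

    public enum UpdateType { Disabled, OnCreateAndWrite, OnReadAndWrite }
    public enum StateVisibility { ReturnExpiredIfNotCleanedUp, NeverReturnExpired }
    public enum TtlTimeCharacteristic { ProcessingTime, EventTime }

    private final Duration ttl;
    private final UpdateType updateType;
    private final StateVisibility stateVisibility;
    private final TtlTimeCharacteristic timeCharacteristic;

    private TtlConfigSketch(Builder builder) {
        this.ttl = builder.ttl;
        this.updateType = builder.updateType;
        this.stateVisibility = builder.stateVisibility;
        this.timeCharacteristic = builder.timeCharacteristic;
    }

    public static Builder newBuilder(Duration ttl) {
        return new Builder(ttl);
    }

    // Assumption: TTL is considered enabled unless the update type is Disabled.
    public boolean isEnabled() {
        return updateType != UpdateType.Disabled;
    }

    public Duration getTtl() { return ttl; }
    public UpdateType getUpdateType() { return updateType; }
    public StateVisibility getStateVisibility() { return stateVisibility; }
    public TtlTimeCharacteristic getTimeCharacteristic() { return timeCharacteristic; }

    public static final class Builder {
        private final Duration ttl;
        // Illustrative defaults only.
        private UpdateType updateType = UpdateType.OnCreateAndWrite;
        private StateVisibility stateVisibility = StateVisibility.NeverReturnExpired;
        private TtlTimeCharacteristic timeCharacteristic = TtlTimeCharacteristic.ProcessingTime;

        private Builder(Duration ttl) {
            Objects.requireNonNull(ttl, "ttl");
            if (ttl.isZero() || ttl.isNegative()) {
                throw new IllegalArgumentException("TTL must be positive: " + ttl);
            }
            this.ttl = ttl;
        }

        public Builder setUpdateType(UpdateType updateType) {
            this.updateType = Objects.requireNonNull(updateType);
            return this;
        }

        public Builder setStateVisibility(StateVisibility stateVisibility) {
            this.stateVisibility = Objects.requireNonNull(stateVisibility);
            return this;
        }

        public Builder setTimeCharacteristic(TtlTimeCharacteristic timeCharacteristic) {
            this.timeCharacteristic = Objects.requireNonNull(timeCharacteristic);
            return this;
        }

        public TtlConfigSketch build() {
            return new TtlConfigSketch(this);
        }
    }
}
```

Validating the TTL in the builder means an invalid configuration fails when it is built rather than later, when state is first accessed.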

State value with timestamp

The main idea is to wrap the user state value in a class that holds the value and its last access timestamp (possibly more metadata in the future), and to use this new object as the value in the existing state implementations:

class TtlValue<V> {
    V value;
    long lastAccessTimestamp;
}
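Deciding whether a `TtlValue` has expired reduces to comparing `lastAccessTimestamp + ttl` with the current time. A minimal sketch follows; the helper name `TtlExpiry`, the inclusive boundary (expired once the sum is `<=` the current time) and the saturation at `Long.MAX_VALUE` are assumptions. Saturating guards against a very large TTL overflowing into a negative, already-expired timestamp.

```java
// Hypothetical helper for deciding whether a TtlValue has expired.
public final class TtlExpiry {

    private TtlExpiry() {
    }

    // lastAccessTimestamp + ttl, saturated at Long.MAX_VALUE; assumes ttl >= 0.
    public static long expirationTimestamp(long lastAccessTimestamp, long ttl) {
        if (ttl > 0 && lastAccessTimestamp > Long.MAX_VALUE - ttl) {
            return Long.MAX_VALUE; // the sum would overflow, so clamp to the maximum
        }
        return lastAccessTimestamp + ttl;
    }

    // Expired once the current time has reached the expiration timestamp.
    public static boolean isExpired(long lastAccessTimestamp, long ttl, long now) {
        return expirationTimestamp(lastAccessTimestamp, ttl) <= now;
    }
}
```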

Wrapping state factory

The original state factory provided by the backends is wrapped with TtlStateFactory if TTL is enabled:

state = stateDesc.getTtlConfig().isEnabled()
    ? new TtlStateFactory(originalStateFactory, ..).createState(...)
    : originalStateFactory.createState(...);

TtlStateFactory decorates the states produced by the original factory with TTL logic wrappers and adds TtlValue serialisation logic:

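class TtlStateFactory {
    KeyedStateFactory originalStateFactory;
    TtlTimeProvider timeProvider; // e.g. System.currentTimeMillis()

    <V> TtlValueState<V> createValueState(ValueDesc<V> valueDesc) {
        // Wrap the user serializer so that the timestamp is serialized together with the value.
        TtlValueSerializer<V> serializer = new TtlValueSerializer<>(valueDesc.getSerializer());
        ValueDesc<TtlValue<V>> ttlValueDesc = new ValueDesc<>(valueDesc.name, serializer);
        // The original backend state is created from the TTL descriptor, so it stores TtlValue<V>.
        ValueState<TtlValue<V>> originalStateWithTtl = originalStateFactory.createValueState(ttlValueDesc);
        return new TtlValueState<>(originalStateWithTtl, timeProvider);
    }

    // List, Map, ...
}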

The TTL serializer should add the expiration timestamp to the serialized value.
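One possible layout for the TTL serializer is a fixed 8-byte timestamp prefix in front of the user's serialized bytes. The sketch below is hypothetical: `ValueCodec` stands in for the real serializer abstraction, `STRING_CODEC` is an example user codec, and the timestamp slot holds whatever timestamp `TtlValue` carries (an expiration timestamp could be stored there instead).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical sketch of a TTL-aware serializer wrapping a user serializer.
public final class TtlSerializerSketch {

    // Stand-in for a user value serializer (not Flink's serializer API).
    interface ValueCodec<V> {
        void write(V value, DataOutputStream out) throws IOException;
        V read(DataInputStream in) throws IOException;
    }

    static final class TtlValue<V> {
        final V value;
        final long timestamp;

        TtlValue(V value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Writes the 8-byte timestamp first, then delegates the value to the user codec.
    static final class TtlValueCodec<V> implements ValueCodec<TtlValue<V>> {
        private final ValueCodec<V> userCodec;

        TtlValueCodec(ValueCodec<V> userCodec) {
            this.userCodec = userCodec;
        }

        @Override
        public void write(TtlValue<V> ttlValue, DataOutputStream out) throws IOException {
            out.writeLong(ttlValue.timestamp);
            userCodec.write(ttlValue.value, out);
        }

        @Override
        public TtlValue<V> read(DataInputStream in) throws IOException {
            long timestamp = in.readLong();
            V value = userCodec.read(in);
            return new TtlValue<>(value, timestamp);
        }
    }

    // Example user codec for strings (2-byte length + modified UTF-8 via writeUTF).
    static final ValueCodec<String> STRING_CODEC = new ValueCodec<String>() {
        @Override
        public void write(String value, DataOutputStream out) throws IOException {
            out.writeUTF(value);
        }

        @Override
        public String read(DataInputStream in) throws IOException {
            return in.readUTF();
        }
    };

    static <T> byte[] toBytes(ValueCodec<T> codec, T value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            codec.write(value, out);
        }
        return bytes.toByteArray();
    }

    static <T> T fromBytes(ValueCodec<T> codec, byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return codec.read(in);
        }
    }
}
```

A fixed-size prefix means a background cleanup could check a record's timestamp by reading only its first 8 bytes, without deserializing the user value.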

Wrapping state objects

TTL state decorators use the original state, which stores values packed together with their TTL timestamp, and add the TTL logic on top of it using the time provider:

...
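A TTL decorator can be sketched as a class that implements the user-facing state interface while storing `TtlValue` objects in the original state. Everything here is hypothetical and simplified: the `ValueState` interface, `HeapValueState` as the "original" state, and a `LongSupplier` as the time provider. The sketch shows how the two visibility options differ when an expired value is still stored, and omits refresh-on-read for brevity.

```java
import java.util.function.LongSupplier;

// Hypothetical decorator sketch; not Flink's TtlValueState.
public final class TtlDecoratorSketch {

    // Minimal state interface, loosely modelled on a keyed value state.
    interface ValueState<V> {
        V value();
        void update(V value);
        void clear();
    }

    enum StateVisibility { ReturnExpiredIfNotCleanedUp, NeverReturnExpired }

    static final class TtlValue<V> {
        final V value;
        final long lastAccessTimestamp;

        TtlValue(V value, long lastAccessTimestamp) {
            this.value = value;
            this.lastAccessTimestamp = lastAccessTimestamp;
        }
    }

    // Stand-in for the "original" backend state, which now holds TtlValue<V>.
    static final class HeapValueState<T> implements ValueState<T> {
        private T current;
        @Override public T value() { return current; }
        @Override public void update(T value) { current = value; }
        @Override public void clear() { current = null; }
    }

    // Decorator: exposes ValueState<V> to the user, delegates storage of TtlValue<V>.
    static final class TtlValueState<V> implements ValueState<V> {
        private final ValueState<TtlValue<V>> original;
        private final LongSupplier timeProvider;
        private final long ttl;
        private final StateVisibility visibility;

        TtlValueState(ValueState<TtlValue<V>> original, LongSupplier timeProvider,
                      long ttl, StateVisibility visibility) {
            this.original = original;
            this.timeProvider = timeProvider;
            this.ttl = ttl;
            this.visibility = visibility;
        }

        @Override
        public V value() {
            TtlValue<V> stored = original.value();
            if (stored == null) {
                return null;
            }
            if (stored.lastAccessTimestamp + ttl <= timeProvider.getAsLong()) {
                original.clear(); // expired: clean up on access
                return visibility == StateVisibility.ReturnExpiredIfNotCleanedUp ? stored.value : null;
            }
            return stored.value; // a refresh for OnReadAndWrite would go here
        }

        @Override
        public void update(V value) {
            original.update(new TtlValue<>(value, timeProvider.getAsLong()));
        }

        @Override
        public void clear() {
            original.clear();
        }
    }
}
```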