Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.


Page properties


Discussion thread
Vote thread
JIRA

Status

Current state: "Accepted"

Discussion threadhttp://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Discuss-Proposing-FLIP-25-Support-User-State-TTL-Natively-in-Flink-td20912.html#a22097

Draft: https://docs.google.com/document/d/1SI_WoXAfOd4_NKpGyk4yh3mf59g12pSGNXRtNFi-tgM

...

Jira
serverASF JIRA
columnskey,summary,type,created,updated,due,assignee,reporter,priority,status,resolution
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-3089

Release

...

1.6


eleasedDraft: 1.6https://docs.google.com/document/d/1SI_WoXAfOd4_NKpGyk4yh3mf59g12pSGNXRtNFi-tgM

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Table of Contents

Motivation

Reasons to introduce Flink native state expiration:

...

TTL can be configured and enabled in the abstract StateDescriptor and become available in all subclasses:

enum UpdateType { Disabled, OnCreateAndWrite, OnReadAndWrite }

enum StateVisibility { ReturnExpiredIfNotCleanedUp, NeverReturnExpired }

enum TtlTimeCharacteristic { ProcessingTime, EventTime }

class StateTtlConfig {
    UpdateType updateType;    StateVisibility stateVisibility;
    TtlTimeCharacteristic timeCharacteristic;
    Time ttl;
}

abstract class StateDescriptor {   

    void enableTimeToLive(StateTtlConfig ttlConfig) { … }

}

StateTtlConfig ttlConfig = StateTtlConfig

    .newBuilder(Time.seconds(1))

    .setUpdateType(..)   

    …   

    .build();

XxxStateDescriptor stateDesc =

new

new XxxStateDescriptor(...);

stateDesc.enableTimeToLive(ttlConfig);

State value with timestamp

The main idea is to wrap user state value with a class holding the value and the last access timestamp (maybe meta data in future) and use the new object as a value in the existing implementations:

class TtlValue<V> {
    V value;
    long lastAccessTimestamp;
}

Wrapping state factory

The original state factory provided in backends is wrapped with TtlStateFactory if TTL is enabled:

state = stateDesc.getTtlConfig().isEnabled() ?


 

 

new TtlStateFactory(originalStateFactory,..).createState(...) : 

   

originalStateFactory.createState(...);

TtlStateFactory decorates the states produced by the original factory with TTL logic wrappers and adds TtlValue serialisation logic:

TtlStateFactory {
    KeyedStateFactory originalStateFactory;

    TtlTimeProvider timeProvider; // e.g. System.currentTimeMillis()

   

   

<V> TtlValueState<V> createValueState(valueDesc) {

        serializer = new TtlValueSerializer(valueDesc.getSerializer);
        ttlValueDesc = new ValueDesc(valueDesc.name, serializer);
        originalStateWithTtl = originalStateFactory.createValueState(valueDesc);

        returnnew TtlValueState(originalStateWithTtl, timeProvider);

    }

    // List, Map, ...
}

TTL serializer should add expiration timestamp.

...

TTL state decorators use original state with packed TTL and add TTL logic using time provider:

TtlValueState<V> implements ValueState<V> {

    // List, Map, ....
    ValueState<TtlValue<V>> underlyingState;

    TtlTimeProvider timeProvider;


    V value() {
        TtlValue<V> valueWithTtl = underlyingState.get();
        // ttl logic here (e.g. update timestamp)
        return valueWithTtl.getValue();
    }

    void update() {

        ...

        underlyingState.update(valueWithTtl);

        ...

    }
}

Cleanup: issue delete/rewrite upon realising expiration during access/modification.

...

Filter out expired entries in case of full state scan to take a full checkpoint. This approach does not reduce the size of local state used in running job but reduces the size of taken snapshot. In case of restore from such a checkpoint, the local state will not contain expired entries as well.The implementation is based on extending backends to support custom state transformers. The backends call the transformer for each state entry during the full snapshot scan and the transformer decide whether to keep, modify or drop the state entry. TTL has its own relevant implementation of state transformers to check timestamp and filter out expired entries.

public interface StateSnapshotTransformer<T> {

    @Nullable
    T filterOrTransform(T value);
}

To avoid concurrent usage of transformer objects (can be relevant for performance and reuse of serialization buffers), each snapshotting thread uses a factory to produce a thread-confined transformer.

...

The first iteration can be extended with this approach as well, see Future work. It can be used where deterministic cleanup with exact guarantees is required.

The tradeoff here is that even after becoming part of state backends, timer service still requires to store keys twice in rocksdb RocksDB and inside the checkpoint: associated with state and its expiration timestamp which results in a bigger space consumption and extra overhead. The reason is that timers require another data layout sorted by timestamp.

Some lighter cleanup strategies can also be given a try based on the suggested first iteration, see Future work.