...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Reasons to introduce Flink native state expiration:
...
TTL can be configured and enabled in the abstract StateDescriptor, which makes it available in all subclasses:
    enum UpdateType { Disabled, OnCreateAndWrite, OnReadAndWrite }
    enum StateVisibility { ReturnExpiredIfNotCleanedUp, NeverReturnExpired }
    enum TtlTimeCharacteristic { ProcessingTime, EventTime }

    abstract class StateDescriptor {
        void enableTimeToLive(StateTtlConfig ttlConfig) { … }
    }

    StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.seconds(1))
        .setUpdateType(..)
        …
        .build();

    XxxStateDescriptor stateDesc = new XxxStateDescriptor(...);
    stateDesc.enableTimeToLive(ttlConfig);
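To make the configuration surface more concrete, here is a minimal standalone sketch of what the `StateTtlConfig` builder might hold. The enums and `newBuilder`/`setUpdateType`/`build` come from the outline above; `setStateVisibility`, `setTtlTimeCharacteristic`, the default values, the rule that `Disabled` turns TTL off, and passing the TTL as a plain `long` of milliseconds (instead of Flink's `Time`) are assumptions made only to keep the example self-contained.

```java
import java.util.Objects;

enum UpdateType { Disabled, OnCreateAndWrite, OnReadAndWrite }
enum StateVisibility { ReturnExpiredIfNotCleanedUp, NeverReturnExpired }
enum TtlTimeCharacteristic { ProcessingTime, EventTime }

// Hypothetical immutable TTL configuration; defaults and extra setters are assumptions.
final class StateTtlConfig {
    final long ttlMillis;
    final UpdateType updateType;
    final StateVisibility visibility;
    final TtlTimeCharacteristic timeCharacteristic;

    private StateTtlConfig(Builder b) {
        this.ttlMillis = b.ttlMillis;
        this.updateType = b.updateType;
        this.visibility = b.visibility;
        this.timeCharacteristic = b.timeCharacteristic;
    }

    // Assumption: TTL counts as enabled unless the update type is Disabled.
    boolean isEnabled() { return updateType != UpdateType.Disabled; }

    static Builder newBuilder(long ttlMillis) { return new Builder(ttlMillis); }

    static final class Builder {
        private final long ttlMillis;
        private UpdateType updateType = UpdateType.OnCreateAndWrite;
        private StateVisibility visibility = StateVisibility.NeverReturnExpired;
        private TtlTimeCharacteristic timeCharacteristic = TtlTimeCharacteristic.ProcessingTime;

        private Builder(long ttlMillis) {
            if (ttlMillis <= 0) throw new IllegalArgumentException("TTL must be positive");
            this.ttlMillis = ttlMillis;
        }

        Builder setUpdateType(UpdateType t) { this.updateType = Objects.requireNonNull(t); return this; }
        Builder setStateVisibility(StateVisibility v) { this.visibility = Objects.requireNonNull(v); return this; }
        Builder setTtlTimeCharacteristic(TtlTimeCharacteristic c) { this.timeCharacteristic = Objects.requireNonNull(c); return this; }

        StateTtlConfig build() { return new StateTtlConfig(this); }
    }
}
```

Building the config once and keeping it immutable means the TTL settings cannot change underneath a state that already stores timestamps computed with them, which seems a sensible property for this kind of API.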
State value with timestamp
The main idea is to wrap the user state value in a class that holds the value together with its last access timestamp (and possibly more metadata in future), and to store this wrapper as the value in the existing state implementations:
    class TtlValue<V> {
        V value;
        long lastAccessTimestamp;
    }
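A slightly fuller sketch of the wrapper, assuming an entry counts as expired once `ttl` milliseconds have passed since its last access timestamp. The `isExpired` and `touch` helpers are hypothetical additions for illustration, not part of the proposal.

```java
// Hypothetical immutable wrapper pairing a user value with its last access timestamp.
final class TtlValue<V> {
    final V value;
    final long lastAccessTimestamp;

    TtlValue(V value, long lastAccessTimestamp) {
        this.value = value;
        this.lastAccessTimestamp = lastAccessTimestamp;
    }

    // Assumed rule: expired once ttlMillis have elapsed since the last access.
    boolean isExpired(long ttlMillis, long now) {
        return now - lastAccessTimestamp >= ttlMillis;
    }

    // Returns a copy stamped with the given time, e.g. when an access extends the lifetime.
    TtlValue<V> touch(long now) {
        return new TtlValue<>(value, now);
    }
}
```

Comparing `now - lastAccessTimestamp` against the TTL, rather than computing `lastAccessTimestamp + ttl`, avoids overflow if a very large TTL is configured.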
Wrapping state factory
If TTL is enabled, the original state factory provided by the backend is wrapped with a TtlStateFactory:
    state = stateDesc.getTtlConfig().isEnabled()
        ? new TtlStateFactory(originalStateFactory, ..).createState(...)
        : originalStateFactory.createState(...);
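The conditional wrapping is an instance of the decorator pattern. The sketch below shows it end to end with heavily simplified stand-ins: `ValueState`, `StateFactory`, `HeapStateFactory`, `Stamped` and `States.select` are hypothetical names (not Flink's real interfaces or signatures), the clock is a `LongSupplier`, the timestamp is refreshed only on writes, and expired values are never returned.

```java
import java.util.function.LongSupplier;

// Hypothetical, heavily simplified stand-ins for backend state interfaces.
interface ValueState<T> {
    T value();
    void update(T value);
}

interface StateFactory {
    <T> ValueState<T> createValueState();
}

// What an "original" backend factory might hand out: plain in-memory state.
final class HeapValueState<T> implements ValueState<T> {
    private T current;
    public T value() { return current; }
    public void update(T value) { current = value; }
}

final class HeapStateFactory implements StateFactory {
    public <T> ValueState<T> createValueState() { return new HeapValueState<>(); }
}

// User value plus the time it was last written.
final class Stamped<T> {
    final T value;
    final long timestamp;
    Stamped(T value, long timestamp) { this.value = value; this.timestamp = timestamp; }
}

// Decorates states from the original factory with TTL checks.
final class TtlStateFactory implements StateFactory {
    private final StateFactory original;
    private final long ttlMillis;
    private final LongSupplier timeProvider;

    TtlStateFactory(StateFactory original, long ttlMillis, LongSupplier timeProvider) {
        this.original = original;
        this.ttlMillis = ttlMillis;
        this.timeProvider = timeProvider;
    }

    public <T> ValueState<T> createValueState() {
        // The backend only ever stores the wrapped type.
        ValueState<Stamped<T>> inner = original.createValueState();
        return new ValueState<T>() {
            public T value() {
                Stamped<T> s = inner.value();
                if (s == null) return null;
                if (timeProvider.getAsLong() - s.timestamp >= ttlMillis) {
                    inner.update(null); // drop the expired entry when it is noticed
                    return null;
                }
                return s.value;
            }

            public void update(T value) {
                inner.update(value == null ? null : new Stamped<T>(value, timeProvider.getAsLong()));
            }
        };
    }
}

final class States {
    // Mirrors the selection above: wrap only when TTL is enabled.
    static StateFactory select(StateFactory original, boolean ttlEnabled, long ttlMillis, LongSupplier clock) {
        return ttlEnabled ? new TtlStateFactory(original, ttlMillis, clock) : original;
    }
}
```

The backend state never needs to know about TTL: it simply stores `Stamped<T>` values, which is what allows one decorator to sit on top of any backend.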
TtlStateFactory decorates the states produced by the original factory with TTL logic wrappers and adds TtlValue serialisation logic:
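    class TtlStateFactory {
        TtlTimeProvider timeProvider; // e.g. System.currentTimeMillis()

        <V> TtlValueState<V> createValueState(valueDesc) {
            // wrap the user serializer so that the stored value also carries the timestamp
            serializer = new TtlValueSerializer(valueDesc.getSerializer());
            // originalStateWithTtl: state created by the original factory with this serializer
            return new TtlValueState(originalStateWithTtl, timeProvider);
        }
    }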
The TTL serializer should add the expiration timestamp to the serialized user value.
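A minimal sketch of a TTL-aware serializer that writes the timestamp in front of the user value and delegates the value itself to the wrapped serializer. `SimpleSerializer`, `StringSerializer` and the `toBytes`/`fromBytes` helpers are hypothetical and much simpler than Flink's `TypeSerializer`; since the text does not settle whether the stored `long` is the last access time or a precomputed expiration time, the sketch just calls it `timestamp`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical minimal serializer interface.
interface SimpleSerializer<T> {
    void serialize(T value, DataOutput out) throws IOException;
    T deserialize(DataInput in) throws IOException;
}

final class StringSerializer implements SimpleSerializer<String> {
    public void serialize(String value, DataOutput out) throws IOException { out.writeUTF(value); }
    public String deserialize(DataInput in) throws IOException { return in.readUTF(); }
}

final class TtlValue<V> {
    final V value;
    final long timestamp;
    TtlValue(V value, long timestamp) { this.value = value; this.timestamp = timestamp; }
}

// Writes the timestamp first, then hands the user value to the wrapped serializer.
final class TtlValueSerializer<V> implements SimpleSerializer<TtlValue<V>> {
    private final SimpleSerializer<V> valueSerializer;

    TtlValueSerializer(SimpleSerializer<V> valueSerializer) { this.valueSerializer = valueSerializer; }

    public void serialize(TtlValue<V> v, DataOutput out) throws IOException {
        out.writeLong(v.timestamp);
        valueSerializer.serialize(v.value, out);
    }

    public TtlValue<V> deserialize(DataInput in) throws IOException {
        long ts = in.readLong();
        return new TtlValue<>(valueSerializer.deserialize(in), ts);
    }

    // Convenience round-trip helpers over byte arrays.
    byte[] toBytes(TtlValue<V> v) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream dos = new DataOutputStream(bos);
            serialize(v, dos);
            dos.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    TtlValue<V> fromBytes(byte[] bytes) {
        try {
            return deserialize(new DataInputStream(new ByteArrayInputStream(bytes)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Placing the fixed-width timestamp first means a reader could, in principle, check expiration from the first 8 bytes without deserializing the user value at all.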
...
TTL state decorators wrap the original state, which stores values packed together with their TTL metadata, and add the TTL logic on top using the time provider:
    class TtlValueState<V> implements ValueState<V> { // similarly for List, Map, ...
        TtlTimeProvider timeProvider;

        void update(V value) {
            ...
            underlyingState.update(valueWithTtl);
            ...
        }
    }
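The decorator logic could look roughly like the following standalone sketch, which also shows one way `UpdateType` and `StateVisibility` from the configuration might be interpreted. Assumptions: a value expires once `ttl` milliseconds have passed since its last access timestamp; `OnReadAndWrite` refreshes the timestamp on successful reads; an expired value is cleared as soon as it is detected, and `ReturnExpiredIfNotCleanedUp` lets that one read still see it. `HeapValueState` and the `LongSupplier` time provider are hypothetical stand-ins for the backend state and `TtlTimeProvider`.

```java
import java.util.function.LongSupplier;

enum UpdateType { Disabled, OnCreateAndWrite, OnReadAndWrite }
enum StateVisibility { ReturnExpiredIfNotCleanedUp, NeverReturnExpired }

// Hypothetical simplified state interface.
interface ValueState<T> {
    T value();
    void update(T value);
    void clear();
}

final class TtlValue<V> {
    final V value;
    final long lastAccessTimestamp;
    TtlValue(V value, long lastAccessTimestamp) {
        this.value = value;
        this.lastAccessTimestamp = lastAccessTimestamp;
    }
}

// Stand-in for the backend's original state.
final class HeapValueState<T> implements ValueState<T> {
    private T current;
    public T value() { return current; }
    public void update(T value) { current = value; }
    public void clear() { current = null; }
}

final class TtlValueState<V> implements ValueState<V> {
    private final ValueState<TtlValue<V>> underlyingState;
    private final LongSupplier timeProvider;
    private final long ttlMillis;
    private final UpdateType updateType;
    private final StateVisibility visibility;

    TtlValueState(ValueState<TtlValue<V>> underlyingState, LongSupplier timeProvider, long ttlMillis,
                  UpdateType updateType, StateVisibility visibility) {
        this.underlyingState = underlyingState;
        this.timeProvider = timeProvider;
        this.ttlMillis = ttlMillis;
        this.updateType = updateType;
        this.visibility = visibility;
    }

    private boolean isExpired(TtlValue<V> v) {
        return timeProvider.getAsLong() - v.lastAccessTimestamp >= ttlMillis;
    }

    public V value() {
        TtlValue<V> v = underlyingState.value();
        if (v == null) return null;
        if (isExpired(v)) {
            underlyingState.clear(); // clean up as soon as expiration is noticed
            return visibility == StateVisibility.ReturnExpiredIfNotCleanedUp ? v.value : null;
        }
        if (updateType == UpdateType.OnReadAndWrite) {
            // a successful read extends the lifetime
            underlyingState.update(new TtlValue<>(v.value, timeProvider.getAsLong()));
        }
        return v.value;
    }

    public void update(V value) {
        // every write stamps the value with the current time
        underlyingState.update(new TtlValue<>(value, timeProvider.getAsLong()));
    }

    public void clear() { underlyingState.clear(); }
}
```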
Cleanup: when an access or modification discovers that a value has expired, issue a delete (or a rewrite without the expired data) for it.
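For list state, the "rewrite" variant of cleanup applies naturally: when a read finds expired elements, the stored list is rewritten with only the live ones. The following in-memory sketch assumes per-element timestamps and a `LongSupplier` clock; `TtlListState` and `storedSize` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical list-state decorator with cleanup on access.
final class TtlListState<T> {
    private static final class Element<E> {
        final E value;
        final long timestamp;
        Element(E value, long timestamp) { this.value = value; this.timestamp = timestamp; }
    }

    private final long ttlMillis;
    private final LongSupplier timeProvider;
    private List<Element<T>> stored = new ArrayList<>();

    TtlListState(long ttlMillis, LongSupplier timeProvider) {
        this.ttlMillis = ttlMillis;
        this.timeProvider = timeProvider;
    }

    void add(T value) {
        stored.add(new Element<>(value, timeProvider.getAsLong()));
    }

    // Returns live elements; rewrites storage if any expired element was found.
    List<T> get() {
        long now = timeProvider.getAsLong();
        List<Element<T>> alive = new ArrayList<>();
        List<T> result = new ArrayList<>();
        for (Element<T> e : stored) {
            if (now - e.timestamp < ttlMillis) {
                alive.add(e);
                result.add(e.value);
            }
        }
        if (alive.size() != stored.size()) {
            stored = alive; // rewrite; an empty list amounts to a delete
        }
        return result;
    }

    int storedSize() { return stored.size(); }
}
```

A limitation of cleanup on access alone: an entry that is never read or written again is never removed by this mechanism and keeps occupying storage.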
...