Authors: Tzu-Li (Gordon) Tai, Congxian Qiu, Kostas Kloudas

Status (This is still WIP!)

Current state"FLIP is still WIP!"

...

Released: targeted for Flink 1.9.0

Motivation

Currently, user keyed state managed by Flink is serialized with different binary formats across the different state backends.

...

  • Unify the snapshot format across all state backends, making it more future-proof.
  • Extract the responsibility of writing snapshotted state out of the state backends.

Current Status

...

NOTE - the remainder of this document uses the following abbreviations. 

...

  1. how each individual state data pair is represented
  2. metadata or markers written along with the stream of pairs to indicate the key group and registered state that the state value belongs to.

State data binary layout

The differences in the binary layout of state data across state backends stem from how each backend maintains its working state.

...

The table below provides an overview of the formats for the supported state primitives. Note that although the formats below are represented as tuples (a 2-tuple for RocksDB, representing a key-value entry in RocksDB, and a 3-tuple for heap, representing the levels of the nested map of a state table), the binary layout in the snapshot is simply a direct concatenation of all the serialized values.



ValueState

RocksDBKeyedStateBackend: [CompositeKey(KG, K, NS), SV]

HeapKeyedStateBackend: [NS, K, SV]

  • CompositeKey(...) represents the composite key built via RocksDBSerializedCompositeKeyBuilder#buildCompositeKeyNamespace (sketched after the table).
  • Moreover, the MSB of the composite key may also be flipped if it is the last entry of a state for a given key group (see RocksFullSnapshotStrategy#setMetaDataFollowsFlagInKey). Note: this only applies to snapshotted state; keys in working state should never have the MSB flipped.

ListState

RocksDBKeyedStateBackend: [CompositeKey(KG, K, NS), SV]

HeapKeyedStateBackend: [NS, K, SV]

  • The format of SV for ListState differs between the heap and RocksDB backends.
  • For the heap backend, the format is defined by the serializer obtained from the state descriptor, which is a ListSerializer.
  • The RocksDB backend defines its own format in RocksDBListState#serializeValueList.

MapState

RocksDBKeyedStateBackend: [CompositeKey(KG, K, NS) :: UK, UV]

HeapKeyedStateBackend: [NS, K, SV]

  • For the heap backend, each serialized state data item is the complete user MapState, written using the serializer obtained from the state descriptor, which is a MapSerializer.
  • For the RocksDB backend, each serialized state data item is a single entry in the user MapState. The user map key and value serializers are obtained from the MapSerializer provided by the state descriptor.

AggregatingState

RocksDBKeyedStateBackend: [CompositeKey(KG, K, NS), SV]

HeapKeyedStateBackend: [NS, K, SV]

ReducingState

RocksDBKeyedStateBackend: [CompositeKey(KG, K, NS), SV]

HeapKeyedStateBackend: [NS, K, SV]

FoldingState

RocksDBKeyedStateBackend: [CompositeKey(KG, K, NS), SV]

HeapKeyedStateBackend: [NS, K, SV]

Timers

RocksDBKeyedStateBackend: [KG :: TS :: K :: NS, (empty)]

HeapKeyedStateBackend: TS :: K :: NS

  • The timestamp (TS) is a long value with its MSB (sign bit) flipped.
  • The value is empty in RocksDB.
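
For illustration, the following sketch shows how a RocksDB-style composite key (KG :: K :: NS) could be assembled with Flink's TypeSerializer and DataOutputSerializer APIs. It is only a simplified approximation of RocksDBSerializedCompositeKeyBuilder#buildCompositeKeyNamespace; the two-byte key-group prefix and the omission of variable-length-key handling are assumptions made for brevity.

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;

import java.io.IOException;

/**
 * Illustrative only: builds a serialized composite key in the order
 * key group (KG), key (K), namespace (NS), roughly mirroring what
 * RocksDBSerializedCompositeKeyBuilder#buildCompositeKeyNamespace produces.
 */
public final class CompositeKeySketch {

    public static <K, N> byte[] buildCompositeKey(
            int keyGroup,
            K key,
            TypeSerializer<K> keySerializer,
            N namespace,
            TypeSerializer<N> namespaceSerializer) throws IOException {

        DataOutputSerializer out = new DataOutputSerializer(64);

        // KG: written here as two bytes (assumption; the real builder uses 1 or 2
        // bytes depending on the maximum parallelism)
        out.writeShort(keyGroup);
        // K: serialized with the backend's key serializer
        keySerializer.serialize(key, out);
        // NS: serialized with the namespace serializer of the registered state
        namespaceSerializer.serialize(namespace, out);

        return out.getCopyOfBuffer();
    }
}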

Metadata or markers for state iteration

The heap and RocksDB backends take different approaches to how they write metadata or markers into the checkpoint stream to facilitate iterating through the contiguous state values of each key group at restore time.
The main reason for the difference is that for the heap backend, the number of state values is known upfront when taking a snapshot, whereas for RocksDB it is not (see the contrast sketched below). The following subsections describe this in detail.
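
To make the difference concrete, here is a deliberately simplified, hypothetical sketch contrasting the two approaches; none of the names below are actual Flink classes. The heap backend can prefix each run of entries with a count because the size is known upfront, while the RocksDB backend streams entries of unknown count and therefore marks whether more data follows (conceptually what the MSB flag set by RocksFullSnapshotStrategy#setMetaDataFollowsFlagInKey achieves).

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;

/** Hypothetical contrast of the two approaches; not actual Flink code. */
final class KeyGroupWriteSketch {

    /** Heap-style: the number of entries is known upfront, so a simple count prefix works. */
    static void writeCountPrefixed(DataOutputStream out, List<byte[]> entries) throws IOException {
        out.writeInt(entries.size()); // the reader knows exactly how many entries follow
        for (byte[] entry : entries) {
            out.writeInt(entry.length);
            out.write(entry);
        }
    }

    /**
     * RocksDB-style: entries are streamed from an iterator of unknown size, so each entry
     * carries a marker telling the reader whether more data follows (conceptually similar
     * to flipping the MSB of the last key of a state in a key group).
     */
    static void writeWithEndMarker(DataOutputStream out, Iterator<byte[]> entries) throws IOException {
        while (entries.hasNext()) {
            byte[] entry = entries.next();
            out.writeInt(entry.length);
            out.write(entry);
            out.writeBoolean(entries.hasNext()); // "more data follows" marker instead of a count
        }
    }
}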

HeapKeyedStateBackend

For the HeapKeyedStateBackend, the binary layout of the contiguous state values of a single key group is as follows:

...

Please see HeapRestoreOperation#readKeyGroupStateData() for the implementation.

RocksDBKeyedStateBackend

For the RocksDBKeyedStateBackend, the binary layout of the contiguous state values of a single key group is as follows:

...

Please see RocksDBFullRestoreOperation#readKvStateData() for the implementation.

Proposal

This proposal covers 2 goals:

  • Define the unified binary format for keyed state backends.
  • Extend / adjust the internal abstractions around ... so that new backends added in the future write in the unified format.

Unified binary format for keyed state

We propose to unify the binary layout of all currently supported keyed state backends to match the layout currently used by RocksDB, at all levels: the layout of contiguous state values within a key group as well as the layout of the individual state primitives.

...

To conclude: the format that RocksDB currently uses is the more general approach for arbitrarily large keyed state. It is feasible to let the heap backend work with the RocksDB backend's current format, but not the other way around.

Implementation

Note that a few interfaces similar to what has been proposed above already exist and serve similar purposes, but they are loosely coupled and not shared across the different state backends even though it would make sense to do so. For example, the existing StateSnapshot interface is similar to the proposed StateSnapshotResources, but is currently only used in the heap backend.

The goals of the proposed implementation are the following:

  • Allow backends to have different snapshot formats for checkpoints and savepoints
  • Use a common savepoint strategy, shared by all keyed backends, that defines the unified per-key-group binary layout of contiguous state entries.
  • Let the binary layouts of state entries be defined by the key and value serializers obtained from the state primitive descriptors. Internal state accessors that need to eagerly serialize / deserialize state (e.g. subclasses of AbstractRocksDBState) should be able to use those serializers directly, instead of duplicating backend-specific serialization logic (see the sketch below).
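
As a rough illustration of the last point (the class below is made up for the sketch and is not the actual AbstractRocksDBState code), a state accessor could serialize eagerly using the serializer carried by the state descriptor:

import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;

import java.io.IOException;

/** Sketch: eagerly serialize a value using the serializer carried by the state descriptor. */
final class DescriptorSerializationSketch {

    static <V> byte[] serializeValue(ValueStateDescriptor<V> descriptor, V value) throws IOException {
        // The descriptor is the single source of truth for the value (SV) format.
        // (Assumes the descriptor's serializer has already been initialized.)
        TypeSerializer<V> valueSerializer = descriptor.getSerializer();
        DataOutputSerializer out = new DataOutputSerializer(32);
        valueSerializer.serialize(value, out);
        return out.getCopyOfBuffer();
    }
}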


Rework SnapshotStrategy class hierarchy to differentiate between savepoint and checkpoint strategies

Currently, the binary layout for keyed state in savepoints is defined by HeapSnapshotStrategy and RocksFullSnapshotStrategy for the heap and RocksDB state backends, respectively. Those classes are used for both savepoints and full checkpoints. To be able to define the layouts for savepoints and checkpoints independently, the SnapshotStrategy hierarchy needs to be extended to differentiate between a KeyedBackendSavepointStrategyBase and a KeyedBackendCheckpointStrategyBase. All existing strategies, including HeapSnapshotStrategy, RocksFullSnapshotStrategy and RocksDBIncrementalSnapshotStrategy, should be rebased onto the checkpoint strategy base (and potentially renamed for better clarity).

Moreover, it should be made explicit that all keyed backends are configured with two different strategies, one for savepoints and one for checkpoints. Therefore, the AbstractKeyedStateBackend should be instantiated with a KeyedBackendSavepointStrategyBase and a KeyedBackendCheckpointStrategyBase, with the correct one being used on each snapshot attempt. This would eliminate the need for implementations of AbstractKeyedStateBackend to be responsible for any snapshotting concerns, as snapshotting now comes via composition (see the sketch below).
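
A minimal sketch of that composition, assuming the existing SnapshotStrategy interface and dispatching on CheckpointOptions#getCheckpointType; the exact wiring inside AbstractKeyedStateBackend may well differ:

import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategy;

import java.util.concurrent.RunnableFuture;

/** Sketch: a keyed backend composed of one savepoint and one checkpoint strategy. */
abstract class ComposedKeyedBackendSketch<K> {

    // In the proposal these would be a KeyedBackendSavepointStrategyBase and a
    // KeyedBackendCheckpointStrategyBase, respectively.
    private final SnapshotStrategy<SnapshotResult<KeyedStateHandle>> savepointStrategy;
    private final SnapshotStrategy<SnapshotResult<KeyedStateHandle>> checkpointStrategy;

    ComposedKeyedBackendSketch(
            SnapshotStrategy<SnapshotResult<KeyedStateHandle>> savepointStrategy,
            SnapshotStrategy<SnapshotResult<KeyedStateHandle>> checkpointStrategy) {
        this.savepointStrategy = savepointStrategy;
        this.checkpointStrategy = checkpointStrategy;
    }

    public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
            long checkpointId,
            long timestamp,
            CheckpointStreamFactory streamFactory,
            CheckpointOptions options) throws Exception {

        // Savepoints always use the unified format; checkpoints keep their
        // backend-specific (possibly incremental) format.
        SnapshotStrategy<SnapshotResult<KeyedStateHandle>> strategy =
                options.getCheckpointType() == CheckpointType.SAVEPOINT
                        ? savepointStrategy
                        : checkpointStrategy;

        return strategy.snapshot(checkpointId, timestamp, streamFactory, options);
    }
}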

...

public abstract class AbstractSnapshotStrategy<T extends StateObject, SR extends SnapshotResources> implements SnapshotStrategy<SnapshotResult<T>> {
    @Override
    public final RunnableFuture<SnapshotResult<T>> snapshot(...) {
        // Template method: synchronously prepare the resources (under the checkpoint lock),
        // then hand them to the asynchronous part that writes the snapshot.
        SR snapshotResources = syncPrepareResources();
        return asyncSnapshot(snapshotResources, ...);
    }

    protected abstract SR syncPrepareResources();

    protected abstract RunnableFuture<SnapshotResult<T>> asyncSnapshot(
        SR syncPartResources,
        long checkpointId,
        long timestamp,
        CheckpointStreamFactory streamFactory,
        CheckpointOptions checkpointOptions);
}


...



...


public interface SnapshotResources {
    void release();
    List<StateMetaInfoSnapshot> getStateMetaInfoSnapshots();
}
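
For illustration, a hypothetical implementation of this interface could simply hold the meta info snapshots together with a release callback (e.g. one that releases a copy-on-write state-table snapshot or a RocksDB snapshot); the class below is made up for the sketch:

import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;

import java.util.List;

/** Hypothetical implementation of the proposed SnapshotResources interface; illustration only. */
final class SimpleSnapshotResources implements SnapshotResources {

    private final List<StateMetaInfoSnapshot> metaInfoSnapshots;
    private final Runnable releaseCallback; // e.g. releases a COW state-table or RocksDB snapshot

    SimpleSnapshotResources(List<StateMetaInfoSnapshot> metaInfoSnapshots, Runnable releaseCallback) {
        this.metaInfoSnapshots = metaInfoSnapshots;
        this.releaseCallback = releaseCallback;
    }

    @Override
    public void release() {
        releaseCallback.run();
    }

    @Override
    public List<StateMetaInfoSnapshot> getStateMetaInfoSnapshots() {
        return metaInfoSnapshots;
    }
}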


...


Unifying the savepoint format for keyed state via KeyedBackendSavepointStrategyBase

KeyedBackendSavepointStrategyBase should be responsible for defining the unified binary layout for keyed state in savepoints. Subclasses would only be responsible for implementing the synchronous part of the snapshot, which essentially creates a StateSnapshotResource. The StateSnapshotResource encapsulates resources such as:

  • a snapshot of the state (for RocksDB, a snapshot of the database; for heap, a snapshot of the copy-on-write state tables),
  • a snapshot of each registered state's meta information,
  • StateSnapshotIterators for each state, which iterate the entries of that state in key group order (see the next subsection for more detail).

After synchronously creating the StateSnapshotResource, the resource is used by the asynchronous part of the snapshot operation to obtain the state meta information and the state entries to write to the checkpoint stream. The StateSnapshotResource defines a release() method which is invoked when the asynchronous part finishes or is interrupted.

...

public interface KeyedStateSavepointSnapshotResources extends SnapshotResources {
    List<KeyedStateSnapshotIterator> getKeyedStateSnapshotIterators();
}


...


...


public abstract class KeyedBackendSavepointStrategyBase<K, KSR extends KeyedStateSavepointSnapshotResources> extends AbstractSnapshotStrategy<KeyedStateHandle, KSR> {
    @Override
    public final RunnableFuture<SnapshotResult<KeyedStateHandle>> asyncSnapshot(KSR resources, ...) {
        // returns an AsyncPartCallable implementation that defines the unified per-key-group layout of contiguous state entries
    }

    @Override
    protected abstract KSR syncPrepareResources();
}

...

Iterating state entries for snapshots: KeyedStateSnapshotIterator and KeyedStatePerKeyGroupMergeIterator

A KeyedStateSnapshotIterator is an iterator that iterates the entries of a single registered state in key group order. The asynchronous part of the snapshot operation then uses a KeyedStatePerKeyGroupMergeIterator to combine multiple KeyedStateSnapshotIterators and iterate across all registered states, key group by key group.

The proposed interface for the KeyedStateSnapshotIterator is the following:

...

public interface KeyedStateSnapshotIterator {
    void next();
    int getStateId();
    int getKeyGroup();
    byte[] getKey();
    byte[] getValue();
    boolean isNewKeyGroup();
    boolean isValid();
}

...

The iterator returns a <byte[], byte[]> key-value pair for each state entry. Subclass implementations are responsible for serializing state objects in the case of state backends that only lazily serialize state on snapshots (e.g. the heap backend). This should also provide enough flexibility for state backends that may have a mix of serialized and non-serialized working state, such as the disk-spilling heap backend that is currently being discussed [1].
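
As an illustration, the asynchronous part could consume such an iterator roughly as follows. The per-entry layout shown here (key group, state id, length-prefixed key and value bytes) is an assumption for the sketch and not the final unified format:

import org.apache.flink.core.memory.DataOutputView;

import java.io.IOException;

/** Illustrative consumer of the proposed KeyedStateSnapshotIterator; not the actual implementation. */
final class SavepointWriterSketch {

    /** Writes all entries of an already merged, key-group-ordered iterator. */
    static void writeStateEntries(KeyedStateSnapshotIterator iterator, DataOutputView out) throws IOException {
        while (iterator.isValid()) {
            if (iterator.isNewKeyGroup()) {
                // A new key group starts here; the real code would rather switch to the
                // key group's own output stream and record its offset in the state handle.
                out.writeInt(iterator.getKeyGroup());
            }

            // Identify which registered state this entry belongs to.
            out.writeShort(iterator.getStateId());

            // Key and value are opaque byte arrays, written length-prefixed.
            byte[] key = iterator.getKey();
            byte[] value = iterator.getValue();
            out.writeInt(key.length);
            out.write(key);
            out.writeInt(value.length);
            out.write(value);

            iterator.next();
        }
    }
}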

The format of the key and value (i.e. the formats defined in the "Current Status" section) is not defined by the iterator. Call sites of the iterators (i.e. the asynchronous part of the snapshot operation) simply write the obtained byte arrays for each key and value, prepended by their lengths (as in the sketch above).

WIP


Migrating from Previous Savepoints

WIP

Extra Remarks

WIP

Appendix A

WIP

References

[1]