
Authors: Tzu-Li (Gordon) Tai, Congxian Qiu, Kostas Kloudas

Status

...

Current state"FLIP is still WIP!Under discussion"

Discussion thread: (will open once FLIP is completed)

JIRA: will create umbrella JIRA once the proposal is accepted

...

The inconsistencies are the following, for both checkpoints and savepoints:

  • Different ways of writing metadata to facilitate iterating through serialized state entries across key groups on restore.
  • Keys and values of keyed state primitive entries (e.g. ValueState, ListState, MapState, etc.) are also serialized differently.

Having incompatible formats across different state backends prevents users from switching state backends across restores; they are essentially tied to the backend they chose from the beginning.


The differences mainly stem from the fact that different state backends maintain working state differently, and consequently have specialized formats that allow them to introduce as little overhead as possible when taking a snapshot of the working state as well as when restoring from the serialized form.

While for checkpoints it is completely reasonable to have state backend specific formats for more efficient snapshots and restores, savepoints should be designed with interoperability in mind and allow for operational flexibility such as swapping state backends across restores.

Moreover, with the current status of the state backend code, state backends are responsible for defining the format they write with. This adds overhead to developing new state backends in the future, as well as the possibility that yet another incompatible format is introduced, making the unification even harder to achieve.

...

  • Unify across all state backends a savepoint format for keyed state that is more future-proof and applicable for potential new state backends. Checkpoint formats, by definition, are still allowed to be backend specific.
  • Extract the responsibility of writing snapshotted state out of the state backends.
  • Rework abstractions related to snapshots and restoring, to reduce the overhead and code duplication when attempting to implement a new state backend.

Current Status

...

NOTE - the remainder of this document uses the following abbreviations. 

...

For us to reason about the proposed unified format later in this document, it is important to understand the current formats and the historical reasons why they were defined as they are right now.

The written keyed state in snapshots of each operator contains mainly two parts -

  1. meta information for the keyed backend, such as the snapshot of the key serializer being used, as well as meta information about the registered keyed states.
  2. actual state data entries, sequence ordered by (KG, State ID). (mention why the sequence order is like that)

...

Writing of the meta information is governed by the KeyedBackendSerializationProxy class. The class is commonly used by all subclasses of SnapshotStrategy, such as RocksFullSnapshotStrategy and HeapSnapshotStrategy, before writing the state data. Therefore, this part is already unified.
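
For illustration, a minimal sketch of how this shared meta information part could be written ahead of the state data, assuming the current KeyedBackendSerializationProxy API (variable names are illustrative; exact signatures may differ between Flink versions):

// Sketch only: writing the unified meta information part before the state data.
KeyedBackendSerializationProxy<K> serializationProxy =
    new KeyedBackendSerializationProxy<>(
        keySerializer,            // the backend's key serializer
        stateMetaInfoSnapshots,   // meta info snapshots of all registered keyed states
        useStreamCompression);
serializationProxy.write(outputView); // outputView wraps the checkpoint / savepoint output stream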

The difference lies in the format of the sequence of state data entries. Specifically, they differ in -

  1. the binary format of the key and value in each state entry, and
  2. the metadata or markers written along with the stream of entries to indicate the key group and registered state that each state value belongs to.

...

  1. contiguous state entries to facilitate reading them from state handle streams on restore.

State entries key value binary layout

The difference in the binary layout of keys and values in state data entries across state backends stems from how each layout is tailored to the way the backend maintains working state.

...

  • The key group index at the beginning of the key group is not a requirement for being able to iterate the contiguous state values. It is written only as a sanity check on restore, verifying that the state values that follow indeed belong to the key group identified by its offset.
  • The number of states to read from the stream was already determined after reading the meta information about the keyed state backend (see diagram XX).
  • The number of (NS, K) state value mapping entries to read for each state is written before the contiguous values. It is possible to get the total number of mappings upfront because state tables are in-memory.
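
To make the layout described above concrete, here is a rough write-side sketch for a single key group; all names below are hypothetical and only mirror the description, not the actual HeapSnapshotStrategy code:

// Hypothetical sketch: writing one key group in the current heap-style layout.
private static void writeKeyGroup(
        DataOutputView out,
        int keyGroupIndex,
        List<RegisteredStateKeyGroupData> statesInKeyGroup) throws IOException {

    out.writeInt(keyGroupIndex);                     // sanity check only; offsets already identify the key group
    for (RegisteredStateKeyGroupData state : statesInKeyGroup) {  // number of states is known from the meta information
        out.writeShort(state.getStateId());
        out.writeInt(state.getNumberOfMappings());   // known upfront because state tables are in-memory
        state.writeMappings(out);                    // contiguous serialized (NS, K) -> state value mappings
    }
}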

...

  • Define the unified binary format for keyed state backends.
  • Extend / adjust internal abstractions around SnapshotStrategy and RestoreOperation so that new backends added in the future write keyed state in savepoints in the unified format.

Unified binary format for keyed state

...

To conclude, the format that RocksDB currently uses is the more general approach for arbitrarily large keyed state. It is feasible to let the heap backend work with the RocksDB backend's current format, but not the other way around.

Implementation

Note that a few interfaces similar to what is proposed above already exist and serve similar purposes, but they are loosely coupled and not shared across different state backends even though it would make sense to do so. For example, the existing StateSnapshot interface is similar to the proposed SnapshotResources, but is currently only used in the heap backend.

The goal for the proposed implementation is the following:

  • Allow backends to have different snapshot formats for checkpoints and savepoints.
  • Allow seamless migration from previous Flink versions.
  • Use a common savepoint strategy shared by all keyed backends to define the unified per key group binary layout of contiguous state entries.
  • Let binary layouts of state entries be defined by the key and value serializers obtained from each state primitive's descriptor. Internal state accessors that need to eagerly serialize / deserialize state (e.g. subclasses of AbstractRocksDBState) should be able to use those serializers directly, instead of defining their own format (see the sketch below).
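
As an illustration of the last point, a fragment-level sketch of an eagerly-serializing state accessor relying on the serializer obtained from the state descriptor (DataOutputSerializer and StateDescriptor#getSerializer() are existing Flink utilities; everything else here is illustrative):

// Sketch only: the entry layout comes from the descriptor's serializers, not from a backend-specific format.
TypeSerializer<V> valueSerializer = stateDescriptor.getSerializer();
DataOutputSerializer out = new DataOutputSerializer(64);
valueSerializer.serialize(value, out);
byte[] serializedValue = out.getCopyOfBuffer();  // bytes kept by the backend and written to savepoints as-is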

The following sections go over the main design choices.

Rework SnapshotStrategy class hierarchy to differentiate between savepoint and checkpoint strategies

Currently, the binary layout for keyed state in savepoints for the heap and RocksDB state backend is defined by HeapSnapshotStrategy and RocksFullSnapshotStrategy, respectively. Those classes are used for both savepoints as well as full checkpoints. To be able to define the layout for savepoints and checkpoints of keyed state independently, the hierarchy for SnapshotStrategy needs to be extended to differentiate a KeyedBackendSavepointStrategyBase and a KeyedBackendCheckpointStrategyBase. All existing strategies, including HeapSnapshotStrategy, RocksFullSnapshotStrategy and RocksDBIncrementalSnapshotStrategy, should be rebased onto the checkpoint strategy base (and potentially renamed for better clarity).

Moreover, it should be made explicit that all keyed backends are composed of two different snapshotting strategies, one for savepoints and one for checkpoints. Therefore, the AbstractKeyedStateBackend should be instantiated with a KeyedBackendSavepointStrategyBase and a KeyedBackendCheckpointStrategyBase, with the correct one being used on each snapshot attempt. This would eliminate the need for implementations of AbstractKeyedStateBackend to be responsible for any snapshotting concerns, as these now come via composition.
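
A minimal sketch of this composition, assuming CheckpointOptions can indicate whether the current snapshot is a savepoint (generics simplified; the savepointStrategy / checkpointStrategy fields are assumed to be set at construction time):

// Sketch only: the backend delegates each snapshot attempt to one of its two composed strategies.
public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
        long checkpointId,
        long timestamp,
        CheckpointStreamFactory streamFactory,
        CheckpointOptions checkpointOptions) throws Exception {

    SnapshotStrategy<SnapshotResult<KeyedStateHandle>> strategy =
        checkpointOptions.getCheckpointType().isSavepoint()
            ? savepointStrategy    // KeyedBackendSavepointStrategyBase
            : checkpointStrategy;  // KeyedBackendCheckpointStrategyBase

    return strategy.snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);
}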

As part of this refactoring effort, we should also work towards a clearer base abstraction for all snapshot strategy implementations. For example, the strategies now commonly include a synchronous part that essentially prepares resources to be used by the asynchronous part of the snapshot. This can be better abstracted as the following:

...

public abstract class AbstractSnapshotStrategy<T extends StateObject, SR extends SnapshotResources> implements SnapshotStrategy<SnapshotResult<T>> {

    @Override
    public final RunnableFuture<SnapshotResult<T>> snapshot(...) {
        // the synchronous part prepares the resources used by the asynchronous part
        SR snapshotResources = syncPrepareResources();
        return asyncSnapshot(snapshotResources, ...);
    }

    protected abstract SR syncPrepareResources();

    protected abstract RunnableFuture<SnapshotResult<T>> asyncSnapshot(
        SR syncPartResource,
        long checkpointId,
        long timestamp,
        CheckpointStreamFactory streamFactory,
        CheckpointOptions checkpointOptions);
}


...


SnapshotResources encapsulates resources for the asynchronous part of snapshots, such as -

  • a read-only snapshot of the state (for RocksDB, a snapshot of the database; for heap, a snapshot of the copy-on-write state tables). Any resources created need to be released when the asynchronous part completes.
  • the snapshot of all registered states' meta information,
  • KeyedStateSnapshotIterators for each state, which iterate the entries of the state in key group order. Please see the corresponding subsection below for more detail.

...

public interface SnapshotResources {
    void release();
    List<StateMetaInfoSnapshot> getStateMetaInfoSnapshots();
}
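
As an example of what a backend-specific implementation could look like, a hedged sketch for the heap backend (the class name and fields below are illustrative, not part of the proposal):

public class HeapSnapshotResources implements SnapshotResources {

    private final List<StateMetaInfoSnapshot> metaInfoSnapshots;
    private final List<StateSnapshot> cowStateTableSnapshots; // copy-on-write state table snapshots

    public HeapSnapshotResources(
            List<StateMetaInfoSnapshot> metaInfoSnapshots,
            List<StateSnapshot> cowStateTableSnapshots) {
        this.metaInfoSnapshots = metaInfoSnapshots;
        this.cowStateTableSnapshots = cowStateTableSnapshots;
    }

    @Override
    public void release() {
        // release the copy-on-write snapshots once the asynchronous part completes or is cancelled
        for (StateSnapshot snapshot : cowStateTableSnapshots) {
            snapshot.release();
        }
    }

    @Override
    public List<StateMetaInfoSnapshot> getStateMetaInfoSnapshots() {
        return metaInfoSnapshots;
    }
}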

...

Unifying the savepoint format for keyed state via KeyedBackendSavepointStrategyBase

KeyedBackendSavepointStrategyBase is responsible for defining the unified binary layout for keyed state in savepoints. Subclasses would only be responsible for providing a state backend specific KeyedStateSavepointSnapshotResources, created in the synchronous part of the snapshot. Apart from the state snapshot and the meta info snapshots, the KeyedStateSavepointSnapshotResources additionally provides iterators for the snapshotted keyed state. The resources are then used by the asynchronous part of the snapshot operation to obtain the state meta information and state entries to write to the checkpoint stream, and are released via release() when the asynchronous part finishes or is interrupted.

...

public interface KeyedStateSavepointSnapshotResources extends SnapshotResources {
    List<KeyedStateSnapshotIterator> getKeyedStateSnapshotIterators();
}

...


The asynchronous part of the snapshot defines the per key group unified binary layout for keyed state in savepoints, and is made final. It returns a SavepointKeyGroupsStateHandle, which is currently functionally identical to KeyGroupsStateHandle. The reason to introduce a new type of state handle specifically for savepoints is so that, on restore, we can use the state handle type to determine which restore strategy to use.

...

public abstract class KeyedBackendSavepointStrategyBase<K, KSR extends KeyedStateSavepointSnapshotResources>
        extends AbstractSnapshotStrategy<SavepointKeyGroupsStateHandle, KSR> {

    @Override
    public final RunnableFuture<SnapshotResult<SavepointKeyGroupsStateHandle>> asyncSnapshot(KSR resources, ...) {
        // return an AsyncPartCallable implementation that defines the unified per key group layout of state entries
    }

    @Override
    protected abstract KSR syncPrepareResources();
}

...

Within the asynchronous part of the snapshot, keyed state snapshot iterators are obtained from the KeyedStateSavepointSnapshotResources. Please refer to the next section for more details regarding the iterators.
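
Roughly, the asynchronous part could consume the resources along these lines (a sketch only; the merge iterator constructor shown here is an assumption):

// Sketch only: inside asyncSnapshot(KSR resources, ...) of KeyedBackendSavepointStrategyBase.
List<StateMetaInfoSnapshot> metaInfoSnapshots = resources.getStateMetaInfoSnapshots();
List<KeyedStateSnapshotIterator> stateIterators = resources.getKeyedStateSnapshotIterators();

// the meta information part is written first, then all states are iterated key group by key group
KeyedStateSnapshotPerKeyGroupMergeIterator mergeIterator =
    new KeyedStateSnapshotPerKeyGroupMergeIterator(stateIterators);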

Iterating state entries for keyed state snapshots: KeyedStateSnapshotIterator and KeyedStateSnapshotPerKeyGroupMergeIterator

...


KeyedStateSnapshotIterator is an iterator that iterates the entries of a single registered state in key group order. The asynchronous part of the snapshot operation then uses a KeyedStateSnapshotPerKeyGroupMergeIterator to combine multiple KeyedStateSnapshotIterators and iterate across all registered states key group by key group.

The interface for the KeyedStateSnapshotIterator is proposed to be the following:

...

public interface KeyedStateSnapshotIterator {
    void next();
    int getStateId();
    int getKeyGroup();
    byte[] getKey();
    byte[] getValue();
    boolean isNewKeyGroup();
    boolean isValid();
}

The iterator returns a (byte[], byte[]) key value pair for each state entry. Subclass implementations are responsible for serializing state objects in the case of state backends that only lazily serialize state on snapshots (e.g. the heap backend). This should also provide enough flexibility for state backends that may have a mix of serialized and non-serialized working state, such as the disk-spilling heap backend that is currently being discussed [1].

The format of the key and value (i.e. the formats defined in the "Current Status" section) is not defined by the iterator. Call sites of the iterators (i.e. the asynchronous part of the snapshot operation) simply write the obtained byte arrays of each key and value, prepended by their byte lengths.
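
For example, the write loop at a call site could look roughly like this (a sketch only; the int length prefix and the DataOutputView parameter are assumptions):

// Sketch only: writing the entries produced by the iterator as length-prefixed key / value bytes.
static void writeStateEntries(KeyedStateSnapshotIterator iterator, DataOutputView out) throws IOException {
    while (iterator.isValid()) {
        byte[] key = iterator.getKey();
        byte[] value = iterator.getValue();
        out.writeInt(key.length);    // length prefix, so the restore path can read the raw bytes back
        out.write(key);
        out.writeInt(value.length);
        out.write(value);
        iterator.next();
    }
}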


...


Restore procedures and migrating from older versions

Analogous to the rework of the SnapshotStrategy hierarchy to differentiate between savepoints and checkpoints, the RestoreOperation hierarchy should also be changed correspondingly to include KeyedBackendSavepointRestoreOperation and KeyedBackendCheckpointRestoreOperation. All existing implementations of keyed backend restore procedures, namely HeapRestoreOperation, RocksDBFullSnapshotRestoreOperation, and RocksDBIncrementalSnapshotRestoreOperation, should be rebased onto the checkpoint restore operation and possibly renamed for clarity.

On restore, the keyed state backend builders first check the type of the assigned restored state handles. The following scenarios may be encountered (a sketch of this dispatch follows the list):

  • If it is a SavepointKeyGroupsStateHandle, then the restored state is assured to be a savepoint of the new unified format. It is safe to restore state from the state handles using KeyedBackendSavepointRestoreOperation.
  • If it is a KeyGroupsStateHandle, then we have either restored from a checkpoint after the rework, or a savepoint before the rework. Either way, it will be safe to continue using the existing keyed state restore operations (HeapRestoreOperation, RocksDBFullSnapshotRestoreOperation, and RocksDBIncrementalSnapshotRestoreOperation) to read state, since these were previously used for savepoints as well.
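
A minimal sketch of this dispatch in a keyed backend builder (the helper method names are hypothetical):

// Sketch only: choosing the restore operation based on the type of the restored state handle.
private RestoreOperation<?> createRestoreOperation(KeyedStateHandle restoredStateHandle) {
    if (restoredStateHandle instanceof SavepointKeyGroupsStateHandle) {
        // savepoint in the new unified format
        return createSavepointRestoreOperation((SavepointKeyGroupsStateHandle) restoredStateHandle);
    } else {
        // KeyGroupsStateHandle: checkpoint after the rework, or savepoint written before the rework
        return createBackendSpecificRestoreOperation((KeyGroupsStateHandle) restoredStateHandle);
    }
}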

Migrating from Previous Savepoints

With the proposed implementation, users will be able to seamlessly migrate from previous savepoints of older Flink versions. When reading from older versions, old read paths will be used. From then on, new savepoints of keyed state will be written in the new unified format.

References

[1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Proposal-to-support-disk-spilling-in-HeapKeyedStateBackend-td29109.html