
Authors: Tzu-Li (Gordon) Tai, Congxian Qiu, Kostas Kloudas

Status (This is still WIP!)

Current state: "FLIP is still WIP!"

Discussion thread: (will open once FLIP is completed)

JIRA: will create umbrella JIRA once the proposal is accepted

Released: targeted for Flink 1.9.0

Motivation

Currently, Flink managed user keyed state is serialized with different binary formats across different state backends.

The inconsistencies are the following:

  • Different ways of writing metadata to facilitate iterating through serialized state across key-groups on restore.
  • Keys and values of the state primitives (e.g. ValueState, ListState, MapState, etc.) are also serialized differently.

Having incompatible formats across different state backends prevents users from switching state backends across restores; they are essentially tied to the backend they chose from the beginning.

Moreover, with the current status, each state backend is responsible for defining the format it writes with. This adds overhead to developing new state backends in the future, as well as the risk that yet another incompatible format is introduced, making unification even harder to achieve.

So, the main goals of this proposal are the following:

  • Unify across all state backends a snapshot format that is more future-proof.
  • Extract the responsibility of writing snapshotted state out of the state backends.

Current Status


NOTE - the remainder of this document uses the following abbreviations. 

  • KG: key group
  • K: partition key
  • NS: namespace
  • SV: state value (e.g. value of a ValueState, complete map of a MapState, etc.)
  • UK: key in a user MapState
  • UV: value in a user MapState
  • TS: timestamp (of timers)

For us to reason about the proposed unified format later in this document, it is important to understand the current formats and the historical reasons why they were defined the way they are.

The keyed state written in the snapshot of each operator consists mainly of two parts:

  1. meta information for the keyed backend, such as the snapshot of the key serializer being used, as well as meta information about the registered keyed states.
  2. actual state data pairs, ordered by (KG, State ID). The data is ordered by key group so that the starting offset of each key group can be recorded in the snapshot metadata; on restore, possibly with a different parallelism, each subtask can then seek directly to the key groups assigned to it.

Writing of the meta information is governed by the KeyedBackendSerializationProxy class. The class is commonly used by both RocksFullSnapshotStrategy#snapshot() and HeapSnapshotStrategy#snapshot() before writing the state data. Therefore, this part is already unified.
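
For illustration, the following is a minimal, self-contained sketch of this two-part layout (all names and the framing are hypothetical simplifications, not Flink's actual classes): meta information is written first, followed by the state data ordered by key group, while the starting offset of each key group is recorded for the snapshot metadata.

```java
import java.io.DataOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;

// Hypothetical sketch of the two-part snapshot layout: meta info first,
// then state data ordered by key group, with per-key-group offsets recorded.
public class SnapshotLayoutSketch {

    public static void main(String[] args) throws IOException {
        int numKeyGroups = 4;
        long[] keyGroupOffsets = new long[numKeyGroups];

        try (FileOutputStream fos = new FileOutputStream("snapshot.bin");
             DataOutputStream out = new DataOutputStream(fos)) {

            // Part 1: meta information (key serializer snapshot, registered
            // states, ...) -- reduced here to a single marker string.
            out.writeUTF("meta-info-placeholder");

            // Part 2: state data, ordered by (key group, state id).
            for (int kg = 0; kg < numKeyGroups; kg++) {
                keyGroupOffsets[kg] = out.size(); // offset of this key group
                out.writeInt(kg);                 // sanity-check marker
                // ... serialized state values of this key group would follow
            }
        }
        // The offsets would be stored in the snapshot's metadata so that a
        // restoring subtask can seek directly to its assigned key groups.
    }
}
```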

The difference lies in the format of the sequence of state data pairs. Specifically, they differ in:

  1. how each individual state data pair is represented
  2. the metadata or markers written along with the stream of pairs to indicate the key group and registered state that each state value belongs to.

State data binary layout

The difference in binary layout for state data across state backends stems from the fact that each layout is tailored to how the backend maintains its working state.

For the HeapKeyedStateBackend, working state is maintained as in-memory state tables that are partitioned by key group. The state tables are nested maps of Map<NS, Map<K, SV>>, providing an outer scope by namespace and an inner scope by key.
Naturally, it is easy to iterate through the state objects by key group when writing to the checkpoint stream during a snapshot. Offsets for each key group are written in the savepoint metadata file, and each state value is written with its namespace and key so that, on restore, deserialized state objects can correctly re-populate the state tables.
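
As a rough illustration (hypothetical, heavily simplified code, not the actual StateTable implementation), the heap backend's working state for one registered state can be pictured as one nested map per key group:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified picture of the heap backend's working state for one registered
// state: one nested map per key group, scoped by namespace, then by key.
public class HeapStateTableSketch<K, N, SV> {

    // keyGroup -> namespace -> key -> state value
    private final Map<Integer, Map<N, Map<K, SV>>> stateTables = new HashMap<>();

    public void put(int keyGroup, N namespace, K key, SV value) {
        stateTables
            .computeIfAbsent(keyGroup, kg -> new HashMap<>())
            .computeIfAbsent(namespace, ns -> new HashMap<>())
            .put(key, value);
    }

    public SV get(int keyGroup, N namespace, K key) {
        Map<N, Map<K, SV>> byNamespace = stateTables.get(keyGroup);
        if (byNamespace == null) return null;
        Map<K, SV> byKey = byNamespace.get(namespace);
        return byKey == null ? null : byKey.get(key);
    }
}
```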

For the RocksDBKeyedStateBackend, working state is already serialized and maintained as key-value pairs in RocksDB, with all pairs for a registered state stored in a single column family. In order to be able to iterate through the state values in a column family by order of key group during a snapshot, the keys of working state in RocksDB are prepended with the key group ID (int), relying on the fact that RocksDB organizes the data in sorted order on the keys. Theoretically, since we already write the offsets of each key group in the savepoint's metadata, the final snapshotted binary layout does not need to have the key group prepended to each key. However, excluding it would introduce extra overhead for removing and re-prepending the key group bytes when snapshotting and when restoring bytes back into RocksDB. That overhead isn't justified given the small size of the key group bytes; therefore, each snapshotted state value of a RocksDBKeyedStateBackend has the exact same layout as working state, including the key group bytes.
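
The effect of the key group prefix can be sketched as follows (a hypothetical helper, not RocksDBSerializedCompositeKeyBuilder itself). Because the prefix is a fixed-width, big-endian encoded integer, RocksDB's lexicographic byte ordering automatically sorts all entries by key group:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical sketch of a RocksDB composite key: a fixed-width, big-endian
// key group prefix followed by the serialized key and namespace. Because the
// prefix is big-endian, RocksDB's lexicographic byte order sorts by key group.
public final class CompositeKeySketch {

    static byte[] buildCompositeKey(int keyGroup, byte[] keyBytes, byte[] namespaceBytes)
            throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(baos);
        // Flink actually sizes the prefix based on the maximum parallelism;
        // a full 4-byte int is used here purely for simplicity.
        out.writeInt(keyGroup);
        out.write(keyBytes);
        out.write(namespaceBytes);
        return baos.toByteArray();
    }
}
```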

The table below provides an overview of the formats for the supported state primitives. Note that although the formats are represented as tuples (a 2-tuple for RocksDB, representing a key-value entry in RocksDB, and a 3-tuple for heap, representing the levels of a state table's nested maps), the binary layout in the snapshot is really just a direct concatenation of all the serialized values.



| State primitive | RocksDBKeyedStateBackend | HeapKeyedStateBackend | Note |
| --- | --- | --- | --- |
| ValueState | [CompositeKey(KG, K, NS), SV] | [NS, K, SV] | CompositeKey(...) represents the composite key built via RocksDBSerializedCompositeKeyBuilder#buildCompositeKeyNamespace. Moreover, the MSB of the composite key may be flipped if it is the last entry of a state for a given key group (see RocksFullSnapshotStrategy#setMetaDataFollowsFlagInKey); this applies only to snapshotted state, never to keys in working state. |
| ListState | [CompositeKey(KG, K, NS), SV] | [NS, K, SV] | The format of SV differs between the two backends. The heap backend uses the serializer obtained from the state descriptor (a ListSerializer); the RocksDB backend defines its own format in RocksDBListState#serializeValueList. |
| MapState | [CompositeKey(KG, K, NS) :: UK, UV] | [NS, K, SV] | For the heap backend, each serialized state value is the complete user map, written with the MapSerializer obtained from the state descriptor. For the RocksDB backend, each serialized state value is a single entry of the user map; the user key and value serializers are obtained from that MapSerializer. |
| AggregatingState | [CompositeKey(KG, K, NS), SV] | [NS, K, SV] | |
| ReducingState | [CompositeKey(KG, K, NS), SV] | [NS, K, SV] | |
| FoldingState | [CompositeKey(KG, K, NS), SV] | [NS, K, SV] | |
| Timers | [KG :: TS :: K :: NS, (empty)] | TS :: K :: NS | The timestamp is a long with its MSB (sign bit) flipped. The value is empty in RocksDB. |

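The sign-bit flip on the timer timestamp exists so that the byte-wise (lexicographic) ordering of the serialized key matches the numeric ordering of signed longs; without it, negative timestamps would sort after positive ones. A minimal sketch of the trick:

```java
import java.nio.ByteBuffer;

// Flipping the sign bit maps signed longs onto unsigned longs while
// preserving their order, so big-endian byte comparison == numeric comparison.
public final class SignBitFlipSketch {

    static byte[] toOrderedBytes(long timestamp) {
        return ByteBuffer.allocate(Long.BYTES)
                .putLong(timestamp ^ Long.MIN_VALUE) // flip the MSB sign bit
                .array();
    }

    public static void main(String[] args) {
        // -1L would normally compare greater than 1L byte-wise (0xFF... vs 0x00...);
        // after the flip, the byte order matches the numeric order.
        byte[] neg = toOrderedBytes(-1L);
        byte[] pos = toOrderedBytes(1L);
        System.out.println(java.util.Arrays.compareUnsigned(neg, pos) < 0); // true
    }
}
```
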
Metadata or markers for state iteration

The heap and RocksDB backends take different approaches to how they write metadata or markers in the checkpoint stream to facilitate iterating through the contiguous state values of each key group at restore time.
The difference mainly stems from the fact that for the heap backend, the number of state values is known upfront when taking a snapshot. The following subsections describe this in detail.

HeapKeyedStateBackend

For the HeapKeyedStateBackend, the binary layout of the contiguous state values of a single key group is as follows:


  • The key group index at the beginning of the key group is not required for iterating the contiguous state values. It is written only as a sanity check on restore that the following state values indeed belong to the expected key group identified by its offset.
  • The number of states to read from the stream was already determined when reading the meta information of the keyed state backend (see diagram XX).
  • The number of (NS, K) state value mappings to read for each state is written before the contiguous values. The total number of mappings can be obtained upfront because the state tables are in-memory.

Please see HeapRestoreOperation#readKeyGroupStateData() for the implementation.
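
A simplified reading loop for this count-based layout might look roughly like the following (hypothetical code, not the actual HeapRestoreOperation; the concrete on-wire types are simplified assumptions):

```java
import java.io.DataInputStream;
import java.io.IOException;

// Hypothetical sketch of reading one key group in the heap backend's
// count-based layout: all element counts are known before the values.
public final class HeapKeyGroupReadSketch {

    static void readKeyGroup(DataInputStream in, int expectedKeyGroup, int numStates)
            throws IOException {
        int keyGroup = in.readInt();     // sanity check only
        if (keyGroup != expectedKeyGroup) {
            throw new IOException("Unexpected key group: " + keyGroup);
        }
        // numStates was determined earlier, while reading the meta information.
        for (int s = 0; s < numStates; s++) {
            int stateId = in.readShort();
            int numMappings = in.readInt(); // known upfront: state tables are in memory
            for (int m = 0; m < numMappings; m++) {
                // read one (NS, K, SV) mapping and re-populate the state table
                // ... deserialize namespace, key and state value here
            }
        }
    }
}
```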

RocksDBKeyedStateBackend

For the RocksDBKeyedStateBackend, the binary layout of the contiguous state values of a single key group is as follows:

  • Unlike the heap backend, it is not possible to know the number of (NS, K) state value mappings upfront when taking a snapshot. Instead, the RocksDB backend writes metadata along with the stream of contiguous state values in a key group.
  • The last state value mapping of a keyed state has the MSB of its key flipped to indicate that it is the last value of the currently iterated keyed state. Since keys are prepended with the key group, which is always a non-negative integer, flipping the MSB of the serialized key always amounts to setting it to 1.
  • When iterating through the contiguous values, once a value with this flag is reached, the next bytes are either a state ID, indicating that the following values belong to another keyed state of the key group, or an END_OF_KEY_GROUP_MARK, indicating that there are no more values to read for the key group.

Please see RocksDBFullRestoreOperation#readKvStateData() for the implementation.
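
The marker-based reading loop can be sketched roughly as follows (hypothetical, heavily simplified code; the END_OF_KEY_GROUP_MARK value and the length-prefixed framing of key/value bytes are simplified assumptions, so refer to the actual RocksDBFullRestoreOperation for the real logic):

```java
import java.io.DataInputStream;
import java.io.IOException;

// Hypothetical sketch of reading one key group in the marker-based layout:
// the last pair of each state has the MSB of its first key byte set, and the
// following short is either the next state id or an end-of-key-group marker.
public final class RocksDbKeyGroupReadSketch {

    static final int END_OF_KEY_GROUP_MARK = 0xFFFF; // simplified assumption

    static void readKeyGroup(DataInputStream in) throws IOException {
        int stateId = in.readUnsignedShort();
        while (stateId != END_OF_KEY_GROUP_MARK) {
            boolean lastPairOfState = false;
            while (!lastPairOfState) {
                byte[] key = readFrame(in);
                byte[] value = readFrame(in);
                if ((key[0] & 0x80) != 0) {   // metadata-follows flag (flipped MSB)
                    key[0] &= 0x7F;           // clear the flag before using the key
                    lastPairOfState = true;
                }
                // ... insert (key, value) into the column family for stateId
            }
            stateId = in.readUnsignedShort(); // next state id, or the end mark
        }
    }

    // Simplified length-prefixed framing of the serialized bytes.
    static byte[] readFrame(DataInputStream in) throws IOException {
        byte[] bytes = new byte[in.readInt()];
        in.readFully(bytes);
        return bytes;
    }
}
```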

Proposal

This proposal covers two goals:

  • Define the unified binary format for keyed state backends.
  • Extend / adjust internal abstractions around ... so that new backends added in the future write in the unified format.

Unified binary format for keyed state

We propose to unify the binary layout for all currently supported keyed state backends to match the layout currently adopted by RocksDB, for all levels including the layout of contiguous state values of a key group as well as layout of individual state primitives.

The main consideration points for this decision are the following:

  • The way the heap backend currently iterates state values constrains the keyed state backend to have an efficient way of knowing the total number of (NS, K) state value mappings, so that this count can be written before the contiguous state values. This highly depends on the state backend; for RocksDB, for example, obtaining this count is non-trivial. The approach the RocksDB backend adopts, injecting markers and flags to indicate the end of states and key groups when reading from the file streams, is the more general and future-proof one.
  • The RocksDB backend's snapshot format contains redundant data due to the key group bytes prepended to each (NS, K) state value mapping. As previously explained, the key group bytes do not actually need to be written with every state mapping in snapshots; they are only required in working state maintained by RocksDB to order key-value entries by key group. However, the size overhead of this redundancy is far less prominent than the extra work RocksDB would otherwise need to truncate and re-prepend the key group bytes on each snapshot and restore operation.
  • A keyed MapState is written as a single state value in the current heap backend's snapshot layout, while for the RocksDB backend, a single state value maps to one entry of the MapState. The advantage of maintaining MapState entries as independent key-values in RocksDB is obvious: accessing and updating random keys in the MapState does not require serializing and deserializing the whole map (see the sketch at the end of this section). Trying to merge the separate entries during a snapshot, so that the snapshotted MapState is a single state value, would not work, since a user MapState can have an arbitrarily large number of entries that do not fit into memory (again, this is a constraint because the serializer for writing maps, i.e. the MapSerializer, writes in a format that requires the total number of mappings to be known upfront). Moreover, merging the entries on snapshot and flattening them again on restore is non-trivial work compared to simply dumping the key-value bytes from RocksDB.

To conclude: the format the RocksDB backend currently uses is the more general approach for arbitrarily large keyed state. It is feasible for the heap backend to work with the RocksDB backend's current format, but not the other way around.
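
To illustrate why the per-entry MapState layout matters, here is a hypothetical sketch that uses a sorted map as a stand-in for a RocksDB column family: a single user map key can be read or updated by building its composite key, without touching the rest of the map.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.TreeMap;

// Stand-in for a RocksDB column family: a map sorted by unsigned byte order,
// demonstrating random access to a single MapState entry via its composite key.
public final class MapStatePerEntrySketch {

    public static void main(String[] args) {
        TreeMap<byte[], byte[]> columnFamily = new TreeMap<>(Arrays::compareUnsigned);

        byte[] compositeKey = "KG|K|NS|".getBytes(StandardCharsets.UTF_8); // simplified
        byte[] userKey = "userMapKey".getBytes(StandardCharsets.UTF_8);

        // RocksDB key = CompositeKey(KG, K, NS) :: UK -- one entry per user map key.
        byte[] rocksKey = concat(compositeKey, userKey);
        columnFamily.put(rocksKey, "userMapValue".getBytes(StandardCharsets.UTF_8));

        // Point lookup: no need to deserialize the whole user map.
        byte[] value = columnFamily.get(concat(compositeKey, userKey));
        System.out.println(new String(value, StandardCharsets.UTF_8));
    }

    static byte[] concat(byte[] a, byte[] b) {
        byte[] result = Arrays.copyOf(a, a.length + b.length);
        System.arraycopy(b, 0, result, a.length, b.length);
        return result;
    }
}
```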

Implementation

WIP

Migrating from Previous Savepoints

WIP

Extra Remarks

WIP
