Status
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
We propose to add a new type of state that can be used to create simpler and more efficient implementations of the operators that maintain a MapState<?, List<?>> and/or require range operations. This includes temporal joins, operators that sort their input streams by timestamp, and a few other cases:
- TemporalRowTimeJoinOperator
- RowTimeSortOperator
- CepOperator
- IntervalJoinOperator
- AbstractStreamArrowPythonOverWindowAggregateFunctionOperator
- The various over/rank window operators
- Min/max with retractions
- Custom window implementations using keyed process functions
Several of these operators use MapState<Long, List<T>> to store lists of records associated with timestamps. Updating those lists is expensive, especially when they are stored in RocksDB, because appending a new entry requires deserializing and reserializing the entire list.
With this proposal, the common case of MapState<Long, List<T>> becomes BinarySortedState<Long, T>. The implementation can exploit namespaces in the state backends for representing the keys (timestamps, in this case), and the values can be stored as lists using each state backend’s native ListState implementation.
But rather than limiting the scope to using Longs as the keys in BinarySortedState<UK, UV>, we propose instead to support any type for which a lexicographically ordered type serializer can be implemented. This means that for any two keys, a and b, their binary, serialized representations are ordered in exactly the same way as the original objects. We can then take advantage of the fact that RocksDB stores its key/value pairs in order, sorted by the serialized representations of the keys, to implement efficient range scans.
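To make the ordering requirement concrete, the following sketch shows one well-known way to encode a long so that unsigned, byte-wise comparison of the encodings matches numeric order: flip the sign bit and write the result in big-endian order. The class OrderPreservingLongEncoding and its methods are hypothetical names used only for illustration; this FLIP does not specify how the long serializer is implemented. The sketch assumes Java 9 or later for Arrays.compareUnsigned.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical illustration only; not the serializer proposed in this FLIP. */
final class OrderPreservingLongEncoding {

    /** Encodes a long as 8 big-endian bytes with the sign bit flipped. */
    static byte[] encode(long value) {
        // Flipping the sign bit maps Long.MIN_VALUE to 0x00... and Long.MAX_VALUE to 0xFF...,
        // so negative values sort before positive ones under unsigned comparison.
        return ByteBuffer.allocate(Long.BYTES).putLong(value ^ Long.MIN_VALUE).array();
    }

    /** Inverse of {@link #encode(long)}. */
    static long decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong() ^ Long.MIN_VALUE;
    }

    /** Returns true if the two encodings are ordered the same way as the two values. */
    static boolean orderPreserved(long a, long b) {
        int byValue = Integer.signum(Long.compare(a, b));
        int byBytes = Integer.signum(Arrays.compareUnsigned(encode(a), encode(b)));
        return byValue == byBytes;
    }

    public static void main(String[] args) {
        long[] samples = {Long.MIN_VALUE, -42L, -1L, 0L, 1L, 1_000L, Long.MAX_VALUE};
        for (int i = 0; i + 1 < samples.length; i++) {
            System.out.println(
                    samples[i] + " vs " + samples[i + 1]
                            + " order preserved: " + orderPreserved(samples[i], samples[i + 1]));
        }
    }
}
```

Without the sign-bit flip, the two's-complement encoding of a negative long would start with a byte of 0x80 or higher and therefore sort after every non-negative value in a byte-ordered store.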
This FLIP also makes it possible for users to create custom time windows that are implemented as keyed process functions that achieve performance similar to that of the built-in window operators. This isn’t currently possible because the window operators take advantage of the state backends’ namespaces to optimize state access as outlined above, something that can’t be done with Flink’s public APIs. By exposing these namespaces in this limited, controlled way, users will be able to achieve comparable performance.
An implementation is underway. Benchmarking has shown a ~130% increase in throughput for sorting, and ~60% for temporal joins.
Public Interfaces
The complete proposed interface for BinarySortedState<UK, UV> is shown below. In summary, it includes:
Point operations:
- valuesAt(key)
- add(key, value)
- put(key, values)
- clearEntryAt(key)
Lookups:
- firstEntry(), firstEntry(fromKey)
- lastEntry(), lastEntry(toKey)
Range operations:
- readRange(fromKey, toKey, inclusiveToKey)
- readRangeUntil(toKey, inclusiveToKey)
- readRangeFrom(fromKey)
Range deletes:
- clearEntryAt(key)
- clearRange(fromKey, toKey, inclusiveToKey)
- clearRangeUntil(toKey, inclusiveToKey)
- clearRangeFrom(fromKey)
Cleanup:
- clear()
@PublicEvolving
public interface BinarySortedState<UK, UV> extends State {

    /**
     * Returns the current values associated with the given key.
     *
     * @param key The key of the mapping
     * @return The values of the mapping with the given key (or <tt>null</tt> if there is none)
     * @throws Exception Thrown if the system cannot access the state.
     */
    @Nullable
    Iterable<UV> valuesAt(UK key) throws Exception;

    /**
     * Returns the current values associated with the first key in state, based on the state's
     * ordering function.
     *
     * @return The entry of the first key (or <tt>null</tt> if there is none)
     * @throws Exception Thrown if the system cannot access the state.
     */
    @Nullable
    Map.Entry<UK, Iterable<UV>> firstEntry() throws Exception;

    /**
     * Returns the current values associated with the first key in state at or after {@code
     * fromKey}, based on the state's ordering function.
     *
     * @param fromKey The query key (inclusive)
     * @return The entry of the first key at or after {@code fromKey} (or <tt>null</tt> if there is
     *     none)
     * @throws Exception Thrown if the system cannot access the state.
     */
    @Nullable
    Map.Entry<UK, Iterable<UV>> firstEntry(UK fromKey) throws Exception;

    /**
     * Returns the current values associated with the last key in state, based on the state's
     * ordering function.
     *
     * @return The entry of the last key (or <tt>null</tt> if there is none)
     * @throws Exception Thrown if the system cannot access the state.
     */
    @Nullable
    Map.Entry<UK, Iterable<UV>> lastEntry() throws Exception;

    /**
     * Returns the current values associated with the last key in state at or before {@code toKey},
     * based on the state's ordering function.
     *
     * @param toKey The query key (inclusive)
     * @return The entry of the last key at or before {@code toKey} (or <tt>null</tt> if there is
     *     none)
     * @throws Exception Thrown if the system cannot access the state.
     */
    @Nullable
    Map.Entry<UK, Iterable<UV>> lastEntry(UK toKey) throws Exception;

    /**
     * Associates a new set of values with the given key (overwrites the previous mapping).
     *
     * @param key The key of the mapping
     * @param values The new values of the mapping
     * @throws Exception Thrown if the system cannot access the state.
     */
    void put(UK key, Iterable<UV> values) throws Exception;

    /**
     * Adds a new value to the mapping with the given key.
     *
     * @param key The key of the mapping
     * @param value The new value to add to the mapping
     * @throws Exception Thrown if the system cannot access the state.
     */
    void add(UK key, UV value) throws Exception;

    /**
     * Returns all the mappings in the state for the given range of keys.
     *
     * @param fromKey The first key of the mappings to return (inclusive)
     * @param toKey The last key of the mappings to return
     * @param inclusiveToKey Whether {@code toKey} should be inclusive or not
     * @return An iterable view of all the state's key-values pairs in the given range.
     * @throws Exception Thrown if the system cannot access the state.
     */
    Iterable<Map.Entry<UK, Iterable<UV>>> readRange(UK fromKey, UK toKey, boolean inclusiveToKey)
            throws Exception;

    /**
     * Returns all the mappings in the state for the range of the smallest key until the given key.
     *
     * @param toKey The last key of the mappings to return
     * @param inclusiveToKey Whether {@code toKey} should be inclusive or not
     * @return An iterable view of all the state's key-values pairs in the given range.
     * @throws Exception Thrown if the system cannot access the state.
     */
    Iterable<Map.Entry<UK, Iterable<UV>>> readRangeUntil(UK toKey, boolean inclusiveToKey)
            throws Exception;

    /**
     * Returns all the mappings in the state for the range of the given key (inclusive) until the
     * largest key.
     *
     * @param fromKey The first key of the mappings to return
     * @return An iterable view of all the state's key-values pairs in the given range.
     * @throws Exception Thrown if the system cannot access the state.
     */
    Iterable<Map.Entry<UK, Iterable<UV>>> readRangeFrom(UK fromKey) throws Exception;

    /**
     * Deletes all values for the mapping with the given key.
     *
     * @param key The key of the mapping
     * @throws Exception Thrown if the system cannot access the state.
     */
    void clearEntryAt(UK key) throws Exception;

    /**
     * Deletes all values in the given range of keys.
     *
     * @param fromKey The first key of the mappings to remove (inclusive)
     * @param toKey The last key of the mappings to remove
     * @param inclusiveToKey Whether {@code toKey} should be inclusive or not
     * @throws Exception Thrown if the system cannot access the state.
     */
    void clearRange(UK fromKey, UK toKey, boolean inclusiveToKey) throws Exception;

    /**
     * Deletes all values in the range of the smallest key until the given key.
     *
     * @param toKey The last key of the mappings to remove
     * @param inclusiveToKey Whether {@code toKey} should be inclusive or not
     * @throws Exception Thrown if the system cannot access the state.
     */
    void clearRangeUntil(UK toKey, boolean inclusiveToKey) throws Exception;

    /**
     * Deletes all values in the range of the given key (inclusive) until the largest key.
     *
     * @param fromKey The first key of the mappings to remove
     * @throws Exception Thrown if the system cannot access the state.
     */
    void clearRangeFrom(UK fromKey) throws Exception;

    /** Deletes all values from the state. */
    void clear();
}
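The intended semantics of the lookups and range operations can be modeled in plain Java with a TreeMap. The sketch below is only such a model, under several assumptions: the class SortedStateModel is a hypothetical name, keys are fixed to long, everything lives in one in-memory map for a single key of the keyed stream, and the return types are simplified to standard collections. It is not the proposed implementation and does not use any Flink API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical in-memory model of the operation semantics; not Flink code. */
final class SortedStateModel<V> {

    private final NavigableMap<Long, List<V>> entries = new TreeMap<>();

    /** Models add(key, value): appends one value to the list stored at the key. */
    void add(long key, V value) {
        entries.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    /** Models valuesAt(key): the values at the key, or null if there are none. */
    List<V> valuesAt(long key) {
        return entries.get(key);
    }

    /** Models firstEntry(fromKey): the first entry at or after fromKey, or null. */
    Map.Entry<Long, List<V>> firstEntry(long fromKey) {
        return entries.ceilingEntry(fromKey);
    }

    /** Models lastEntry(toKey): the last entry at or before toKey, or null. */
    Map.Entry<Long, List<V>> lastEntry(long toKey) {
        return entries.floorEntry(toKey);
    }

    /** Models readRange: fromKey is always inclusive, toKey only if requested. */
    NavigableMap<Long, List<V>> readRange(long fromKey, long toKey, boolean inclusiveToKey) {
        // Note: TreeMap.subMap throws IllegalArgumentException if fromKey > toKey.
        return Collections.unmodifiableNavigableMap(
                entries.subMap(fromKey, true, toKey, inclusiveToKey));
    }

    /** Models clearRangeUntil: removes every entry from the smallest key up to toKey. */
    void clearRangeUntil(long toKey, boolean inclusiveToKey) {
        entries.headMap(toKey, inclusiveToKey).clear();
    }
}
```

In a timestamp-sorting operator, for example, add would be called once per incoming record, and a timer firing at watermark w could read the range up to w and then clear it, mirroring readRangeUntil followed by clearRangeUntil.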
Under this proposal, a new state descriptor type is added:
@PublicEvolving
public class BinarySortedStateDescriptor<UK, UV> extends ListStateDescriptor<UV> {

    /**
     * Create a new {@code BinarySortedStateDescriptor} with the given name and the given type
     * serializers.
     *
     * @param name The name of the {@code BinarySortedStateDescriptor}.
     * @param keySerializer The type serializer for the keys in the state.
     * @param valueSerializer The type serializer for the values in the state.
     */
    public BinarySortedStateDescriptor(
            String name,
            LexicographicalTypeSerializer<UK> keySerializer,
            TypeSerializer<UV> valueSerializer);

    /**
     * Create a new {@code BinarySortedStateDescriptor} with the given name and the given type
     * information.
     *
     * @param name The name of the {@code BinarySortedStateDescriptor}.
     * @param keySerializer The type serializer for the keys in the state.
     * @param valueTypeInfo The type information for the values in the state.
     */
    public BinarySortedStateDescriptor(
            String name,
            LexicographicalTypeSerializer<UK> keySerializer,
            TypeInformation<UV> valueTypeInfo);

    /**
     * Create a new {@code BinarySortedStateDescriptor} with the given name and the given type
     * information.
     *
     * <p>If this constructor fails (because it is not possible to describe the type via a class),
     * consider using the {@link #BinarySortedStateDescriptor(String, LexicographicalTypeSerializer,
     * TypeInformation)} constructor.
     *
     * @param name The name of the {@code BinarySortedStateDescriptor}.
     * @param keySerializer The type serializer for the keys in the state.
     * @param valueType The class of the type of values in the state.
     */
    public BinarySortedStateDescriptor(
            String name,
            LexicographicalTypeSerializer<UK> keySerializer,
            Class<UV> valueType);

    /**
     * Gets the serializer for the keys in the state.
     *
     * @return The serializer for the keys in the state.
     */
    public LexicographicalTypeSerializer<UK> getKeySerializer();
}
Here the LexicographicalTypeSerializer<T> is a special TypeSerializer that provides a well-defined binary sort order that we will use in the operations above (ideally the same order you would expect from the original Java objects). For better performance with heap-based state backends, however, you can also provide an efficient Java object comparator based on the data type's binary sort order. This way, heap-based backends don't have to serialize keys in order to sort them. The proposal is to initially provide only an internal implementation of a LexicographicLongSerializer in order to support BinarySortedState<Long, T> out of the box. Users will be able to extend LexicographicalTypeSerializer if they want to use other types as the keys for ordering the state.
@PublicEvolving
public abstract class LexicographicalTypeSerializer<T> extends TypeSerializer<T> {

    /**
     * Provides an efficient Java object comparator based on this data type's binary sort order.
     *
     * @return If empty, implementations can compare based on serialized form and using {@link
     *     BinarySortOrderComparator#INSTANCE}, otherwise use the provided (efficient) comparator.
     */
    public Optional<Comparator<T>> findComparator() {}

    public static final class BinarySortOrderComparator implements Comparator<byte[]> {

        public static final BinarySortOrderComparator INSTANCE = new BinarySortOrderComparator();

        @Override
        public int compare(byte[] first, byte[] second) {}

        /**
         * Variant of {@link #compare(byte[], byte[])} that starts the comparison from a given
         * position. This can be used if you know that the first <code>from</code> bytes are equal.
         *
         * @param first the first object to be compared.
         * @param second the second object to be compared.
         * @param from array position to start the comparison at
         * @return a negative integer, zero, or a positive integer as the first argument is less
         *     than, equal to, or greater than the second.
         * @see #compare(byte[], byte[])
         */
        public int compare(byte[] first, byte[] second, int from) {}
    }
}
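The method bodies of the comparator are left open in the listing above. One plausible way to fill them in, assuming that "binary sort order" means comparing bytes as unsigned values from left to right with a shorter array sorting before any longer array it is a prefix of, is sketched here. The class name UnsignedBytesComparator is hypothetical, and the tie-breaking rule for arrays of different length is an assumption rather than something this FLIP states.

```java
import java.util.Comparator;

/** Hypothetical sketch of an unsigned, lexicographic byte[] comparator. */
final class UnsignedBytesComparator implements Comparator<byte[]> {

    static final UnsignedBytesComparator INSTANCE = new UnsignedBytesComparator();

    @Override
    public int compare(byte[] first, byte[] second) {
        return compare(first, second, 0);
    }

    /**
     * Compares the arrays starting at index {@code from}; the caller guarantees that the
     * first {@code from} bytes of both arrays are equal.
     */
    public int compare(byte[] first, byte[] second, int from) {
        int limit = Math.min(first.length, second.length);
        for (int i = from; i < limit; i++) {
            // Mask to 0..255 so that 0xFF sorts after 0x01 instead of before it.
            int cmp = Integer.compare(first[i] & 0xFF, second[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        // All shared bytes are equal: the shorter array sorts first.
        return Integer.compare(first.length, second.length);
    }
}
```

The masking step matters because Java bytes are signed; a plain byte comparison would order 0xFF before 0x01 and disagree with a store that sorts keys as unsigned bytes.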
Here's an example of how these pieces fit together:
private transient BinarySortedState<Long, Long> events;

BinarySortedStateDescriptor<Long, Long> desc =
        new BinarySortedStateDescriptor<>(
                "Events", LexicographicLongSerializer.INSTANCE, Types.LONG);

events = getRuntimeContext().getBinarySortedState(desc);
As illustrated above, RuntimeContext (and KeyedStateStore) need to be extended with a method for registering BinarySortedState:
@PublicEvolving
<UK, UV> BinarySortedState<UK, UV> getBinarySortedState(
        BinarySortedStateDescriptor<UK, UV> stateProperties);
Proposed Changes
EmbeddedRocksDBStateBackend
The RocksDB state backend is extended to use the state descriptor's LexicographicalTypeSerializer as the namespace serializer.
A RocksDBBinarySortedNavigator is added to support the various lookup and range operations. It takes advantage of RocksDB's native iterators.
HashMapStateBackend
A HeapBinarySortedNavigator is needed to provide the same lookup and range operations for the heap-based state backend. The proposed implementation uses a HashMap<FK, TreeSet<UKI>> to keep track of the namespaces that need to be iterated over.
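As a rough illustration of how such a structure supports ordered lookups, the sketch below keeps one TreeSet of sort keys per key. It assumes that FK stands for the current key of the keyed stream and that the set elements are the user's sort keys (the namespaces); the FLIP does not define these type parameters, and the class SortKeyIndex with all of its methods is hypothetical rather than part of the proposal.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

/** Hypothetical per-key index of sort keys; not the proposed navigator. */
final class SortKeyIndex<FK, UK extends Comparable<UK>> {

    private final Map<FK, TreeSet<UK>> sortKeysByKey = new HashMap<>();

    /** Records that state exists for the given key under the given sort key. */
    void register(FK key, UK sortKey) {
        sortKeysByKey.computeIfAbsent(key, k -> new TreeSet<>()).add(sortKey);
    }

    /** Forgets a sort key, dropping the per-key set once it becomes empty. */
    void unregister(FK key, UK sortKey) {
        TreeSet<UK> sortKeys = sortKeysByKey.get(key);
        if (sortKeys != null && sortKeys.remove(sortKey) && sortKeys.isEmpty()) {
            sortKeysByKey.remove(key);
        }
    }

    /** Returns the first sort key at or after fromSortKey, or null if there is none. */
    UK firstAtOrAfter(FK key, UK fromSortKey) {
        TreeSet<UK> sortKeys = sortKeysByKey.get(key);
        return sortKeys == null ? null : sortKeys.ceiling(fromSortKey);
    }

    /** Returns the last sort key at or before toSortKey, or null if there is none. */
    UK lastAtOrBefore(FK key, UK toSortKey) {
        TreeSet<UK> sortKeys = sortKeysByKey.get(key);
        return sortKeys == null ? null : sortKeys.floor(toSortKey);
    }

    /** Returns true if no sort keys are tracked for the given key. */
    boolean isEmpty(FK key) {
        return !sortKeysByKey.containsKey(key);
    }
}
```

The sketch orders sort keys by their natural ordering; a real navigator would presumably use the comparator supplied by the key serializer, or fall back to comparing serialized bytes when none is provided.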
Operators that can potentially be improved by using BinarySortedState
As outlined in the Motivation section, many of Flink's operators can benefit from being updated to use this new state type.
However, in a first release, we plan to limit the scope, and only include the new state type.
New Documentation
In addition to javadocs, state#using-keyed-state needs to be updated to include BinarySortedState.
Compatibility, Deprecation, and Migration Plan
Deprecation
While this FLIP does open the door for eventually deprecating some of the existing (and often abused) mechanisms for customizing DataStream windows, we are not proposing to deprecate anything as part of this FLIP.
Compatibility/Migration
Any operators that are reworked to use binary sorted state will thereafter use state in an incompatible way, compared to their current implementations.
To support upgrades to any SQL operator that is reworked to use binary sorted state, the mechanisms of FLIP-190 for handling operator migrations need to be followed. This involves annotating the ExecNode of the operator with an updated version number, and then modifying the operator so that it works regardless of which version of the state it finds available.
These operators can then either migrate the state, or directly support both the old and new versions of the state. In the context of this FLIP, migrating from any MapState<Long, List<T>> that is found to BinarySortedState<Long, T> will be simpler than continuing to maintain the old state as MapState, so that is the preferred approach.
Test Plan
We will include unit tests for the new state types that are tested against both state backends.
We should verify that any operators that are updated to use binary sorted state have sufficient test coverage. Tests will need to be added for state migration.
Rejected Alternatives
Directly expose the state backend namespaces
By itself, this wouldn’t accomplish much, and the interfaces proposed here satisfy the objectives without exposing this internal API.
Limit the scope to only offering TemporalState
An earlier draft of this FLIP proposed adding TemporalValueState and TemporalListState types, roughly equivalent to BinarySortedState<Long, ?>. After some discussion on the mailing list, this was determined to be too limiting.