...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

We propose to add a new type of state that can be used to create simpler and more efficient implementations of the operators that either maintain a MapState<?, List<?>>, or that require range operations. This includes temporal joins, operators that sort their input streams by timestamp, and a few other cases:

...

Public Interfaces

The complete proposed interface for BinarySortedState<UK, UV> is shown below. In summary, it includes:

...

  • clear()


Code Block
languagejava
/**
 * {@link State} interface for partitioned key-value state. The key-value pair can be added, updated
 * and retrieved.
 *
 * <p>The state is accessed and modified by user functions, and checkpointed consistently by the
 * system as part of the distributed snapshots.
 *
 * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
 * automatically supplied by the system, so the function always sees the value mapped to the key of
 * the current element. That way, the system can handle stream and state partitioning consistently
 * together.
 *
 * @param <UK> Type of the keys in the state.
 * @param <UV> Type of the values in the state.
 */
@PublicEvolving
public interface BinarySortedState<UK, UV> extends State {

    /**
     * Returns the current values associated with the given key.
     *
     * @param key The key of the mapping
     * @return The values of the mapping with the given key (or <tt>null</tt> if there is none)
     * @throws Exception Thrown if the system cannot access the state.
     */
    @Nullable
    Iterable<UV> valuesAt(UK key) throws Exception;

    /**
     * Returns the current values associated with the first key in state, based on the state's
     * ordering function.
     *
     * @return The entry of the first key (or <tt>null</tt> if there is none)
     * @throws Exception Thrown if the system cannot access the state.
     */
    @Nullable
    Map.Entry<UK, Iterable<UV>> firstEntry() throws Exception;

    /**
     * Returns the current values associated with the first key in state at or after {@code
     * fromKey}, based on the state's ordering function.
     *
     * @param fromKey The query key (inclusive)
     * @return The entry of the first key at or after {@code fromKey} (or <tt>null</tt> if there is
     *     none)
     * @throws Exception Thrown if the system cannot access the state.
     */
    @Nullable
    Map.Entry<UK, Iterable<UV>> firstEntry(UK fromKey) throws Exception;

    /**
     * Returns the current values associated with the last key in state, based on the state's
     * ordering function.
     *
     * @return The entry of the last key (or <tt>null</tt> if there is none)
     * @throws Exception Thrown if the system cannot access the state.
     */
    @Nullable
    Map.Entry<UK, Iterable<UV>> lastEntry() throws Exception;

    /**
     * Returns the current values associated with the last key in state at or before {@code toKey},
     * based on the state's ordering function.
     *
     * @param toKey The query key (inclusive)
     * @return The entry of the last key at or before {@code toKey} (or <tt>null</tt> if there is
     *     none)
     * @throws Exception Thrown if the system cannot access the state.
     */
    @Nullable
    Map.Entry<UK, Iterable<UV>> lastEntry(UK toKey) throws Exception;

    /**
     * Associates a new set of values with the given key (overwrites the previous mapping).
     *
     * @param key The key of the mapping
     * @param values The new values of the mapping
     * @throws Exception Thrown if the system cannot access the state.
     */
    void put(UK key, Iterable<UV> values) throws Exception;

    /**
     * Adds a new value to the mapping with the given key.
     *
     * @param key The key of the mapping
     * @param value The new value to add to the mapping
     * @throws Exception Thrown if the system cannot access the state.
     */
    void add(UK key, UV value) throws Exception;

    /**
     * Returns all the mappings in the state for the given range of keys.
     *
     * @param fromKey The first key of the mappings to return (inclusive)
     * @param toKey The last key of the mappings to return
     * @param inclusiveToKey Whether {@code toKey} should be inclusive or not
     * @return An iterable view of all the state's key-value pairs in the given range.
     * @throws Exception Thrown if the system cannot access the state.
     */
    Iterable<Map.Entry<UK, Iterable<UV>>> readRange(UK fromKey, UK toKey, boolean inclusiveToKey)
            throws Exception;

    /**
     * Returns all the mappings in the state for the range of the smallest key until the given key.
     *
     * @param toKey The last key of the mappings to return
     * @param inclusiveToKey Whether {@code toKey} should be inclusive or not
     * @return An iterable view of all the state's key-value pairs in the given range.
     * @throws Exception Thrown if the system cannot access the state.
     */
    Iterable<Map.Entry<UK, Iterable<UV>>> readRangeUntil(UK toKey, boolean inclusiveToKey)
            throws Exception;

    /**
     * Returns all the mappings in the state for the range of the given key (inclusive) until the
     * largest key.
     *
     * @param fromKey The first key of the mappings to return
     * @return An iterable view of all the state's key-value pairs in the given range.
     * @throws Exception Thrown if the system cannot access the state.
     */
    Iterable<Map.Entry<UK, Iterable<UV>>> readRangeFrom(UK fromKey) throws Exception;

    /**
     * Deletes all values for the mapping with the given key.
     *
     * @param key The key of the mapping
     * @throws Exception Thrown if the system cannot access the state.
     */
    void clearEntryAt(UK key) throws Exception;

    /**
     * Deletes all values in the given range of keys.
     *
     * @param fromKey The first key of the mappings to remove (inclusive)
     * @param toKey The last key of the mappings to remove
     * @param inclusiveToKey Whether {@code toKey} should be inclusive or not
     * @throws Exception Thrown if the system cannot access the state.
     */
    void clearRange(UK fromKey, UK toKey, boolean inclusiveToKey) throws Exception;

    /**
     * Deletes all values in the range of the smallest key until the given key.
     *
     * @param toKey The last key of the mappings to remove
     * @param inclusiveToKey Whether {@code toKey} should be inclusive or not
     * @throws Exception Thrown if the system cannot access the state.
     */
    void clearRangeUntil(UK toKey, boolean inclusiveToKey) throws Exception;

    /**
     * Deletes all values in the range of the given key (inclusive) until the largest key.
     *
     * @param fromKey The first key of the mappings to remove
     * @throws Exception Thrown if the system cannot access the state.
     */
    void clearRangeFrom(UK fromKey) throws Exception;

    /** Deletes all values from the state. */
    void clear();
}
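To make the lookup and range semantics of the interface concrete, the following minimal sketch emulates a few of its methods for a single partition key using a java.util.TreeMap. The class name SortedStateSketch, the fixed Long/String types, and the use of TreeMap are assumptions made purely for illustration; they are not part of the proposal, and a real backend would order keys by their serialized form rather than by Comparable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical in-memory stand-in for BinarySortedState<Long, String>; illustration only. */
final class SortedStateSketch {
    private final NavigableMap<Long, List<String>> entries = new TreeMap<>();

    /** Mirrors add(key, value): appends a value to the mapping of the given key. */
    void add(long key, String value) {
        entries.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    /** Mirrors firstEntry(fromKey): the first entry at or after fromKey, or null. */
    Map.Entry<Long, List<String>> firstEntry(long fromKey) {
        return entries.ceilingEntry(fromKey);
    }

    /** Mirrors lastEntry(toKey): the last entry at or before toKey, or null. */
    Map.Entry<Long, List<String>> lastEntry(long toKey) {
        return entries.floorEntry(toKey);
    }

    /** Mirrors readRange: fromKey is always inclusive, toKey only when requested. */
    Iterable<Map.Entry<Long, List<String>>> readRange(
            long fromKey, long toKey, boolean inclusiveToKey) {
        return entries.subMap(fromKey, true, toKey, inclusiveToKey).entrySet();
    }

    /** Mirrors clearRangeUntil: removes everything from the smallest key up to toKey. */
    void clearRangeUntil(long toKey, boolean inclusiveToKey) {
        entries.headMap(toKey, inclusiveToKey).clear();
    }

    /** Number of distinct keys currently stored (helper for this sketch only). */
    int size() {
        return entries.size();
    }
}
```

With keys 10, 20 and 30 stored, firstEntry(15) in this sketch yields the entry for 20, lastEntry(25) also yields the entry for 20, and lastEntry(5) yields null, which is the lookup behavior a temporal join would rely on.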


Under this proposal, a new state descriptor type is added:

Code Block
languagejava
@PublicEvolving
public class BinarySortedStateDescriptor<UK, UV> extends ListStateDescriptor<UV> {

    /**
     * Create a new {@code BinarySortedStateDescriptor} with the given name and the given type
     * serializers.
     *
     * @param name The name of the {@code BinarySortedStateDescriptor}.
     * @param keySerializer The type serializer for the keys in the state.
     * @param valueSerializer The type serializer for the values in the state.
     */
    public BinarySortedStateDescriptor(
            String name,
            LexicographicalTypeSerializer<UK> keySerializer,
            TypeSerializer<UV> valueSerializer);

    /**
     * Create a new {@code BinarySortedStateDescriptor} with the given name and the given type
     * information.
     *
     * @param name The name of the {@code BinarySortedStateDescriptor}.
     * @param keySerializer The type serializer for the keys in the state.
     * @param valueTypeInfo The type information for the values in the state.
     */
    public BinarySortedStateDescriptor(
            String name,
            LexicographicalTypeSerializer<UK> keySerializer,
            TypeInformation<UV> valueTypeInfo);

    /**
     * Create a new {@code BinarySortedStateDescriptor} with the given name and the given type
     * information.
     *
     * <p>If this constructor fails (because it is not possible to describe the type via a class),
     * consider using the {@link #BinarySortedStateDescriptor(String, LexicographicalTypeSerializer,
     * TypeInformation)} constructor.
     *
     * @param name The name of the {@code BinarySortedStateDescriptor}.
     * @param keySerializer The type serializer for the keys in the state.
     * @param valueType The class of the type of values in the state.
     */
    public BinarySortedStateDescriptor(
            String name, LexicographicalTypeSerializer<UK> keySerializer, Class<UV> valueType);

    /**
     * Gets the serializer for the keys in the state.
     *
     * @return The serializer for the keys in the state.
     */
    public LexicographicalTypeSerializer<UK> getKeySerializer();
}


Here the LexicographicalTypeSerializer is a special TypeSerializer which defines a binary sort order to allow retrieving and iterating values in order. The proposal is to initially provide only an internal implementation of a LexicographicLongSerializer that supports BinarySortedState<Long, T> out of the box:

Code Block
languagejava
private transient BinarySortedState<Long, Long> events;

BinarySortedStateDescriptor<Long, Long> desc =
    new BinarySortedStateDescriptor<>(
        "Events",
        LexicographicLongSerializer.INSTANCE,
        Types.LONG);

events = getRuntimeContext()
    .getBinarySortedState(desc);


As illustrated above, RuntimeContext (and KeyedStateStore) need to be extended with a method for registering BinarySortedState:

Code Block
languagejava
    @PublicEvolving
    <UK, UV> BinarySortedState<UK, UV> getBinarySortedState(
        BinarySortedStateDescriptor<UK, UV> stateProperties);

Proposed Changes

EmbeddedRocksDBStateBackend

The RocksDB state backend is extended to use the state descriptor's LexicographicalTypeSerializer as the namespace serializer.

RocksDBBinarySortedNavigator is added to support the various lookup and range operations. It takes advantage of RocksDB's iterators.
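Iterator-based lookups only return keys in the intended order if the serialized bytes sort the same way as the original values. The sketch below shows one common way to achieve this for longs, assuming the store compares keys bytewise as unsigned values (RocksDB's default comparator behaves this way): write the value big-endian and flip the sign bit so that negative numbers sort before positive ones. LexicographicLongSketch is a hypothetical helper; whether the proposed LexicographicLongSerializer uses exactly this encoding is not stated here.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper for illustration; not the proposed LexicographicLongSerializer. */
final class LexicographicLongSketch {

    /** Encodes a long as 8 big-endian bytes with the sign bit flipped. */
    static byte[] encode(long value) {
        // ByteBuffer writes big-endian by default; XOR with MIN_VALUE flips the sign bit.
        return ByteBuffer.allocate(Long.BYTES).putLong(value ^ Long.MIN_VALUE).array();
    }

    /** Reverses encode(). */
    static long decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong() ^ Long.MIN_VALUE;
    }

    /** Unsigned bytewise comparison, i.e. the order a byte-ordered store iterates in. */
    static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }
}
```

Under this encoding, comparing the encoded bytes of two longs gives the same result as comparing the longs numerically, so a seek to the encoded form of a key positions an iterator at the first entry at or after that key.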

HashMapStateBackend

HeapBinarySortedNavigator is needed to provide the same lookup and range operations. The proposed implementation uses a HashMap<FK, TreeSet<UKI>> to keep track of the namespaces that need to be iterated over.
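As a rough picture of how such a per-key index can answer the lookup and range queries, the sketch below keeps one TreeSet of user keys per partition key and navigates it with ceiling, floor and subSet. HeapNavigatorSketch and its method names are hypothetical, and ordering user keys via Comparable is an assumption of this sketch; the actual HeapBinarySortedNavigator may differ in both respects.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;

/** Hypothetical per-key index of sorted user keys; not HeapBinarySortedNavigator itself. */
final class HeapNavigatorSketch<FK, UK extends Comparable<UK>> {
    private final Map<FK, TreeSet<UK>> index = new HashMap<>();

    /** Records that the given user key holds values for the given partition key. */
    void register(FK key, UK userKey) {
        index.computeIfAbsent(key, k -> new TreeSet<UK>()).add(userKey);
    }

    /** Forgets a user key and drops the partition key's set once it is empty. */
    void unregister(FK key, UK userKey) {
        TreeSet<UK> userKeys = index.get(key);
        if (userKeys != null && userKeys.remove(userKey) && userKeys.isEmpty()) {
            index.remove(key);
        }
    }

    /** First user key at or after fromKey, or null if there is none. */
    UK firstAtOrAfter(FK key, UK fromKey) {
        TreeSet<UK> userKeys = index.get(key);
        return userKeys == null ? null : userKeys.ceiling(fromKey);
    }

    /** Last user key at or before toKey, or null if there is none. */
    UK lastAtOrBefore(FK key, UK toKey) {
        TreeSet<UK> userKeys = index.get(key);
        return userKeys == null ? null : userKeys.floor(toKey);
    }

    /** User keys in [fromKey, toKey) or [fromKey, toKey], depending on inclusiveToKey. */
    NavigableSet<UK> range(FK key, UK fromKey, UK toKey, boolean inclusiveToKey) {
        TreeSet<UK> userKeys = index.get(key);
        if (userKeys == null) {
            return new TreeSet<UK>();
        }
        return userKeys.subSet(fromKey, true, toKey, inclusiveToKey);
    }
}
```

Removing empty sets in unregister keeps the index from growing with partition keys that no longer hold any state, which matters because the index lives on the heap alongside the state itself.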

Operators that can potentially be improved by using BinarySortedState

As outlined in the Motivation section, many of Flink's operators can benefit by being updated to leverage this new state type. 

However, in a first release, we plan to limit the scope, and only include the new state type.

New Documentation

In addition to javadocs, state#using-keyed-state should be updated to include BinarySortedState.

Compatibility, Deprecation, and Migration Plan

Deprecation

While this FLIP does open the door for eventually deprecating some of the existing (and often abused) mechanisms for customizing DataStream windows, we are not proposing to deprecate anything as part of this FLIP.

Compatibility/Migration

Any operators that are reworked to use binary sorted state will thereafter use state in an incompatible way, compared to their current implementations.

To support upgrades to any SQL operator that is reworked to use binary sorted state, the mechanisms of FLIP-190 for handling operator migrations need to be followed. This involves annotating the ExecNode of the operator with an updated version number, and then modifying the operator so that it works regardless of which version of the state it finds available. 

These operators can then either migrate the state, or directly support both the old and new versions of the state. In the context of this FLIP, migrating from any MapState<Long, List<T>> that is found to BinarySortedState<Long, T> will be simpler than continuing to maintain the old state as MapState, so that is the preferred approach.
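The preferred migration path can be pictured as a one-time copy of each legacy mapping into the new state. In the sketch below, a plain java.util.Map stands in for the old MapState<Long, List<T>>, and SortedTarget is a hypothetical minimal interface standing in for the put method of BinarySortedState<Long, T> (the real method additionally declares throws Exception). The FLIP-190 mechanics, such as ExecNode versioning and state restore, are not modeled.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical migration sketch; all names here are assumptions for illustration. */
final class StateMigrationSketch {

    /** Stand-in for BinarySortedState#put(key, values). */
    interface SortedTarget<T> {
        void put(long key, Iterable<T> values);
    }

    /**
     * Copies every legacy mapping into the new state, then clears the legacy state.
     *
     * @return the number of migrated keys
     */
    static <T> int migrate(Map<Long, List<T>> legacy, SortedTarget<T> target) {
        int migrated = 0;
        for (Map.Entry<Long, List<T>> entry : legacy.entrySet()) {
            // The iteration order of the legacy map does not matter: the target sorts by key.
            target.put(entry.getKey(), entry.getValue());
            migrated++;
        }
        legacy.clear();
        return migrated;
    }
}
```

An operator following this approach would run such a copy once when it finds old state after a restore, and from then on read and write only the new state.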

Test Plan

We will include unit tests for the new state type that are run against both state backends.

We should verify that any operators that are updated to use binary sorted state have sufficient test coverage. Tests will need to be added for state migration.


Rejected Alternatives

Directly expose the state backend namespaces

By itself, this wouldn’t accomplish much, and the interfaces proposed here satisfy the objectives without exposing this internal API.

Limit the scope to only offering TemporalState

An earlier draft of this FLIP proposed adding TemporalValueState and TemporalListState types, roughly equivalent to BinarySortedState<Long, ?>. After some discussion on the mailing list, this was determined to be too limiting.