...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

We propose to add a new type of state that can be used to create simpler and more efficient implementations of various temporal operations, including temporal joins and sorting.

What these operations have in common is the need to store timestamped records for later retrieval, either indexed by, or relative to, a timestamp. This is something RocksDB can do very efficiently, if we let it.

This covers the operators that either maintain a MapState<?, List<?>> or require range operations, including temporal joins, operators that sort their input streams by timestamp, and a few other cases:

  • TemporalRowTimeJoinOperator
  • RowTimeSortOperator
  • CepOperator
  • IntervalJoinOperator
  • AbstractStreamArrowPythonOverWindowAggregateFunctionOperator
  • The various over/rank window operators
  • Min/max with retractions
  • Custom window implementations using keyed process functions

Several of these operators use MapState<Long, List<T>> to store lists of records associated with timestamps. Updating those lists is expensive, especially when they are stored in RocksDB, because appending a new entry requires deserializing and reserializing the whole list.

With this proposal, this MapState with lists as values, MapState<Long, List<T>>, becomes TemporalListState<T> (BinarySortedState<Long, T> in the sorted-state interface). We can use the timestamps that were the MapState’s keys as namespaces in the state backends, and the MapState’s values can be stored as lists using each state backend’s native ListState implementation.

This means that for the RocksDB state backend, the implementation of TemporalListState can use the optimized append operation from ListState. This avoids having to deserialize and reserialize the list when adding items to the end of the list, which is a very common (and very expensive) operation.
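The difference between the two append strategies can be illustrated with a toy model. The sketch below is not Flink or RocksDB code: `AppendCostSketch`, its in-memory `store`, and the fixed-width `long` encoding are assumptions made for illustration only. It counts how many elements get deserialized when a list is appended to via read-modify-write (the MapState<Long, List<T>> pattern) versus by concatenating the serialized bytes of the new element (the idea behind a native list append).

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a byte-oriented store; hypothetical, not Flink or RocksDB code. */
public class AppendCostSketch {

    /** Serialized lists keyed by timestamp (stands in for the state backend). */
    final Map<Long, byte[]> store = new HashMap<>();

    /** Number of elements deserialized so far, used as a rough cost measure. */
    long elementsDeserialized = 0;

    static byte[] serialize(List<Long> values) {
        ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES * values.size());
        for (long v : values) {
            buffer.putLong(v);
        }
        return buffer.array();
    }

    List<Long> deserialize(byte[] data) {
        List<Long> values = new ArrayList<>();
        ByteBuffer buffer = ByteBuffer.wrap(data);
        while (buffer.hasRemaining()) {
            values.add(buffer.getLong());
            elementsDeserialized++;
        }
        return values;
    }

    /** MapState-with-list style: read the whole list, modify it, write it back. */
    void appendReadModifyWrite(long time, long value) {
        byte[] existing = store.get(time);
        List<Long> list = existing == null ? new ArrayList<>() : deserialize(existing);
        list.add(value);
        store.put(time, serialize(list));
    }

    /** Append style: concatenate the new element's bytes; nothing is deserialized. */
    void appendSerialized(long time, long value) {
        byte[] element = serialize(Collections.singletonList(value));
        byte[] existing = store.getOrDefault(time, new byte[0]);
        // The toy still copies bytes; the point is that no element is decoded.
        byte[] merged = new byte[existing.length + element.length];
        System.arraycopy(existing, 0, merged, 0, existing.length);
        System.arraycopy(element, 0, merged, existing.length, element.length);
        store.put(time, merged);
    }

    public static void main(String[] args) {
        AppendCostSketch readModifyWrite = new AppendCostSketch();
        AppendCostSketch append = new AppendCostSketch();
        for (long i = 0; i < 1000; i++) {
            readModifyWrite.appendReadModifyWrite(42L, i);
            append.appendSerialized(42L, i);
        }
        System.out.println("read-modify-write deserialized: " + readModifyWrite.elementsDeserialized);
        System.out.println("serialized append deserialized: " + append.elementsDeserialized);
    }
}
```

Both strategies leave the same bytes in the store, but the read-modify-write path decodes every existing element on every append, so its cost grows quadratically with the list length in this model.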

Another opportunity for optimization is to take advantage of the fact that RocksDB stores its key/value pairs in order, sorted by key. We can use this to implement efficient range scans and to reduce the cost of searching for the key closest to a particular timestamp, an operation needed, for example, when computing a temporal join.
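For a byte-sorted store to answer "closest key" and range queries on timestamps, the serialized keys must sort in the same order as the timestamps themselves. The following sketch shows one common way to achieve that for `long` keys; it is an illustration under our own assumptions (the class `SortedKeySketch`, the sign-bit-flipping encoding, and a `TreeMap` standing in for the store), not necessarily the encoding an implementation of this proposal would use.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical illustration of order-preserving key encoding; not Flink code. */
public class SortedKeySketch {

    /** Encodes a long so that unsigned lexicographic byte order equals numeric order. */
    static byte[] encode(long timestamp) {
        // Big-endian layout plus a flipped sign bit makes negative values sort first.
        return ByteBuffer.allocate(Long.BYTES).putLong(timestamp ^ Long.MIN_VALUE).array();
    }

    static long decode(byte[] key) {
        return ByteBuffer.wrap(key).getLong() ^ Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        // A TreeMap ordered by unsigned byte comparison stands in for a byte-sorted store.
        Comparator<byte[]> unsignedByteOrder = Arrays::compareUnsigned;
        TreeMap<byte[], String> store = new TreeMap<>(unsignedByteOrder);
        store.put(encode(100L), "d");
        store.put(encode(-5L), "a");
        store.put(encode(7L), "c");
        store.put(encode(0L), "b");

        // Iteration follows byte order, which matches timestamp order.
        for (Map.Entry<byte[], String> entry : store.entrySet()) {
            System.out.println(decode(entry.getKey()) + " -> " + entry.getValue());
        }

        // "At or before" lookup: the greatest key that is <= 50.
        Map.Entry<byte[], String> atOrBefore = store.floorEntry(encode(50L));
        System.out.println("at or before 50: " + decode(atOrBefore.getKey()));

        // Range scan: all keys up to and including 7.
        System.out.println("entries up to 7: " + store.headMap(encode(7L), true).size());
    }
}
```

With such an encoding, a lookup relative to a timestamp becomes a seek in the sorted store rather than a scan over all keys.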

This FLIP also makes it possible for users to create custom time windows, implemented as keyed process functions, that achieve performance similar to that of the built-in window operators. This isn’t currently possible because the window operators take advantage of the state backends’ namespaces to optimize state access in a way that can’t be done with Flink’s public APIs. By exposing these namespaces in this limited, controlled way, users will be able to use temporal state to achieve comparable performance. We have verified this with a POC implementation.

An implementation is underway. POC benchmarking has shown ~130% speedup for sorting, and ~60% for temporal joins. We have also implemented, as a POC, a revised version of the TemporalRowTimeJoinOperator, and found it to have more than 2x the throughput of the current implementation.

Public Interfaces

Two new keyed state interfaces are introduced – TemporalValueState and TemporalListState – each extending the new TemporalState interface:

Code Block
languagejava
public interface TemporalState {

    /** Removes the value mapped for the specified time under the current key. */
    void clear(long time);

    /** Removes all values for all timestamps under the current key. */
    void clearAll();
}

Both the value and list flavors of temporal state support storing data for a particular timestamp, and retrieving the data either at, at-or-before, or at-or-after a particular timestamp:

Code Block
languagejava
public interface TemporalValueState<T> extends TemporalState {

    T valueAt(long time) throws IOException;

    TemporalValue<T> valueAtOrBefore(long time) throws IOException;

    TemporalValue<T> valueAtOrAfter(long time) throws IOException;

    void update(long time, T value) throws IOException;
}
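To make the lookup semantics concrete, here is a minimal in-memory model for a single key. It is a sketch, not a state backend: `TemporalValueSketch` is a hypothetical class, and because the `TemporalValue<T>` type is not defined in this section, the sketch returns `Map.Entry<Long, T>` (timestamp and value) in its place.

```java
import java.util.Map;
import java.util.TreeMap;

/** In-memory model of the lookup semantics for one key; hypothetical, not a state backend. */
public class TemporalValueSketch<T> {

    private final TreeMap<Long, T> values = new TreeMap<>();

    public void update(long time, T value) {
        values.put(time, value);
    }

    /** Returns the value stored exactly at {@code time}, or null if there is none. */
    public T valueAt(long time) {
        return values.get(time);
    }

    /** Returns the entry with the greatest timestamp <= {@code time}, or null. */
    public Map.Entry<Long, T> valueAtOrBefore(long time) {
        return values.floorEntry(time);
    }

    /** Returns the entry with the smallest timestamp >= {@code time}, or null. */
    public Map.Entry<Long, T> valueAtOrAfter(long time) {
        return values.ceilingEntry(time);
    }

    public void clear(long time) {
        values.remove(time);
    }

    public void clearAll() {
        values.clear();
    }

    public static void main(String[] args) {
        // Temporal-join flavor: find the exchange rate in effect at an order's timestamp.
        TemporalValueSketch<Double> rates = new TemporalValueSketch<>();
        rates.update(1_000L, 1.10);
        rates.update(2_000L, 1.25);

        Map.Entry<Long, Double> rate = rates.valueAtOrBefore(1_500L);
        System.out.println("rate version " + rate.getKey() + " = " + rate.getValue());
        System.out.println("before first version: " + rates.valueAtOrBefore(500L));
    }
}
```

In the usage above, an order at time 1500 is matched with the rate version written at time 1000, which is the kind of lookup a temporal join performs for each incoming record.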

Code Block
languagejava
public interface TemporalListState<T> extends TemporalState {

    @Nullable
    Iterable<T> getAt(long time) throws Exception;

    @Nullable
    TemporalValue<Iterable<T>> getAtOrBefore(long time) throws Exception;

    @Nullable
    TemporalValue<Iterable<T>> getAtOrAfter(long time) throws Exception;

    void update(long time, List<T> values) throws Exception;

    void addAll(long time, List<T> values) throws Exception;

    void add(long time, T value) throws Exception;
}

The complete BinarySortedState interface is shown below. In summary, it includes:

Point operations:

  • valuesAt(key)
  • add(key, value)
  • put(key, values)
  • clearEntryAt(key)

Lookups:

  • firstEntry(), firstEntry(fromKey)
  • lastEntry(), lastEntry(toKey)

Range-operations:

  • readRange(fromKey, toKey, inclusiveToKey)
  • readRangeUntil(toKey, inclusiveToKey)
  • readRangeFrom(fromKey)

Range-deletes:

  • clearEntryAt(key)
  • clearRange(fromKey, toKey, inclusiveToKey)
  • clearRangeUntil(toKey, inclusiveToKey)
  • clearRangeFrom(fromKey)

Cleanup:

  • clear()


Code Block
languagejava
/**
 * {@link State} interface for partitioned key-value state. The key-value pair can be added, updated
 * and retrieved.
 *
 * <p>The state is accessed and modified by user functions, and checkpointed consistently by the
 * system as part of the distributed snapshots.
 *
 * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
 * automatically supplied by the system, so the function always sees the value mapped to the key of
 * the current element. That way, the system can handle stream and state partitioning consistently
 * together.
 *
 * @param <UK> Type of the keys in the state.
 * @param <UV> Type of the values in the state.
 */
@PublicEvolving
public interface BinarySortedState<UK, UV> extends State {

    /**
     * Returns the current values associated with the given key.
     *
     * @param key The key of the mapping
     * @return The values of the mapping with the given key (or <tt>null</tt> if there is none)
     * @throws Exception Thrown if the system cannot access the state.
     */
    @Nullable
    Iterable<UV> valuesAt(UK key) throws Exception;

    /**
     * Returns the current values associated with the first key in state, based on the state's
     * ordering function.
     *
     * @return The entry of the first key (or <tt>null</tt> if there is none)
     * @throws Exception Thrown if the system cannot access the state.
     */
    @Nullable
    Map.Entry<UK, Iterable<UV>> firstEntry() throws Exception;

    /**
     * Returns the current values associated with the first key in state at or after {@code
     * fromKey}, based on the state's ordering function.
     *
     * @param fromKey The query key (inclusive)
     * @return The entry of the first key at or after {@code fromKey} (or <tt>null</tt> if there is
     *     none)
     * @throws Exception Thrown if the system cannot access the state.
     */
    @Nullable
    Map.Entry<UK, Iterable<UV>> firstEntry(UK fromKey) throws Exception;

    /**
     * Returns the current values associated with the last key in state, based on the state's
     * ordering function.
     *
     * @return The entry of the last key (or <tt>null</tt> if there is none)
     * @throws Exception Thrown if the system cannot access the state.
     */
    @Nullable
    Map.Entry<UK, Iterable<UV>> lastEntry() throws Exception;

    /**
     * Returns the current values associated with the last key in state at or before {@code toKey},
     * based on the state's ordering function.
     *
     * @param toKey The query key (inclusive)
     * @return The entry of the last key at or before {@code toKey} (or <tt>null</tt> if there is
     *     none)
     * @throws Exception Thrown if the system cannot access the state.
     */
    @Nullable
    Map.Entry<UK, Iterable<UV>> lastEntry(UK toKey) throws Exception;

    /**
     * Associates a new set of values with the given key (overwrites the previous mapping).
     *
     * @param key The key of the mapping
     * @param values The new values of the mapping
     * @throws Exception Thrown if the system cannot access the state.
     */
    void put(UK key, Iterable<UV> values) throws Exception;

    /**
     * Adds a new value to the mapping with the given key.
     *
     * @param key The key of the mapping
     * @param value The new value to add to the mapping
     * @throws Exception Thrown if the system cannot access the state.
     */
    void add(UK key, UV value) throws Exception;

    /**
     * Returns all the mappings in the state for the given range of keys.
     *
     * @param fromKey The first key of the mappings to return (inclusive)
     * @param toKey The last key of the mappings to return
     * @param inclusiveToKey Whether {@code toKey} should be inclusive or not
     * @return An iterable view of all the state's key-value pairs in the given range.
     * @throws Exception Thrown if the system cannot access the state.
     */
    Iterable<Map.Entry<UK, Iterable<UV>>> readRange(UK fromKey, UK toKey, boolean inclusiveToKey)
            throws Exception;

    /**
     * Returns all the mappings in the state for the range of the smallest key until the given key.
     *
     * @param toKey The last key of the mappings to return
     * @param inclusiveToKey Whether {@code toKey} should be inclusive or not
     * @return An iterable view of all the state's key-value pairs in the given range.
     * @throws Exception Thrown if the system cannot access the state.
     */
    Iterable<Map.Entry<UK, Iterable<UV>>> readRangeUntil(UK toKey, boolean inclusiveToKey)
            throws Exception;

    /**
     * Returns all the mappings in the state for the range of the given key (inclusive) until the
     * largest key.
     *
     * @param fromKey The first key of the mappings to return
     * @return An iterable view of all the state's key-value pairs in the given range.
     * @throws Exception Thrown if the system cannot access the state.
     */
    Iterable<Map.Entry<UK, Iterable<UV>>> readRangeFrom(UK fromKey) throws Exception;

    /**
     * Deletes all values for the mapping with the given key.
     *
     * @param key The key of the mapping
     * @throws Exception Thrown if the system cannot access the state.
     */
    void clearEntryAt(UK key) throws Exception;

    /**
     * Deletes all values in the given range of keys.
     *
     * @param fromKey The first key of the mappings to remove (inclusive)
     * @param toKey The last key of the mappings to remove
     * @param inclusiveToKey Whether {@code toKey} should be inclusive or not
     * @throws Exception Thrown if the system cannot access the state.
     */
    void clearRange(UK fromKey, UK toKey, boolean inclusiveToKey) throws Exception;

    /**
     * Deletes all values in the range of the smallest key until the given key.
     *
     * @param toKey The last key of the mappings to remove
     * @param inclusiveToKey Whether {@code toKey} should be inclusive or not
     * @throws Exception Thrown if the system cannot access the state.
     */
    void clearRangeUntil(UK toKey, boolean inclusiveToKey) throws Exception;

    /**
     * Deletes all values in the range of the given key (inclusive) until the largest key.
     *
     * @param fromKey The first key of the mappings to remove
     * @throws Exception Thrown if the system cannot access the state.
     */
    void clearRangeFrom(UK fromKey) throws Exception;

    /** Deletes all values from the state. */
    void clear();
}
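As an illustration of how the range operations fit together, the following sketch models a sort-by-timestamp buffer for a single key. It is an in-memory stand-in under our own assumptions, not an implementation of the interface: `SortedBufferSketch` and `onWatermark` are hypothetical names, keys are fixed to `Long`, and only `add`, `readRangeUntil` and `clearRangeUntil` are modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** In-memory model of a few sorted-state operations for one key; hypothetical sketch. */
public class SortedBufferSketch<UV> {

    private final TreeMap<Long, List<UV>> entries = new TreeMap<>();

    /** Adds a value to the list stored under the given key. */
    public void add(long key, UV value) {
        entries.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    /** Returns a view of all entries from the smallest key until {@code toKey}, in key order. */
    public Iterable<Map.Entry<Long, List<UV>>> readRangeUntil(long toKey, boolean inclusiveToKey) {
        return entries.headMap(toKey, inclusiveToKey).entrySet();
    }

    /** Deletes all entries from the smallest key until {@code toKey}. */
    public void clearRangeUntil(long toKey, boolean inclusiveToKey) {
        entries.headMap(toKey, inclusiveToKey).clear();
    }

    /** Emits every buffered record with timestamp <= watermark in timestamp order, then drops them. */
    public List<UV> onWatermark(long watermark) {
        List<UV> emitted = new ArrayList<>();
        for (Map.Entry<Long, List<UV>> entry : readRangeUntil(watermark, true)) {
            emitted.addAll(entry.getValue());
        }
        clearRangeUntil(watermark, true);
        return emitted;
    }

    public static void main(String[] args) {
        SortedBufferSketch<String> buffer = new SortedBufferSketch<>();
        // Records arrive out of order.
        buffer.add(30L, "c");
        buffer.add(10L, "a");
        buffer.add(20L, "b");
        buffer.add(10L, "a2");

        System.out.println(buffer.onWatermark(20L)); // records with timestamps 10 and 20
        System.out.println(buffer.onWatermark(40L)); // the remaining record at 30
    }
}
```

The pattern is one range read followed by one range delete per watermark, which is the access pattern that sorting operators need and that a sorted store can serve without touching later keys.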


Under this proposal, RuntimeContext is extended with a method for registering BinarySortedState. RuntimeContext and KeyedStateStore are also extended with methods for registering TemporalValueState and TemporalListState using the existing ValueStateDescriptor and ListStateDescriptor types.

Code Block
languagejava
public interface RuntimeContext {

    <T> TemporalValueState<T> getTemporalValueState(ValueStateDescriptor<T> stateProperties);

    <T> TemporalListState<T> getTemporalListState(ListStateDescriptor<T> stateProperties);

    @PublicEvolving
    <UK, UV> BinarySortedState<UK, UV> getBinarySortedState(
            BinarySortedStateDescriptor<UK, UV> stateProperties);
}


Code Block
languagejava
public interface KeyedStateStore {

    <T> TemporalValueState<T> getTemporalValueState(ValueStateDescriptor<T> stateProperties);

    <T> TemporalListState<T> getTemporalListState(ListStateDescriptor<T> stateProperties);
}

...