Status

Current state: Under Discussion

Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)

JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

We propose to add a new type of state that can be used to create simpler and more efficient implementations of various temporal operations, including temporal joins and sorting.

What these operations have in common is the need to store timestamped records for later retrieval, either indexed by, or relative to, a timestamp. This is something RocksDB can do very efficiently, if we let it.

More concretely, many Flink operators use MapState<Long, List<T>> to store lists of records associated with timestamps. Updating these lists is expensive, especially when they are stored in RocksDB, because each update has to deserialize and reserialize the whole list.

With this proposal, this MapState with lists as values becomes TemporalListState<T>. We can use the timestamps that were the MapState’s keys as namespaces in the state backends, and the MapState’s values can be stored as lists using each state backend’s native ListState implementation.

This means that for the RocksDB state backend, the implementation of TemporalListState can use the optimized append operation from ListState. This avoids having to deserialize and reserialize the list when adding items to the end of the list, which is a very common (and very expensive) operation.

Another opportunity for optimization is to take advantage of the fact that RocksDB stores its key/value pairs in order, sorted by key. We can use this to reduce the cost of searching for the key closest to a particular timestamp, an operation needed, for example, when computing a temporal join.

This FLIP will also make it possible for users to create custom time windows that are implemented as process functions that achieve performance similar to that of the built-in window operators. This isn’t currently possible because the window operators take advantage of the state backends’ namespaces to optimize state access in a way that can’t be done with Flink’s public APIs. By exposing these namespaces in this limited, controlled way, users will be able to use temporal state to achieve comparable performance. We have verified this with a POC implementation.

We have also implemented a revised version of the TemporalRowTimeJoinOperator as a POC, and found it to have more than 2x the throughput of the current implementation.

Public Interfaces

Two new keyed state interfaces are introduced – TemporalValueState and TemporalListState – each extending the new TemporalState interface:

public interface TemporalState {

    /** Removes the value mapped for the specified time under the current key. */
    void clear(long time);

    /** Removes all values for all timestamps under the current key. */
    void clearAll();
}



Both the value and list flavors of temporal state support storing data for a particular timestamp, and retrieving the data either at, at-or-before, or at-or-after a particular timestamp:

public interface TemporalValueState<T> extends TemporalState {

    T valueAt(long time) throws IOException;

    TemporalValue<T> valueAtOrBefore(long time) throws IOException;

    TemporalValue<T> valueAtOrAfter(long time) throws IOException;

    void update(long time, T value) throws IOException;
}


public interface TemporalListState<T> extends TemporalState {

    @Nullable
    Iterable<T> getAt(long time) throws Exception;

    @Nullable
    TemporalValue<Iterable<T>> getAtOrBefore(long time) throws Exception;

    @Nullable
    TemporalValue<Iterable<T>> getAtOrAfter(long time) throws Exception;

    void update(long time, List<T> values) throws Exception;

    void addAll(long time, List<T> values) throws Exception;

    void add(long time, T value) throws Exception;
}
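
To make the at, at-or-before, and at-or-after lookup semantics concrete, here is a minimal in-memory sketch. It is not Flink code and not the proposed implementation: InMemoryTemporalValues and TemporalLookupDemo are hypothetical names, the data lives in a plain java.util.TreeMap rather than a state backend, and Map.Entry stands in for TemporalValue, whose shape is not spelled out in this section.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for the proposed TemporalValueState; not Flink code.
final class InMemoryTemporalValues<T> {
    // TreeMap keeps timestamps sorted, so nearest-timestamp lookups are O(log n).
    private final TreeMap<Long, T> byTime = new TreeMap<>();

    void update(long time, T value) {
        byTime.put(time, value);
    }

    // Exact match only; returns null when nothing is stored at this timestamp.
    T valueAt(long time) {
        return byTime.get(time);
    }

    // Entry with the greatest timestamp <= time, or null if there is none.
    Map.Entry<Long, T> valueAtOrBefore(long time) {
        return byTime.floorEntry(time);
    }

    // Entry with the smallest timestamp >= time, or null if there is none.
    Map.Entry<Long, T> valueAtOrAfter(long time) {
        return byTime.ceilingEntry(time);
    }

    void clear(long time) {
        byTime.remove(time);
    }

    void clearAll() {
        byTime.clear();
    }
}

final class TemporalLookupDemo {
    // Temporal-join style lookup: find the rate that was in effect at orderTime.
    static Double rateFor(InMemoryTemporalValues<Double> rates, long orderTime) {
        Map.Entry<Long, Double> hit = rates.valueAtOrBefore(orderTime);
        return hit == null ? null : hit.getValue();
    }
}
```

With rates stored at timestamps 100 and 200, an order at time 150 resolves to the rate stored at 100, and an order at time 50 finds no rate at all. This is the lookup pattern a temporal join needs.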



Under this proposal, RuntimeContext and KeyedStateStore are extended with methods for registering TemporalValueState and TemporalListState using the existing ValueStateDescriptor and ListStateDescriptor types.

public interface RuntimeContext {

    <T> TemporalValueState<T> getTemporalValueState(ValueStateDescriptor<T> stateProperties);

    <T> TemporalListState<T> getTemporalListState(ListStateDescriptor<T> stateProperties);
}




public interface KeyedStateStore {

    <T> TemporalValueState<T> getTemporalValueState(ValueStateDescriptor<T> stateProperties);

    <T> TemporalListState<T> getTemporalListState(ListStateDescriptor<T> stateProperties);
}

Proposed Changes

Many places in Flink, for example in Flink SQL, declare state like this:

private transient MapState<Long, List<RowData>> dataState;

Under this proposal, this will become

private transient TemporalListState<RowData> dataState;

Having the getAtOrBefore and getAtOrAfter methods available for these temporal state types makes it much easier to implement operations such as temporal joins and temporal sorting. By providing optimized implementations of these methods for each state backend, the simpler implementations of these temporal operations can also be more performant.

EmbeddedRocksDBStateBackend: Add a LexicographicLongSerializer

In order to have an efficient implementation of the getAtOrBefore and getAtOrAfter methods for the RocksDB state backend, we want to leverage the iterators available in RocksDB. 

For this purpose, we have implemented a new LexicographicLongSerializer that serializes the timestamps, which are longs, with an unsigned big-endian encoding so that their binary sort order matches the numerical sort order. 
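
One way such an order-preserving encoding can work is sketched below. This is an illustration under stated assumptions, not the POC's LexicographicLongSerializer: the class and method names are hypothetical, and the sign-bit flip is an assumption made here so that negative timestamps also sort before non-negative ones when the bytes are compared as unsigned values. The text above does not say whether the actual serializer does this.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of an order-preserving long encoding; not the Flink serializer.
final class LexicographicLongEncoding {

    // Encodes a long as 8 big-endian bytes whose unsigned byte order matches numeric order.
    static byte[] encode(long value) {
        // Assumption: flip the sign bit so Long.MIN_VALUE maps to 0x00... and
        // Long.MAX_VALUE maps to 0xFF...; ByteBuffer writes big-endian by default.
        return ByteBuffer.allocate(Long.BYTES).putLong(value ^ Long.MIN_VALUE).array();
    }

    // Reverses encode(): reads 8 big-endian bytes and flips the sign bit back.
    static long decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong() ^ Long.MIN_VALUE;
    }

    // Byte-wise unsigned comparison, the ordering a bytewise key comparator applies.
    static int compareUnsigned(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }
}
```

Because the encoded keys sort in timestamp order, a seek to the encoded form of a timestamp lands on the at-or-after entry, and stepping one entry back gives the at-or-before entry.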

HashMapStateBackend

Our intended implementation is to lazily create a sorted list of the timestamps for the current key in response to the first call to getAtOrBefore or getAtOrAfter, and then reuse that list until it becomes invalid.
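
The lazy sorted index described above could look roughly like the following sketch. It is a simplified illustration for a single key and not the intended Flink implementation: LazySortedTimestamps is a hypothetical name, and the invalidation rule (drop the cached array whenever a timestamp is added or removed) is an assumption, since this section does not specify when the list becomes invalid.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-key sketch of a lazily built, cached timestamp index; not Flink code.
final class LazySortedTimestamps<T> {
    private final Map<Long, T> values = new HashMap<>();
    private long[] sorted; // null means the cached index is invalid

    void put(long time, T value) {
        boolean isNewTimestamp = !values.containsKey(time);
        values.put(time, value);
        if (isNewTimestamp) {
            sorted = null; // assumption: only a change in the key set invalidates the index
        }
    }

    void remove(long time) {
        if (values.containsKey(time)) {
            values.remove(time);
            sorted = null;
        }
    }

    T get(long time) {
        return values.get(time);
    }

    // Greatest stored timestamp <= time, or null if there is none.
    Long timeAtOrBefore(long time) {
        long[] ts = sortedTimestamps();
        int idx = Arrays.binarySearch(ts, time);
        if (idx < 0) {
            idx = -idx - 2; // element just before the insertion point
        }
        if (idx < 0) {
            return null;
        }
        return ts[idx];
    }

    // Smallest stored timestamp >= time, or null if there is none.
    Long timeAtOrAfter(long time) {
        long[] ts = sortedTimestamps();
        int idx = Arrays.binarySearch(ts, time);
        if (idx < 0) {
            idx = -idx - 1; // the insertion point itself
        }
        if (idx >= ts.length) {
            return null;
        }
        return ts[idx];
    }

    // Builds the sorted array on first use and reuses it until invalidated.
    private long[] sortedTimestamps() {
        if (sorted == null) {
            sorted = values.keySet().stream().mapToLong(Long::longValue).sorted().toArray();
        }
        return sorted;
    }
}
```

The trade-off in this sketch is that writes stay cheap (a hash map insert plus dropping the cache) while the first nearest-timestamp lookup after a change pays the cost of one sort.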

Operators that can potentially be improved by using Temporal State

  • TemporalRowTimeJoinOperator
  • RowTimeSortOperator
  • CepOperator
  • IntervalJoinOperator
  • AbstractStreamArrowPythonOverWindowAggregateFunctionOperator
  • The various over/rank window operators

New Examples

We have implemented two new examples: a temporal join, and a temporal sort.

New Documentation

In addition to javadocs, state#using-keyed-state should be updated to include TemporalValueState and TemporalListState.

Updated Training

docs/learn-flink/event_driven/#example should be updated.

Compatibility, Deprecation, and Migration Plan

Deprecation

While this FLIP does open the door for eventually deprecating some of the existing (and often abused) mechanisms for customizing DataStream windows, we are not proposing to deprecate anything as part of this FLIP.

Compatibility/Migration

Any operators that are reworked to use temporal state will thereafter use state in an incompatible way, compared to their current implementations.

To support upgrades to any SQL operator that is reworked to use temporal state, the mechanisms of FLIP-190 for handling operator migrations need to be followed. This involves annotating the ExecNode of the operator with an updated version number, and then modifying the operator so that it works regardless of which version of the state it finds available. 

These operators can then either migrate the state, or directly support both the old and new versions of the state. In the context of this FLIP, migrating any existing MapState<Long, List<T>> to TemporalListState<T> will be simpler than continuing to maintain the old state as MapState, so migration is the preferred approach.

Test Plan

We have implemented unit tests for the new state types, and these tests run against both state backends.

We should verify that any operators that are updated to use temporal state have sufficient test coverage. Tests will need to be added for state migration.

Rejected Alternatives

Directly expose the state backend namespaces

By itself, this wouldn’t accomplish much, and the interfaces proposed here satisfy the objectives without exposing this internal API.

Provide iterators over the per-key timestamps

Many use cases only need to fetch a single value, and don’t need iteration. So while providing users with iterators might offer a slight performance increase in some cases (by keeping low-level/native pointers into the iteration), the added complexity doesn’t seem to be justified.

Add support for TemporalMapState

We haven’t found a motivating use case.

SortedMapState

The idea here would be to support MapState where the key type could be any type that implements Comparable. This would be a generalization of TemporalState, which is similar, but limited to only supporting Longs as the keys. This was rejected as being overly difficult to implement in a way that would cleanly leverage RocksDB’s iterators, and we didn’t have a motivating use case that would benefit from this generalization.
