Status

Discussion thread: https://lists.apache.org/thread/lbdpr1cjt9ddrotowmzzk4lv787b3t5z
Vote thread:
JIRA: FLINK-27176
Release:


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

We propose to add a new type of state that can be used to create simpler and more efficient implementations of the operators that maintain a MapState<?, List<?>> and/or require range operations. This includes temporal joins, operators that sort their input streams by timestamp, and a few other cases:

...

An implementation is underway. Benchmarking has shown a ~130% increase in throughput for sorting, and ~60% for temporal joins.

Public Interfaces


The complete proposed interface for BinarySortedState<UK, UV> is shown below. In summary, it includes:

...

    @PublicEvolving
    <UK, UV> BinarySortedState<UK, UV> getBinarySortedState(
        BinarySortedStateDescriptor<UK, UV> stateProperties);

Proposed Changes

EmbeddedRocksDBStateBackend

The RocksDB state backend is extended to use the state descriptor's LexicographicalTypeSerializer as the namespace serializer.

RocksDBBinarySortedNavigator is added to support the various lookup and range operations. It takes advantage of RocksDB's native iterators.
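Relying on RocksDB's native iterators for range operations presupposes that the serialized form of a user key sorts byte-wise in the same order as the key itself. The sketch below illustrates that property for long timestamps. It is an assumption about what a lexicographical serializer for Long would need to do, not the FLIP's actual LexicographicalTypeSerializer implementation, and the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper, not part of the proposed API. */
final class LexicographicLongSketch {

    /** Encodes a long so that unsigned byte-wise order matches numeric order. */
    static byte[] encode(long value) {
        // Big-endian layout puts the most significant byte first; flipping the
        // sign bit makes negative values sort before non-negative ones.
        return ByteBuffer.allocate(Long.BYTES).putLong(value ^ Long.MIN_VALUE).array();
    }

    /** Reverses encode(). */
    static long decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong() ^ Long.MIN_VALUE;
    }

    /** Unsigned lexicographic comparison, the ordering a byte-wise key comparator applies. */
    static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    public static void main(String[] args) {
        long[] timestamps = {-5L, 0L, 3L, 1_000L};
        for (int i = 0; i + 1 < timestamps.length; i++) {
            int c = compareBytes(encode(timestamps[i]), encode(timestamps[i + 1]));
            System.out.println(timestamps[i] + " sorts before " + timestamps[i + 1] + ": " + (c < 0));
        }
    }
}
```

With an encoding of this kind, seeking an iterator to the encoded lower bound and scanning forward visits keys in ascending numeric order.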

HashMapStateBackend

HeapBinarySortedNavigator is needed to provide the same lookup and range operations for the heap-based state backend. The proposed implementation uses a HashMap<FK, TreeSet<UKI>> to keep track of the namespaces that need to be iterated over.
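As a rough illustration of how a HashMap<FK, TreeSet<UKI>> can back lookup and range operations, the sketch below uses only java.util collections. It is not the proposed HeapBinarySortedNavigator: the class name, the method names, and the reading of FK as the Flink key and of the TreeSet element as the user key are assumptions made for this example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;

/** Hypothetical stand-in for a heap-based navigator; names are illustrative only. */
final class HeapNavigatorSketch<FK, UK extends Comparable<UK>> {

    // One sorted set of user keys per Flink key.
    private final Map<FK, TreeSet<UK>> index = new HashMap<>();

    /** Registers a user key under the given Flink key. */
    void add(FK key, UK userKey) {
        index.computeIfAbsent(key, k -> new TreeSet<UK>()).add(userKey);
    }

    /** Returns the largest user key that is <= target, or null if there is none. */
    UK floor(FK key, UK target) {
        TreeSet<UK> keys = index.get(key);
        return keys == null ? null : keys.floor(target);
    }

    /** Returns the user keys in [fromInclusive, toExclusive), in ascending order. */
    NavigableSet<UK> range(FK key, UK fromInclusive, UK toExclusive) {
        TreeSet<UK> keys = index.get(key);
        if (keys == null) {
            return new TreeSet<UK>();
        }
        return keys.subSet(fromInclusive, true, toExclusive, false);
    }
}
```

The TreeSet provides the ordered navigation (floor, ceiling, sub-ranges) that RocksDB iterators supply natively, while the outer HashMap keeps each key's set independent.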

Operators that can potentially be improved by using BinarySortedState

As outlined in the Motivation section, many of Flink's operators could benefit from being updated to use this new state type.

For the first release, however, we plan to limit the scope to the new state type itself and leave the operators unchanged.

New Documentation

In addition to javadocs, the keyed state documentation (state#using-keyed-state) needs to be updated to include BinarySortedState.

Compatibility, Deprecation, and Migration Plan

Deprecation

While this FLIP does open the door for eventually deprecating some of the existing (and often abused) mechanisms for customizing DataStream windows, we are not proposing to deprecate anything as part of this FLIP.

Compatibility/Migration

Any operators that are reworked to use binary sorted state will thereafter use state in an incompatible way, compared to their current implementations.

...

These operators can then either migrate the state, or directly support both the old and new versions of the state. In the context of this FLIP, migrating from any MapState<Long, List<T>> that is found to BinarySortedState<Long, T> will be simpler than continuing to maintain the old state as MapState, so that is the preferred approach.
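The shape of such a migration can be sketched with plain collections. Because the BinarySortedState interface is not reproduced here, the example uses a TreeMap as a stand-in for the new state and a plain Map for the old MapState contents. The class and method names are hypothetical, and a real migration would go through Flink's state APIs instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical illustration of moving per-timestamp lists into a sorted structure. */
final class MigrationSketch {

    /**
     * Copies every (timestamp, value) pair from the old layout into a structure
     * that keeps timestamps sorted. Values are added one at a time, the way an
     * operator would append them to a sorted state.
     */
    static <T> TreeMap<Long, List<T>> migrate(Map<Long, List<T>> oldState) {
        TreeMap<Long, List<T>> sorted = new TreeMap<>();
        for (Map.Entry<Long, List<T>> entry : oldState.entrySet()) {
            for (T value : entry.getValue()) {
                sorted.computeIfAbsent(entry.getKey(), k -> new ArrayList<T>()).add(value);
            }
        }
        return sorted;
    }
}
```

After the copy, range queries such as "all entries up to the current watermark" become a single ordered scan (for example via TreeMap.headMap) instead of a pass over every key in the map.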

Test Plan

We will include unit tests for the new state type, and run them against both state backends.

We should verify that any operators that are updated to use binary sorted state have sufficient test coverage. Tests will also need to be added for state migration.


Rejected Alternatives

Directly expose the state backend namespaces

By itself, this wouldn’t accomplish much, and the interfaces proposed here satisfy the objectives without exposing this internal API.

Limit the scope to only offering TemporalState

An earlier draft of this FLIP proposed adding TemporalValueState and TemporalListState types, roughly equivalent to BinarySortedState<Long, ?>. After some discussion on the mailing list, this was determined to be too limiting. 

...