Status
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
We propose to add a new type of state that can be used to create simpler and more efficient implementations of operators that maintain a MapState<?, List<?>> and/or require range operations. This includes temporal joins, operators that sort their input streams by timestamp, and a few other cases:
...
An implementation is underway. Benchmarking has shown a ~130% increase in throughput for sorting, and ~60% for temporal joins.
Public Interfaces
The complete proposed interface for BinarySortedState<UK, UV> is shown below. In summary, it includes:
...
```java
@PublicEvolving
<UK, UV> BinarySortedState<UK, UV> getBinarySortedState(
        BinarySortedStateDescriptor<UK, UV> stateProperties);
```
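To make the sorting use case concrete, here is a toy, single-key sketch of how an operator that sorts its input by timestamp might use sorted state. It is a stand-in, not the proposed API: a java.util.TreeMap plays the role of a BinarySortedState<Long, T>, and the class and method names (TimestampSortingBuffer, add, onWatermark) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/**
 * Hypothetical, single-key stand-in for a BinarySortedState<Long, T>:
 * events are bucketed by timestamp and released in order on each watermark.
 * The TreeMap below replaces the real state; this is not the Flink API.
 */
class TimestampSortingBuffer<T> {
    // Stand-in for the sorted state: timestamp -> events with that timestamp.
    private final NavigableMap<Long, List<T>> state = new TreeMap<>();

    /** Buffers one event under its timestamp (append semantics). */
    void add(long timestamp, T event) {
        state.computeIfAbsent(timestamp, ts -> new ArrayList<>()).add(event);
    }

    /** Emits and clears every event with timestamp <= watermark, in timestamp order. */
    List<T> onWatermark(long watermark) {
        // headMap is a view over the ordered prefix: an ordered range read.
        NavigableMap<Long, List<T>> ready = state.headMap(watermark, true);
        List<T> out = new ArrayList<>();
        for (List<T> events : ready.values()) {
            out.addAll(events);
        }
        ready.clear(); // range delete: also removes the prefix from the backing map
        return out;
    }

    int bufferedTimestamps() {
        return state.size();
    }

    public static void main(String[] args) {
        TimestampSortingBuffer<String> buffer = new TimestampSortingBuffer<>();
        buffer.add(30L, "c");
        buffer.add(10L, "a");
        buffer.add(20L, "b1");
        buffer.add(20L, "b2");
        System.out.println(buffer.onWatermark(25L)); // [a, b1, b2]
        System.out.println(buffer.bufferedTimestamps()); // 1
    }
}
```

With a MapState<Long, List<T>>, whose API makes no ordering guarantee, the same operator would have to scan or separately track its timestamps; with sorted state, "emit everything up to the watermark" becomes one ordered range read followed by one range clear.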
Proposed Changes
EmbeddedRocksDBStateBackend
The RocksDB state backend is extended to use the state descriptor's LexicographicalTypeSerializer as the namespace serializer.
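The reason a lexicographical serializer is needed: RocksDB orders keys by comparing their bytes (its default comparator is bytewise), so the serialized form of a user key must sort the same way as the key itself. The sketch below shows one well-known way to get that property for long values, big-endian encoding with the sign bit flipped. It illustrates the property only; it is not necessarily the encoding LexicographicalTypeSerializer uses, and the class name is hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/**
 * Hypothetical order-preserving codec for long: comparing encoded bytes as
 * unsigned values yields the same order as comparing the original longs.
 */
final class OrderPreservingLongCodec {
    private OrderPreservingLongCodec() {}

    /** Big-endian encoding with the sign bit flipped, so negatives sort first. */
    static byte[] encode(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value ^ Long.MIN_VALUE).array();
    }

    static long decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong() ^ Long.MIN_VALUE;
    }

    /** Unsigned, lexicographic byte comparison, like a bytewise comparator. */
    static int compareEncoded(byte[] a, byte[] b) {
        return Arrays.compareUnsigned(a, b);
    }

    public static void main(String[] args) {
        long[] values = {Long.MIN_VALUE, -1L, 0L, 1L, 255L, 256L, Long.MAX_VALUE};
        for (int i = 0; i + 1 < values.length; i++) {
            // Each adjacent pair must keep its order after encoding.
            boolean ordered = compareEncoded(encode(values[i]), encode(values[i + 1])) < 0;
            System.out.println(values[i] + " < " + values[i + 1] + ": " + ordered);
        }
        // Plain two's-complement bytes do not preserve order: -1 is 0xFF...FF.
        byte[] minusOne = ByteBuffer.allocate(Long.BYTES).putLong(-1L).array();
        byte[] one = ByteBuffer.allocate(Long.BYTES).putLong(1L).array();
        System.out.println(Arrays.compareUnsigned(minusOne, one) > 0); // true
    }
}
```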
A RocksDBBinarySortedNavigator is added to support the various lookup and range operations. It takes advantage of RocksDB's native iterators, which visit keys in the sorted order of their serialized bytes.
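A range read over a byte-ordered store boils down to "seek to the lower bound, then step forward until the upper bound is passed". The sketch below emulates that pattern without RocksDB: a TreeMap with an unsigned byte comparator stands in for the database, tailMap(start, true) plays the role of seeking an iterator, and iteration plays the role of next(). The composite key layout (8-byte key followed by 8-byte namespace, both order-preserving encoded) and all names are assumptions for illustration, not Flink's actual layout.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical emulation of a seek/next range scan over byte-ordered keys. */
final class ByteOrderedRangeScan {
    // Unsigned lexicographic order, as with a bytewise comparator.
    private static final Comparator<byte[]> UNSIGNED = Arrays::compareUnsigned;
    private final NavigableMap<byte[], String> store = new TreeMap<>(UNSIGNED);

    /** Assumed layout: key then namespace, each big-endian with the sign bit flipped. */
    private static byte[] compositeKey(long key, long namespace) {
        return ByteBuffer.allocate(2 * Long.BYTES)
                .putLong(key ^ Long.MIN_VALUE)
                .putLong(namespace ^ Long.MIN_VALUE)
                .array();
    }

    void put(long key, long namespace, String value) {
        store.put(compositeKey(key, namespace), value);
    }

    /** Values for one key with fromNs <= namespace < toNs, in namespace order. */
    List<String> readRange(long key, long fromNs, long toNs) {
        byte[] start = compositeKey(key, fromNs);
        byte[] end = compositeKey(key, toNs);
        List<String> out = new ArrayList<>();
        // tailMap(start, true) stands in for iterator.seek(start).
        for (Map.Entry<byte[], String> e : store.tailMap(start, true).entrySet()) {
            // Stop once the iterator reaches the exclusive upper bound.
            if (UNSIGNED.compare(e.getKey(), end) >= 0) {
                break;
            }
            out.add(e.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        ByteOrderedRangeScan scan = new ByteOrderedRangeScan();
        scan.put(1L, 30L, "k1@30");
        scan.put(1L, -5L, "k1@-5");
        scan.put(1L, 10L, "k1@10");
        scan.put(2L, 0L, "k2@0");
        System.out.println(scan.readRange(1L, -10L, 30L)); // [k1@-5, k1@10]
    }
}
```

Because the key bytes come first, every entry for a given key is contiguous, and the scan never has to touch entries belonging to other keys.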
HashMapStateBackend
A HeapBinarySortedNavigator is needed to provide the same lookup and range operations for the heap-based state backend. The proposed implementation uses a HashMap<FK, TreeSet<UKI>> to keep track of the namespaces that need to be iterated over.
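The idea of a sorted per-key index can be sketched as follows. This is a toy model, not the proposed HeapBinarySortedNavigator: we assume the HashMap goes from the partitioning key to the sorted set of user keys (stored as namespaces) that currently hold state, while the values themselves live in a flat (key, namespace) map standing in for the heap state table. The class, its methods, and that reading of FK and UKI are all assumptions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;

/** Hypothetical heap-side index enabling ordered range reads and range clears. */
final class HeapSortedIndex<K, N extends Comparable<N>, V> {
    // Stand-in for the heap state table: (key, namespace) -> value.
    private final Map<Map.Entry<K, N>, V> table = new HashMap<>();
    // Per-key sorted index of the namespaces that currently hold state.
    private final Map<K, TreeSet<N>> namespaces = new HashMap<>();

    void put(K key, N namespace, V value) {
        table.put(Map.entry(key, namespace), value);
        namespaces.computeIfAbsent(key, k -> new TreeSet<>()).add(namespace);
    }

    /** Values for fromNs <= namespace < toNs, in namespace order. */
    List<V> readRange(K key, N fromNs, N toNs) {
        List<V> out = new ArrayList<>();
        for (N ns : range(key, fromNs, toNs)) {
            out.add(table.get(Map.entry(key, ns)));
        }
        return out;
    }

    /** Removes every namespace in [fromNs, toNs) for the given key. */
    void clearRange(K key, N fromNs, N toNs) {
        NavigableSet<N> toClear = range(key, fromNs, toNs);
        for (N ns : toClear) {
            table.remove(Map.entry(key, ns));
        }
        toClear.clear(); // subSet is a view, so this also shrinks the index
        TreeSet<N> remaining = namespaces.get(key);
        if (remaining != null && remaining.isEmpty()) {
            namespaces.remove(key);
        }
    }

    private NavigableSet<N> range(K key, N fromNs, N toNs) {
        TreeSet<N> set = namespaces.get(key);
        if (set == null || fromNs.compareTo(toNs) >= 0) {
            return new TreeSet<>(); // empty range (subSet rejects from > to)
        }
        return set.subSet(fromNs, true, toNs, false);
    }

    public static void main(String[] args) {
        HeapSortedIndex<String, Long, String> idx = new HeapSortedIndex<>();
        idx.put("user-1", 20L, "b");
        idx.put("user-1", 10L, "a");
        idx.put("user-1", 30L, "c");
        idx.put("user-2", 15L, "x");
        System.out.println(idx.readRange("user-1", 0L, 25L)); // [a, b]
        idx.clearRange("user-1", 0L, 25L);
        System.out.println(idx.readRange("user-1", 0L, 100L)); // [c]
    }
}
```

The TreeSet gives logarithmic navigation to the start of a range while point lookups stay in the hash-based table; the cost of this design is keeping the two structures in sync on every write and clear.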
Operators that can potentially be improved by using BinarySortedState
As outlined in the Motivation section, many of Flink's operators could benefit from being updated to leverage this new state type.
However, for the first release we plan to limit the scope to adding the new state type itself.
New Documentation
In addition to javadocs, the state#using-keyed-state section of the documentation needs to be updated to include BinarySortedState.
Compatibility, Deprecation, and Migration Plan
Deprecation
While this FLIP does open the door for eventually deprecating some of the existing (and often abused) mechanisms for customizing DataStream windows, we are not proposing to deprecate anything as part of this FLIP.
Compatibility/Migration
Any operator that is reworked to use BinarySortedState will thereafter use state in a way that is incompatible with its current implementation.
...
These operators can then either migrate their state or directly support both the old and new versions of the state. In the context of this FLIP, migrating any MapState<Long, List<T>> that is found to BinarySortedState<Long, T> will be simpler than continuing to maintain the old state as MapState, so that is the preferred approach.
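The data-level part of that migration is a simple flattening, sketched below under stated assumptions: the old state is modeled as a plain java.util.Map rather than Flink's MapState, and the new state is assumed to offer some per-element append operation, represented by the hypothetical Appender interface. Neither name is part of the proposed API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: replay MapState<Long, List<T>>-shaped data as appends. */
final class SortedStateMigration {
    private SortedStateMigration() {}

    /** Hypothetical stand-in for the new state's per-element append. */
    @FunctionalInterface
    interface Appender<T> {
        void add(long key, T value);
    }

    /**
     * Emits one append per list element. The target keeps keys sorted, so the
     * map's iteration order is irrelevant; only the order of elements within
     * each list is preserved.
     */
    static <T> void migrate(Map<Long, List<T>> oldState, Appender<T> target) {
        for (Map.Entry<Long, List<T>> entry : oldState.entrySet()) {
            for (T value : entry.getValue()) {
                target.add(entry.getKey(), value);
            }
        }
    }

    public static void main(String[] args) {
        Map<Long, List<String>> old = new HashMap<>();
        old.put(20L, List.of("b1", "b2"));
        old.put(10L, List.of("a"));
        // A TreeMap plays the role of the new sorted state in this demo.
        TreeMap<Long, List<String>> migrated = new TreeMap<>();
        migrate(old, (ts, v) -> migrated.computeIfAbsent(ts, k -> new ArrayList<>()).add(v));
        System.out.println(migrated); // {10=[a], 20=[b1, b2]}
    }
}
```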
Test Plan
We will include unit tests for the new state type, run against both state backends.
We should verify that any operators updated to use BinarySortedState have sufficient test coverage. Tests will need to be added for state migration.
Rejected Alternatives
Directly expose the state backend namespaces
By itself, this wouldn’t accomplish much, and the interfaces proposed here satisfy the objectives without exposing this internal API.
Limit the scope to only offering TemporalState
An earlier draft of this FLIP proposed adding TemporalValueState and TemporalListState types, roughly equivalent to BinarySortedState<Long, ?>. After some discussion on the mailing list, this was determined to be too limiting.
...