Status
Current state: Released
Discussion thread: https://lists.apache.org/thread.html/rf88c0ceddf419ba14760a47ae87945d33387abbd03ccbea5f35b8ed4%40%3Cdev.flink.apache.org%3E
JIRA:
Released: Flink 1.12
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
As described in FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet API), we intend to deprecate and remove the DataSet API in favour of the DataStream API. Using the DataStream API for bounded applications should not result in degraded performance; ideally, we can stay in the same performance ballpark as equivalent DataSet programs.
The DataStream API relies on StateBackends for performing reduce/aggregate-like operations. In streaming scenarios there is no single point in time at which we can perform the calculations and emit results downstream; the calculations are performed incrementally and we need to keep all the intermediate results at hand. If the state does not fit into memory, users are forced to use RocksDB, which comes with the penalty of serializing records, often spilling to disk, costly metadata bookkeeping, etc. This is not necessarily efficient for bounded-style execution, where there is a single point in time after which the results will not change.
The DataSet API, on the other hand, sorts/groups the incoming data by key and performs the aggregation for one key at a time in memory. Because we only ever emit the result for a key once, we do not need to keep any intermediate aggregation results across different keys.
Technical details
In the DataStream API, in order to support state that does not fit into memory, users must use the RocksDBStateBackend, which enables spilling data to disk.
We performed benchmarks to compare the different execution styles and state backends. The benchmarked program is a version of WordCount, which in the case of the DataStream API is a WordCount in a global window. The job counts 40_000_000 incoming records with 4_000_000 distinct keys. An interesting observation is that the performance of the HeapStateBackend depends heavily on the number of keys: with a high number of keys it suffers a significant penalty and, for this particular case, becomes even less performant than the sorting approach. The results are as follows:
| | key type | time (avg from 15 runs) |
|---|---|---|
| RocksDB | integer | 114884ms |
| RocksDB | string | 116894ms |
| HeapStateBackend | integer | 30126ms |
| HeapStateBackend | string | 37356ms |
| DataSet (with no combiner) | integer | 13810ms |
| DataSet (with no combiner) | string | 23032ms |
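For reference, the DataStream variant of the benchmarked job is conceptually close to the following sketch. This is illustrative code, not the exact benchmark; only the record and key counts are taken from the description above. Every incoming record triggers a read-modify-write of per-key state in the configured StateBackend, which is the cost being measured.

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;

public class KeyedCountBenchmarkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromSequence(1, 40_000_000L)
                // 4_000_000 distinct keys, as in the benchmark above
                .map(i -> Tuple2.of(i % 4_000_000L, 1L))
                .returns(Types.TUPLE(Types.LONG, Types.LONG))
                .keyBy(t -> t.f0)
                // every record updates per-key state in the StateBackend
                .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1))
                .addSink(new DiscardingSink<>());

        env.execute("keyed count benchmark sketch");
    }
}
```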
Moreover we wrote a PoC which changes the DataStream API to use the DataSet API approach by sorting the data and using a specialized StateBackend which only ever keeps state for a single key.
| | key type | time (avg from 15 runs) |
|---|---|---|
| DataStream with input sorting | integer | 15410ms |
| DataStream with input sorting | string | 25007ms |
The results from the PoC show that the DataStream API can achieve performance similar to the DataSet API.
Additionally, we compared the performance of isolated ValueStates of different StateBackends (using https://github.com/apache/flink-benchmarks/blob/master/src/main/java/org/apache/flink/state/benchmark/ValueStateBenchmark.java without shuffling the keys) and got:
Benchmark (backendType) Score Error Units
ValueStateBenchmark.valueAdd HEAP 9206.249 ± 451.233 ops/ms
ValueStateBenchmark.valueAdd ROCKSDB 652.726 ± 36.900 ops/ms
ValueStateBenchmark.valueAdd SINGLE_KEY 18282.074 ± 936.477 ops/ms
ValueStateBenchmark.valueGet HEAP 12718.518 ± 241.353 ops/ms
ValueStateBenchmark.valueGet ROCKSDB 1621.098 ± 86.234 ops/ms
ValueStateBenchmark.valueGet SINGLE_KEY 30861.293 ± 500.203 ops/ms
ValueStateBenchmark.valueUpdate HEAP 12916.056 ± 491.163 ops/ms
ValueStateBenchmark.valueUpdate ROCKSDB 941.824 ± 19.172 ops/ms
ValueStateBenchmark.valueUpdate SINGLE_KEY 32883.440 ± 1573.605 ops/ms
Proposed Changes
The current behaviour of the DataStream API is equivalent to a hash-based aggregation algorithm where the StateBackends act as the HashTable.
We suggest to leverage the two characteristics of batch execution mode:
- we do not have checkpoints
- we have bounded data
and replace the hash-based aggregations with sort-based aggregations. Assuming that the accumulators for a single key fit into memory, this removes the need for RocksDB in that case.
We want this change to be mostly transparent to user code. We will introduce a sorting step (with potential spilling, reusing the UnilateralSortMerger implementation) before every keyed operator, sorting/grouping inputs by their keys. This allows us to process records in per-key groups, which in turn enables a simplified StateBackend implementation that is not organized into key groups and only ever keeps values for a single key.
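For illustration, the core of sort-based aggregation, independent of any Flink interfaces, looks roughly like the following sketch. After sorting, all records of a key are adjacent, so a single accumulator at a time suffices and each key's result is emitted exactly once:

```java
import java.util.Comparator;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

final class SortBasedAggregation {

    // One accumulator at a time is enough once the input is key-sorted.
    static <K, T, ACC> void sortedAggregate(
            List<T> records,
            Comparator<T> byKey,
            Function<T, K> keyOf,
            Supplier<ACC> newAccumulator,
            BiFunction<ACC, T, ACC> add,
            BiConsumer<K, ACC> emit) {

        records.sort(byKey); // the real implementation sorts externally, with spilling

        K currentKey = null;
        ACC acc = null;
        for (T record : records) {
            K key = keyOf.apply(record);
            if (!key.equals(currentKey)) {
                if (currentKey != null) {
                    emit.accept(currentKey, acc); // previous key is finished
                }
                currentKey = key;
                acc = newAccumulator.get();
            }
            acc = add.apply(acc, record);
        }
        if (currentKey != null) {
            emit.accept(currentKey, acc);
        }
    }
}
```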
The single-key-at-a-time execution will be used for the batch-style execution, as decided by the algorithm described in FLIP-134: DataStream Semantics for Bounded Input. Moreover, it will be possible to disable it through the execution.sorted-shuffles.enabled configuration option.
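Disabling the sort-based execution could then look like this (a sketch; only the option name is taken from this FLIP, and the exact way of passing the configuration may differ):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Configuration config = new Configuration();
// fall back to regular state backends instead of sorting the input
config.setBoolean("execution.sorted-shuffles.enabled", false);

StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment(config);
```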
StateBackend
We will introduce a new implementation of a StateBackend which will create a KeyedStateBackend that works with a single key at a time. That KeyedStateBackend will create state objects (ValueState, ListState, MapState) that keep just a single value in the object's field, i.e. ValueState is a very thin wrapper around a value, MapState around a HashMap, ListState around a List, etc. That way we get rid of the lookup into the StateTable for the value of the current key. When the StateBackend observes a new incoming key, it resets all state objects acquired so far, setting the wrapped values to null. We know when the key changes/finishes because we keep the current key and, in setCurrentKey, check whether the new key is different than the current one.
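A minimal sketch of this idea for ValueState (class and method names here are illustrative, not the actual implementation):

```java
import org.apache.flink.api.common.state.ValueState;

// State for exactly one key: a plain field instead of a StateTable lookup.
class SingleKeyValueState<T> implements ValueState<T> {

    private T value;

    @Override
    public T value() {
        return value;
    }

    @Override
    public void update(T value) {
        this.value = value;
    }

    @Override
    public void clear() {
        this.value = null;
    }

    // Called by the backend when setCurrentKey observes a new key:
    // the state of the previous key is simply dropped.
    void resetForNewKey() {
        this.value = null;
    }
}
```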
TimerService
We will replace the InternalTimeServiceManager with a customized one which keeps timers only for the current key and fires them only at the end of each key. The value of InternalTimeService#getCurrentWatermark will be Watermark.MIN_VALUE while a key is being processed and will be set to Watermark.MAX_VALUE when the timers are fired.
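Conceptually, the key switch then looks like this sketch (field and method names such as resetAllAcquiredStates are hypothetical; advanceWatermark is the existing mechanism for firing event-time timers):

```java
import java.util.Objects;
import org.apache.flink.streaming.api.watermark.Watermark;

private K currentKey;

public void setCurrentKey(K newKey) throws Exception {
    if (!Objects.equals(currentKey, newKey)) {
        // "time ends" for the finished key: fire all of its timers
        timeServiceManager.advanceWatermark(Watermark.MAX_VALUE);
        // drop all state that was acquired for the finished key
        stateBackend.resetAllAcquiredStates();
        // for the new key, getCurrentWatermark() is back at MIN_VALUE
        currentKey = newKey;
    }
}
```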
User-visible implications
The proposal does not introduce any new interfaces, though it has some user-visible implications.
- The user-chosen StateBackend will not be used in bounded-style execution; instead, we will use the SingleKeyStateBackend.
- Checkpointing will not work in the bounded execution scenario, as we do not keep any state that could be checkpointed.
- We process each key separately. This has to be considered for custom implementations of StreamOperators; some custom StreamOperators might not work in bounded execution.
- The Watermark can jump from MAX_VALUE (at the end of a key) to MIN_VALUE (at the beginning of the next key). This might break operator implementations that cache the value of the Watermark and assume it can never move backwards.
- Timers will fire first in key order and then, within each key, in timestamp order.
- Operators can no longer freely switch processing between different keys. Methods such as setCurrentKey or applyToAllKeys will not work correctly: switching the current key to a new value prunes the current state and potentially even fires the timers. (See the example below.)
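To make the timer implication concrete: with a KeyedProcessFunction like the following (an illustrative example, not code from the proposal), in bounded execution all output for one key is emitted via the end-of-key timer before the next key is processed:

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class CountPerKey extends KeyedProcessFunction<String, String, Tuple2<String, Long>> {

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Types.LONG));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<Tuple2<String, Long>> out)
            throws Exception {
        Long current = count.value();
        count.update(current == null ? 1L : current + 1L);
        // In bounded execution this fires when the watermark jumps to
        // MAX_VALUE at the end of the current key.
        ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE - 1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out)
            throws Exception {
        out.collect(Tuple2.of(ctx.getCurrentKey(), count.value()));
    }
}
```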
Implementation details
This section contains some lower-level details. For certain problems it lists a few alternatives, among which we still need to decide which one to implement.
Changed PublicEvolving interfaces
We need to change some interfaces that are marked as PublicEvolving. However, most of the changes apply to classes that, even though they are PublicEvolving, we do not expect users to actually use or implement.
In order to be able to create that KeyedStateBackend without the overhead of the AbstractKeyedStateBackend, we suggest extracting an interface CheckpointableKeyedStateBackend, similar to the current OperatorStateStore:
```java
/**
 * An interface that extends the user-facing {@link KeyedStateBackend}
 * with internal, system-specific features.
 */
public interface CheckpointableKeyedStateBackend<K>
        extends KeyedStateBackend<K>,
                SnapshotStrategy<SnapshotResult<KeyedStateHandle>>,
                Closeable,
                CheckpointListener {

    KeyGroupRange getKeyGroupRange();
}
```
Then the StateBackend class would create this new interface instead of the AbstractKeyedStateBackend:
```java
@PublicEvolving
public interface StateBackend extends java.io.Serializable {

    ....

    <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(
            Environment env,
            JobID jobID,
            String operatorIdentifier,
            TypeSerializer<K> keySerializer,
            int numberOfKeyGroups,
            KeyGroupRange keyGroupRange,
            TaskKvStateRegistry kvStateRegistry,
            TtlTimeProvider ttlTimeProvider,
            MetricGroup metricGroup,
            @Nonnull Collection<KeyedStateHandle> stateHandles,
            CloseableRegistry cancelStreamRegistry) throws Exception;
}
```
How to sort/group keys
The end goal of sorting the incoming data is to group the keys together. The DataSet API requires that the key type is Comparable and uses the natural order for sorting. The DataStream API has so far not imposed this restriction on key types; it only requires that the key implements hashCode. Other systems, such as Beam, do not use the natural order but instead sort/group on the binary representation of the key. Therefore we can:
- Follow the Beam approach and use the binary representation of the key to perform the sorting/grouping (see the sketch below). This comes with additional requirements: for example, the binary representation of key types has to be deterministic/stable.
REJECTED ALTERNATIVE:
- Support only keys for which we can create a TypeComparator (for GenericTypeInfo they must implement Comparable) in batch-style execution, and fail hard if that is not the case.
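As an illustration of the chosen approach, grouping by the binary representation boils down to something like the following sketch (DataOutputSerializer is an existing Flink utility; the helper methods are illustrative):

```java
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;

import java.io.IOException;

// Keys are compared via their serialized bytes, so they only need a
// deterministic/stable serialization, not Comparable.
static <K> byte[] binaryKey(K key, TypeSerializer<K> serializer) throws IOException {
    DataOutputSerializer out = new DataOutputSerializer(64);
    serializer.serialize(key, out);
    return out.getCopyOfBuffer();
}

// Lexicographic unsigned byte order: arbitrary but stable, which is all
// that grouping equal keys together requires.
static int compareBinaryKeys(byte[] a, byte[] b) {
    int len = Math.min(a.length, b.length);
    for (int i = 0; i < len; i++) {
        int cmp = (a[i] & 0xFF) - (b[i] & 0xFF);
        if (cmp != 0) {
            return cmp;
        }
    }
    return Integer.compare(a.length, b.length);
}
```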
Where to apply the sorting
The first question is where in the stack do we want to apply the sorting/grouping of incoming elements. We could identify at least 3 different approaches:
PREFERRED option in the discussion:
- Built-in sorting into org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput (see the sketch at the end of this section)
Pros:
- involves the least virtual calls
- no topology changes
Cons:
- implementation at a rather low level
- rather non-trivial to pass the configuration: the current implementation is hardcoded in the StreamTask, so we would have to pass a flag/provider down to the task level, whereas in the other approaches it is already embedded in the StreamGraph
- we would need to extend the DataOutput to also cover the endInput signal
REJECTED alternatives:
- Create a wrapper operator
Pros:
- simple changes in the StreamGraphGenerator, just wrap the original operator
- no changes in the topology
- simple methods forwarding
Cons:
- not the most modular approach
- need to wrap all methods
- Create standalone sorting operators
Pros:
- the most modular approach
- theoretically can run the sorting in separate slots from the actual operator
Cons:
- changes to the topology
- non-trivial connection of a two-input operator (via OutputTags)
- in the case of a two-input operator, at least the merging step must be chained with that operator
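A greatly simplified sketch of the preferred option follows. The real implementation would implement the actual DataOutput interface and reuse the spilling sorter; here, the class name, the in-memory sort, and the use of Comparable keys (instead of the binary representation chosen above) are all illustrative simplifications:

```java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Consumer;

// Buffers all records, sorts them by key when the input ends, and only
// then pushes them to the downstream keyed operator in key order.
class SortingOutputSketch<T, K extends Comparable<K>> {

    private final KeySelector<T, K> keySelector;
    private final List<StreamRecord<T>> buffer = new ArrayList<>();

    SortingOutputSketch(KeySelector<T, K> keySelector) {
        this.keySelector = keySelector;
    }

    // Corresponds to DataOutput#emitRecord: nothing is forwarded yet.
    void emitRecord(StreamRecord<T> record) {
        buffer.add(record); // real version: feed an external sorter, may spill
    }

    // Corresponds to the endInput signal the DataOutput would need to receive.
    void endInput(Consumer<StreamRecord<T>> downstream) {
        buffer.sort(Comparator.comparing(this::keyOf));
        for (StreamRecord<T> record : buffer) {
            downstream.accept(record); // records now arrive grouped by key
        }
        buffer.clear();
    }

    private K keyOf(StreamRecord<T> record) {
        try {
            return keySelector.getKey(record.getValue());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```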
Compatibility, Deprecation, and Migration Plan
- This change should not affect current users of the DataStream API; it will be enabled based on the setting discussed in FLIP-134: DataStream Semantics for Bounded Input.
Rejected Alternatives
- Making it configurable whether to use the single-key state backend and sorting, or just use regular state backends
- Go through the whole open-max-watermark-close cycle for an operator for each key, thus "fixing" the zig-zagging problem of watermarks and timers. This comes with other problems, though, for example with broadcast inputs/two-input operators in general.