Status
...
...
...
|
...
|
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
As described in FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet API), we intend to deprecate and eventually remove the DataSet API in favour of the DataStream API. Using the DataStream API for bounded applications should not result in degraded performance. Ideally, we can stay in the same performance ballpark as equivalent DataSet programs.
...
The DataSet API, on the other hand, sorts/groups the incoming data by key and keeps the aggregation for only one key in memory at a time. Because we only ever emit the results once, we do not need to keep any intermediate aggregation results across different keys.
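The difference between the two strategies can be sketched in plain Java, without any Flink dependency. This is only an illustration: `AggregationSketch`, `Record`, `hashAggregate` and `sortedAggregate` are hypothetical names, and the in-memory sort stands in for whatever sorting mechanism the runtime would actually use.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: plain-Java stand-ins, not Flink classes. */
final class AggregationSketch {

    /** A (key, value) record; hypothetical type used only in this sketch. */
    static final class Record {
        final String key;
        final long value;

        Record(String key, long value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Hash-based: one running sum per key is held until the input ends. */
    static Map<String, Long> hashAggregate(List<Record> input) {
        Map<String, Long> sums = new HashMap<>();
        for (Record r : input) {
            sums.merge(r.key, r.value, Long::sum);
        }
        return sums;
    }

    /** Sort-based: after sorting, only the sum of the current key is held. */
    static List<String> sortedAggregate(List<Record> input) {
        List<Record> sorted = new ArrayList<>(input);
        sorted.sort(Comparator.comparing((Record r) -> r.key));

        List<String> results = new ArrayList<>();
        String currentKey = null;
        long currentSum = 0;
        for (Record r : sorted) {
            if (currentKey != null && !currentKey.equals(r.key)) {
                // The previous key is finished: emit its result exactly once.
                results.add(currentKey + "=" + currentSum);
                currentSum = 0;
            }
            currentKey = r.key;
            currentSum += r.value;
        }
        if (currentKey != null) {
            results.add(currentKey + "=" + currentSum); // flush the last key
        }
        return results;
    }
}
```

In the hash-based variant the map grows with the number of distinct keys, whereas the sort-based loop holds a single key and a single sum at any time.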
Technical details
In the DataStream API, to support state that does not fit into memory, users must use the RocksDBStateBackend, which enables spilling data to disk.
...
ValueStateBenchmark.valueUpdate SINGLE_KEY 32883.440 ± 1573.605 ops/ms
Proposed Changes
The current behaviour of the DataStream API is equivalent to a hash-based aggregation algorithm where the StateBackends act as the HashTable.
...
Moreover, it will be possible to disable it through the execution.sorted-shuffles.enabled configuration option.
StateBackend
We will introduce a new implementation of a StateBackend which will create a KeyedStateBackend that works with a single key at a time. That KeyedStateBackend will create state objects (ValueState, ListState, MapState) that keep only a single value in the object's field, i.e. ValueState is a very thin wrapper for a value, MapState for a HashMap, ListState for a List, etc. That way we get rid of the lookup into the StateTable for the value of the current key. When the StateBackend observes a new incoming key, it will reset all state objects acquired so far, setting the wrapped values to null. We will know that the key has changed/finished because we keep the current key, and in setCurrentKey we check whether the new key is different from the current one.
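A minimal sketch of this idea, assuming a single value state per backend: `SingleKeyStateSketch` is a hypothetical plain-Java class and does not implement Flink's actual KeyedStateBackend or ValueState interfaces.

```java
import java.util.Objects;

/** Illustrative only: mimics the idea, not Flink's KeyedStateBackend API. */
final class SingleKeyStateSketch<K, V> {
    private K currentKey; // the only key whose state is held
    private V value;      // the state is just a field, no table lookup

    /** Resets the wrapped value when a different key arrives. */
    void setCurrentKey(K newKey) {
        if (!Objects.equals(newKey, currentKey)) {
            value = null;
            currentKey = newKey;
        }
    }

    /** Returns the value for the current key, or null if none was set. */
    V value() {
        return value;
    }

    /** Overwrites the value for the current key. */
    void update(V newValue) {
        value = newValue;
    }
}
```

Setting the same key twice keeps the value, while switching to another key discards it. This is why the approach relies on the input being grouped by key.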
TimerService
We will replace the InternalTimeServiceManager with a customized one which will keep timers only for the current key and fire them only at the end of each key. The value of InternalTimeService#getCurrentWatermark will be Watermark.MIN_VALUE while a key is being processed, and will be set to Watermark.MAX_VALUE when firing the timers.
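The behaviour described above could look roughly like the following sketch. It is not the proposed implementation: `SingleKeyTimerSketch` and its methods are hypothetical, `Long.MIN_VALUE` and `Long.MAX_VALUE` stand in for the watermark constants, and fired timers are recorded as strings instead of invoking callbacks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Illustrative only: a per-key timer service, not Flink's timer API. */
final class SingleKeyTimerSketch<K> {
    // Assumed stand-ins for the watermark constants named in the text.
    static final long MIN_WATERMARK = Long.MIN_VALUE;
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private final List<String> fired = new ArrayList<>();
    private K currentKey;
    private long currentWatermark = MIN_WATERMARK;

    /** A key change means the previous key is finished, so its timers fire. */
    void setCurrentKey(K newKey) {
        if (currentKey != null && !currentKey.equals(newKey)) {
            fireAllForCurrentKey();
        }
        currentKey = newKey;
    }

    /** Registers a timer for the current key only. */
    void registerTimer(long timestamp) {
        timers.add(timestamp);
    }

    long currentWatermark() {
        return currentWatermark;
    }

    /** Flushes the timers of the last key once the input is exhausted. */
    void endOfInput() {
        if (currentKey != null) {
            fireAllForCurrentKey();
        }
    }

    /** Fired timers in firing order, formatted as "key@timestamp". */
    List<String> fired() {
        return fired;
    }

    private void fireAllForCurrentKey() {
        currentWatermark = MAX_WATERMARK; // watermark seen while firing
        while (!timers.isEmpty()) {
            fired.add(currentKey + "@" + timers.poll()); // timestamp order
        }
        currentWatermark = MIN_WATERMARK; // back to MIN for the next key
    }
}
```

Because the queue is drained whenever the key changes, timers come out grouped by key and ordered by timestamp within each key.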
User-visible implications
The proposal does not introduce any new interfaces, though it has some user-visible implications.
- The user's chosen StateBackend will not be used in bounded-style execution; instead we will use the SingleKeyStateBackend.
- Checkpointing will not work in the bounded execution scenario. We do not keep any state that can be checkpointed.
- We process each key separately. This has to be considered for custom implementations of StreamOperators; some custom StreamOperators might not work in the bounded execution.
- The Watermark can jump from MAX_VALUE (at the end of a key) to MIN_VALUE (at the beginning of the next key). This might break implementations of operators that cache the value of a Watermark and assume it can never move back.
- Timers will fire first in the key order and then in timestamp order within the key.
- Operators can no longer freely switch processing between different keys. Methods such as setCurrentKey and applyToAllKeys will not work correctly. Switching the current key to a new value will result in pruning the current state and potentially even firing the timers.
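To illustrate the watermark implication from the list above, here is a minimal sketch of operator logic that caches the last watermark and assumes it never decreases. `CachedWatermarkSketch` is a hypothetical class, and plain `long` values stand in for Watermark objects.

```java
/** Illustrative only: logic that assumes watermarks never move backwards. */
final class CachedWatermarkSketch {
    private long lastSeen = Long.MIN_VALUE;

    /** Returns false if the new watermark is lower than the cached one. */
    boolean onWatermark(long watermark) {
        boolean monotonic = watermark >= lastSeen;
        lastSeen = watermark;
        return monotonic;
    }
}
```

With per-key processing, such a check would pass at the end of the first key (jump to the maximum) and fail when the next key starts (reset to the minimum), so operators written this way would need to be reviewed.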
Implementation details
This section contains some lower-level details. For certain problems it lists a few alternatives, and we still need to decide which of them to implement.
Changed PublicEvolving interfaces
We do need to change some interfaces that are marked as PublicEvolving. However, most of the changes apply to classes that, even though they are PublicEvolving, we do not expect users to actually use or implement.
...
```java
@PublicEvolving
public interface StateBackend extends java.io.Serializable {
    ....
    <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(
            Environment env,
            JobID jobID,
            String operatorIdentifier,
            TypeSerializer<K> keySerializer,
            int numberOfKeyGroups,
            KeyGroupRange keyGroupRange,
            TaskKvStateRegistry kvStateRegistry,
            TtlTimeProvider ttlTimeProvider,
            MetricGroup metricGroup,
            @Nonnull Collection<KeyedStateHandle> stateHandles,
            CloseableRegistry cancelStreamRegistry) throws Exception;
}
```
How to sort/group keys
The end goal of sorting the incoming data is to group the keys together. The DataSet API requires that the key type is Comparable and uses the natural order for sorting. The DataStream API has not imposed this restriction on key types so far; it only requires that the key implements hashCode. Other systems, such as Beam, do not use the natural order but sort/group on the binary representation. Therefore we can:
...
- Support, in batch-style execution, only keys for which we can create a TypeComparator (for GenericTypeInfo the keys must implement Comparable), and fail hard if that's not the case
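The binary-representation approach mentioned above can be sketched as follows. This is an illustration under stated assumptions rather than the proposed implementation: `BinaryKeyOrderSketch` is hypothetical, and `serialize` is a stand-in that uses the UTF-8 bytes of the key's string form instead of a real TypeSerializer.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Illustrative only: grouping keys by their serialized bytes. */
final class BinaryKeyOrderSketch {

    /** Lexicographic comparison of unsigned bytes; a shorter prefix sorts first. */
    static final Comparator<byte[]> UNSIGNED_BYTES = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    };

    /** Hypothetical serializer: UTF-8 bytes of the key's string form. */
    static byte[] serialize(Object key) {
        return String.valueOf(key).getBytes(StandardCharsets.UTF_8);
    }

    /** Sorts keys by serialized form, so that equal keys end up adjacent. */
    static <K> List<K> groupByBinaryOrder(List<K> keys) {
        List<K> sorted = new ArrayList<>(keys);
        sorted.sort(Comparator.comparing((K k) -> serialize(k), UNSIGNED_BYTES));
        return sorted;
    }
}
```

For the integer keys 10, 9, 10, 2 this yields 10, 10, 2, 9: not the natural order, but equal keys are adjacent, which is all that grouping needs. The approach only works if equal keys always serialize to identical bytes.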
Where to apply the sorting
The first question is where in the stack we want to apply the sorting/grouping of incoming elements. We could identify at least three different approaches:
...
- changes to the topology
- non-trivial connection of a two-input operator (via OutputTags)
- in the case of a two-input operator, at least the merging step must be chained with the two-input operator
Compatibility, Deprecation, and Migration Plan
- This change should not affect current users of the DataStream API. It will be enabled based on the setting discussed in FLIP-134: DataStream Semantics for Bounded Input.
Rejected Alternatives
- Making it configurable whether to use the single key state backend and sorting or just use regular state backends
- Go through the whole open-max-watermark-close cycle for an operator for each key, thus "fixing" the zig-zagging problem of watermarks and timers. This comes with other problems, though, for example with broadcast inputs and two-input operators in general.