...
The single-key-at-a-time execution will be used for batch-style execution, as decided by the algorithm described in FLIP-134: DataStream Semantics for Bounded Input.
Moreover, it will be possible to disable it through the execution.sorted-shuffles.enabled configuration option.
StateBackend
We will introduce a new StateBackend implementation that creates a KeyedStateBackend which works with a single key at a time. That KeyedStateBackend will create state objects (ValueState, ListState, MapState) that each keep only a single value, held directly in a field of the object. This removes the StateTable lookup that is otherwise needed to find the value for the current key. When the StateBackend observes a new incoming key, it will reset all state objects acquired so far.
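The idea can be illustrated with a minimal sketch in plain Java. It is not the proposed implementation and does not use Flink's interfaces: the class names SingleKeyValueState and SingleKeyBackend, and all of their methods, are hypothetical and exist only for this illustration. The sketch assumes that records arrive grouped by key, so a key never comes back once the backend has moved on to the next one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for a ValueState that holds exactly one value:
// the value for whatever key the backend currently considers active.
final class SingleKeyValueState<T> {
    private T value; // plain field, no per-key table lookup

    T value() {
        return value;
    }

    void update(T newValue) {
        value = newValue;
    }

    void clear() {
        value = null;
    }
}

// Hypothetical stand-in for the single-key KeyedStateBackend.
final class SingleKeyBackend<K> {
    // Every state object handed out so far, so that all of them can be reset.
    private final List<SingleKeyValueState<?>> acquiredStates = new ArrayList<>();
    private K currentKey;

    <T> SingleKeyValueState<T> createValueState() {
        SingleKeyValueState<T> state = new SingleKeyValueState<>();
        acquiredStates.add(state);
        return state;
    }

    // Called for every incoming record. State is reset only when the key changes.
    void setCurrentKey(K newKey) {
        if (!Objects.equals(currentKey, newKey)) {
            for (SingleKeyValueState<?> state : acquiredStates) {
                state.clear();
            }
            currentKey = newKey;
        }
    }

    K getCurrentKey() {
        return currentKey;
    }
}
```

In this sketch, reading or updating state is a plain field access, and the cost of switching keys is proportional to the number of state objects acquired, not to the number of keys seen. If the input were not grouped by key, state for a key would be lost whenever another key's record arrived in between, which is why this style of backend is tied to the sorted execution.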
...
- This change should not affect current users of the DataStream API. It will be enabled based on the setting discussed in FLIP-134: DataStream Semantics for Bounded Input.
Rejected Alternatives
- Making it configurable whether to use the single-key state backend and sorting, or just the regular state backends.
- Going through the whole open-max-watermark-close cycle of an operator for each key, thus "fixing" the zig-zagging problem of watermarks and timers. This comes with other problems, though, for example with broadcast input/two-input operators in general.