Status

Current state: [One of "Under Discussion", "Accepted", "Rejected"]

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

As described in FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet API), we intend to deprecate and remove the DataSet API in the future in favour of the DataStream API. Using the DataStream API for bounded applications should not result in degraded performance. Ideally, we can stay in the same performance ballpark as equivalent DataSet programs.

...

The DataSet API, on the other hand, sorts/groups the incoming data by the key and performs the aggregation for one key at a time in memory. Because we only ever emit the results once, we do not need to keep any intermediate aggregation results across different keys.
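
To make this concrete, here is a minimal, self-contained sketch (plain Java, not Flink code) of the sort-based strategy: the input is sorted by key first, so only the aggregate of one key has to be kept in memory at a time, and each result is emitted exactly once when the key changes.

Code Block
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.BinaryOperator;

// Minimal sketch of sort-based aggregation: sort by key, then keep only the
// accumulator of the current key in memory and emit it when the key changes.
public class SortBasedAggregation {

	record Entry(String key, long value) {}

	static void aggregate(List<Entry> input, BinaryOperator<Long> combine) {
		input.sort(Comparator.comparing(Entry::key)); // group identical keys together

		String currentKey = null;
		Long accumulator = null;
		for (Entry e : input) {
			if (currentKey != null && !currentKey.equals(e.key())) {
				emit(currentKey, accumulator); // key changed: emit and forget
				accumulator = null;
			}
			currentKey = e.key();
			accumulator = accumulator == null ? e.value() : combine.apply(accumulator, e.value());
		}
		if (currentKey != null) {
			emit(currentKey, accumulator); // flush the last key
		}
	}

	static void emit(String key, long result) {
		System.out.println(key + " -> " + result);
	}

	public static void main(String[] args) {
		List<Entry> input = new ArrayList<>(List.of(
				new Entry("b", 2), new Entry("a", 1), new Entry("b", 3)));
		aggregate(input, Long::sum); // prints: a -> 1, then b -> 5
	}
}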

Technical details

In the DataStream API, in order to support state that does not fit into memory, users must use the RocksDBStateBackend, which can spill data to disk.
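
For reference, a minimal example of enabling that backend in a streaming job, assuming the RocksDBStateBackend constructor that takes a checkpoint URI (the path below is a placeholder):

Code Block
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbBackendExample {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		// Keyed state lives in RocksDB and can spill to local disk;
		// checkpoints are written to the configured URI (placeholder path).
		env.setStateBackend(new RocksDBStateBackend("file:///tmp/flink-checkpoints"));
		// ... build and execute the job as usual ...
	}
}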

...

ValueStateBenchmark.valueUpdate      SINGLE_KEY      32883.440 ± 1573.605  ops/ms

Proposed Changes

The current behaviour of the DataStream API is equivalent to a hash-based aggregation algorithm, where the StateBackends act as the hash table.

...

The single-key-at-a-time execution will be used for the batch-style execution, as decided by the algorithm described in FLIP-134: DataStream Semantics for Bounded Input.

StateBackend

We will introduce a new implementation of a StateBackend which will create a KeyedStateBackend that works with a single key at a time. That KeyedStateBackend will create state objects (ValueState, ListState, MapState) that keep only a single value in the object's field. That way we avoid a lookup into the StateTable for the current value of the current key. When the StateBackend observes a new incoming key, it will reset all state objects acquired so far.
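
A minimal sketch of the idea (the class names below are hypothetical, not the actual implementation): each state object holds its single value in a plain field, and a key change simply resets all state objects handed out so far.

Code Block
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a "single key at a time" value state: the current value
// lives in a plain field, so reads and writes need no lookup in a state table.
class SingleKeyValueState<T> {
	private T value;

	T value() { return value; }
	void update(T newValue) { this.value = newValue; }
	void clear() { this.value = null; }
}

// Hypothetical sketch of the backend side: it tracks the current key and, when a
// record with a new key arrives, resets every state object handed out so far.
class SingleKeyStateBackendSketch<K> {
	private K currentKey;
	private final List<SingleKeyValueState<?>> acquiredStates = new ArrayList<>();

	<T> SingleKeyValueState<T> getValueState() {
		SingleKeyValueState<T> state = new SingleKeyValueState<>();
		acquiredStates.add(state);
		return state;
	}

	void setCurrentKey(K newKey) {
		if (currentKey != null && !currentKey.equals(newKey)) {
			// New key observed: the previous key is finished, drop its state.
			acquiredStates.forEach(SingleKeyValueState::clear);
		}
		currentKey = newKey;
	}
}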

TimerService

We will replace the InternalTimeServiceManager with a customized one which will keep timers only for the current key and will fire them only at the end of each key. The value of InternalTimeService#getCurrentWatermark will be Watermark.MIN_VALUE while processing a key and will be set to Watermark.MAX_VALUE when firing triggers.
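
A hypothetical sketch of that behaviour (the names are illustrative, not the actual Flink classes): timers are registered only for the current key, the reported watermark stays at the minimum while the key is processed, and everything fires in timestamp order once the key ends.

Code Block
import java.util.PriorityQueue;
import java.util.function.LongConsumer;

// Hypothetical sketch of a per-key timer service: timers exist only for the key
// currently being processed and are all fired when that key ends.
class SingleKeyTimerServiceSketch {
	private final PriorityQueue<Long> timers = new PriorityQueue<>();
	private long currentWatermark = Long.MIN_VALUE; // Watermark.MIN_VALUE while processing a key

	void registerTimer(long timestamp) {
		timers.add(timestamp);
	}

	long currentWatermark() {
		return currentWatermark;
	}

	// Called once the last record of the current key has been processed.
	void endOfKey(LongConsumer onTimer) {
		currentWatermark = Long.MAX_VALUE; // Watermark.MAX_VALUE while firing triggers
		while (!timers.isEmpty()) {
			onTimer.accept(timers.poll()); // fire in timestamp order within the key
		}
		currentWatermark = Long.MIN_VALUE; // reset before the next key starts
	}
}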

User-visible implications

The proposal does not introduce any new interfaces, though it has some user-visible implications.

  1. The user's chosen StateBackend will not be used in a bounded style execution; instead we will use the SingleKeyStateBackend.
  2. Checkpointing will not work in the bounded execution scenario, as we do not keep any state that can be checkpointed.
  3. We process each key separately, which has to be considered for custom implementations of StreamOperators. Some custom StreamOperators might not work in the bounded execution.
    1. The Watermark can jump from MAX_VALUE (at the end of a key) to MIN_VALUE (at the beginning of the next key). This might break implementations of operators that cache the value of a Watermark and assume it can never move back (see the sketch after this list).
    2. Timers will fire first in key order and then in timestamp order within a key.
    3. Operators can no longer freely switch processing between different keys. Methods such as setCurrentKey or applyToAllKeys will not work correctly. Switching the current key to a new value will result in pruning the current state and potentially even firing the timers.
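
As an illustration of implication 3.1, the hypothetical operator logic below caches the last watermark and rejects any watermark that moves backwards; under the per-key execution described above it would fail when the watermark drops from MAX_VALUE back to MIN_VALUE at a key boundary.

Code Block
// Hypothetical fragment of a custom operator that assumes watermarks are monotonic.
// Under per-key bounded execution the watermark jumps from Long.MAX_VALUE (end of
// one key) back to Long.MIN_VALUE (start of the next key), so this check fails.
class WatermarkCachingLogic {
	private long lastSeenWatermark = Long.MIN_VALUE;

	void processWatermark(long watermark) {
		if (watermark < lastSeenWatermark) {
			throw new IllegalStateException("Watermark moved backwards: " + watermark);
		}
		lastSeenWatermark = watermark;
	}
}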

Implementation details

This section contains some lower-level details. It presents a few alternatives for certain problems; we need to decide which of them we want to implement.

Changed PublicEvolving interfaces

We do need to change some interfaces that are marked as PublicEvolving. However, most of the changes apply to classes that, even though they are PublicEvolving, we do not expect users to actually use or implement.

...

Code Block
@PublicEvolving
public interface StateBackend extends java.io.Serializable {	

.... 

	<K> InternalKeyedStateBackend<K> createKeyedStateBackend(
		Environment env,
		JobID jobID,
		String operatorIdentifier,
		TypeSerializer<K> keySerializer,
		int numberOfKeyGroups,
		KeyGroupRange keyGroupRange,
		TaskKvStateRegistry kvStateRegistry,
		TtlTimeProvider ttlTimeProvider,
		MetricGroup metricGroup,
		@Nonnull Collection<KeyedStateHandle> stateHandles,
		CloseableRegistry cancelStreamRegistry) throws Exception;

}

How to sort/group keys

The end goal of sorting the incoming data is to group the keys together. The DataSet API requires that the key type is Comparable and uses the natural order for sorting. The DataStream API has not imposed this restriction on key types so far; it only requires that the key implements hashCode. Other systems, such as Beam, do not use the natural order, but use the binary representation of the key for sorting/grouping. Therefore we can:

  1. Follow the Beam approach and use the binary representation of the key to perform sorting/grouping (see the sketch after this list). This comes with additional requirements: for example, the binary representation of key types has to be deterministic/stable.
  2. Support only keys for which we can create a TypeComparator (for GenericTypeInfo they must implement Comparable) in batch style execution, and fail hard if that is not the case.
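
To illustrate option 1, here is a small self-contained sketch (the serializeKey helper is a stand-in for a real key serializer, not an existing Flink method): keys are compared by their serialized bytes, so the resulting order does not have to be meaningful, it only has to be consistent so that equal keys end up next to each other.

Code Block
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Comparator;

// Sketch of sorting/grouping by the binary representation of the key.
// serializeKey stands in for a TypeSerializer writing the key to a byte array;
// it must be deterministic, i.e. equal keys always produce identical bytes.
class BinaryKeyComparison {

	static byte[] serializeKey(String key) {
		return key.getBytes(StandardCharsets.UTF_8); // placeholder for a real serializer
	}

	// Lexicographic, unsigned byte-wise comparison of the serialized keys.
	static final Comparator<byte[]> KEY_ORDER = (a, b) -> Arrays.compareUnsigned(a, b);

	static int compareKeys(String k1, String k2) {
		return KEY_ORDER.compare(serializeKey(k1), serializeKey(k2));
	}
}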

Where to apply the sorting

The first question is where in the stack we want to apply the sorting/grouping of incoming elements. We identified at least three different approaches:

...

  • implementation at a rather low level
  • it is rather non-trivial to pass the configuration: the current implementation is hardcoded in the StreamTask, so we would have to pass a flag/provider down to the task level, whereas in the previous approaches it is already embedded into the StreamGraph
  • we would need to extend the DataOutput to also cover the endInput signal (see the sketch after this list)
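
Regardless of where it is plugged in, the core of such a sorting input can be sketched as follows (hypothetical names, not Flink's actual interfaces): records are buffered without being forwarded eagerly, and only when the bounded input ends are they sorted by their serialized key and pushed downstream, so the operator sees all records of one key consecutively. A real implementation would of course use an external, spilling sorter rather than an in-memory list.

Code Block
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical sketch of a sorting input: records are buffered and only released,
// grouped by their (serialized) key, once the end of the bounded input is reached.
class SortingInputSketch<T> {
	private final Function<T, byte[]> keyExtractor; // serialized key of a record
	private final Consumer<T> downstream;           // e.g. the operator's processElement
	private final List<T> buffer = new ArrayList<>();

	SortingInputSketch(Function<T, byte[]> keyExtractor, Consumer<T> downstream) {
		this.keyExtractor = keyExtractor;
		this.downstream = downstream;
	}

	void emitRecord(T record) {
		buffer.add(record); // nothing is forwarded eagerly
	}

	void endInput() {
		// Sort by the binary key so that all records of one key become consecutive,
		// then forward them downstream.
		buffer.sort(Comparator.comparing(keyExtractor, (a, b) -> Arrays.compareUnsigned(a, b)));
		buffer.forEach(downstream);
		buffer.clear();
	}
}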

Compatibility, Deprecation, and Migration Plan

Rejected Alternatives

  1. Making it configurable whether to use the single-key state backend and sorting, or just use the regular state backends.
  2. Going through the whole open-max-watermark-close cycle of an operator for each key, thus "fixing" the zig-zagging problem of watermarks and timers. This comes with other problems, though, for example with broadcast input / two-input operators in general.