Status

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

As described in FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet API), we intend to deprecate and remove the DataSet API in the future in favour of the DataStream API. Using the DataStream API for bounded applications should not result in degraded performance; ideally, we can stay in the same performance ballpark as equivalent DataSet programs.

The DataStream API relies on StateBackends for performing reduce/aggregate-like operations. In streaming scenarios there is no single point in time at which we can perform the calculations and emit results downstream; the calculations are performed incrementally and we need to keep all the intermediate results at hand. If the state does not fit into memory, users are forced to use RocksDB, which comes with the penalties of record serialization, frequent spilling to disk, costly metadata bookkeeping, etc. This is not necessarily efficient for bounded-style execution, where there is a single point in time after which the results will not change.

The DataSet API, on the other hand, sorts/groups the incoming data by key and performs the aggregation for one key at a time in memory. Because we emit the results only once, we do not need to keep any intermediate aggregation results across different keys.

Technical details

In the DataStream API, in order to support state that does not fit into memory, users must use the RocksDBStateBackend, which spills data to disk.

We performed benchmarks to compare the different execution styles and state backends. The benchmarked program is a version of WordCount, which in the case of the DataStream API is a WordCount in a global window. The job counts 40_000_000 incoming records with 4_000_000 distinct keys. An interesting observation is that the performance of the HeapStateBackend depends heavily on the number of keys: with a high number of keys it suffers a significant penalty and, for this particular case, becomes even less performant than the sorting approach. The results are as follows:


state backend               key type   time (avg from 15 runs)
RocksDB                     integer    114884ms
RocksDB                     string     116894ms
HeapStateBackend            integer    30126ms
HeapStateBackend            string     37356ms
DataSet (with no combiner)  integer    13810ms
DataSet (with no combiner)  string     23032ms
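
For reference, the benchmarked job is roughly of the following shape. This is a hedged sketch, not the exact benchmark code: the actual DataStream variant used a global window, while this rolling keyed aggregation merely exercises the same per-key state access pattern.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;

public class KeyedCountSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// 40_000_000 records spread over 4_000_000 distinct keys, as in the text
		env.fromSequence(0, 39_999_999L)
			.map(i -> Tuple2.of(i % 4_000_000L, 1L))
			.returns(Types.TUPLE(Types.LONG, Types.LONG))
			.keyBy(t -> t.f0)
			.sum(1)
			.addSink(new DiscardingSink<>());

		env.execute("keyed count");
	}
}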

Moreover, we wrote a PoC that changes the DataStream API to use the DataSet approach: sorting the data and using a specialized StateBackend that only ever keeps state for a single key.


approach                       key type   time (avg from 15 runs)
DataStream with input sorting  integer    15410ms
DataStream with input sorting  string     25007ms

The results from the PoC show that we can achieve comparable performance with the DataStream and DataSet APIs.

Additionally, we compared the performance of isolated ValueState operations on the different StateBackends (using https://github.com/apache/flink-benchmarks/blob/master/src/main/java/org/apache/flink/state/benchmark/ValueStateBenchmark.java, without shuffling the keys) and got:

Benchmark                        (backendType)      Score        Error  Units
ValueStateBenchmark.valueAdd     HEAP            9206.249 ±   451.233  ops/ms
ValueStateBenchmark.valueAdd     ROCKSDB          652.726 ±    36.900  ops/ms
ValueStateBenchmark.valueAdd     SINGLE_KEY     18282.074 ±   936.477  ops/ms
ValueStateBenchmark.valueGet     HEAP           12718.518 ±   241.353  ops/ms
ValueStateBenchmark.valueGet     ROCKSDB         1621.098 ±    86.234  ops/ms
ValueStateBenchmark.valueGet     SINGLE_KEY     30861.293 ±   500.203  ops/ms
ValueStateBenchmark.valueUpdate  HEAP           12916.056 ±   491.163  ops/ms
ValueStateBenchmark.valueUpdate  ROCKSDB          941.824 ±    19.172  ops/ms
ValueStateBenchmark.valueUpdate  SINGLE_KEY     32883.440 ±  1573.605  ops/ms

Proposed Changes

The current behaviour of the DataStream API is equivalent to a hash-based aggregation algorithm, where the StateBackend acts as the hash table.

We suggest leveraging two characteristics of the batch execution mode:

  • we do not have checkpoints
  • we have bounded data

and replacing the hash-based aggregation with a sort-based aggregation. Assuming that the accumulators for a single key fit in memory, this eliminates the need for RocksDB.
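
To make the difference concrete, here is a plain-Java sketch of the two aggregation styles (illustrative only, not Flink code):

import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

class AggregationStyles {

	// Hash-based: one live accumulator per distinct key until the very end,
	// which is effectively what a StateBackend stores today.
	static Map<String, Long> hashAggregate(List<String> keys) {
		Map<String, Long> counts = new HashMap<>();
		for (String key : keys) {
			counts.merge(key, 1L, Long::sum);
		}
		return counts;
	}

	// Sort-based: after sorting, all records of a key are adjacent, so a
	// single accumulator suffices and each result is emitted exactly once.
	static void sortAggregate(List<String> keys, BiConsumer<String, Long> emit) {
		keys.sort(Comparator.naturalOrder());
		String currentKey = null;
		long count = 0;
		for (String key : keys) {
			if (!key.equals(currentKey)) {
				if (currentKey != null) {
					emit.accept(currentKey, count); // key finished, emit once
				}
				currentKey = key;
				count = 0;
			}
			count++;
		}
		if (currentKey != null) {
			emit.accept(currentKey, count);
		}
	}
}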

We want this change to be mostly transparent to user code. We will introduce a sorting step (with potential spilling, reusing the UnilateralSortMerger implementation) before every keyed operator, sorting/grouping inputs by their keys. This will allow us to process records in per-key groups, which in turn enables a simplified StateBackend implementation that is not organized in key groups and only ever keeps values for a single key.

The single-key-at-a-time execution will be used for the batch-style execution, as decided by the algorithm described in FLIP-134: DataStream Semantics for Bounded Input.

Moreover, it will be possible to disable it through the execution.sorted-shuffles.enabled configuration option.
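
For illustration, disabling the behaviour could look like the following sketch (assuming the option, named as in this FLIP, is read from the configuration passed to the environment):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DisableSortedInputs {
	public static void main(String[] args) {
		Configuration conf = new Configuration();
		// Option name taken from this FLIP; setting it to false would switch
		// batch execution back to the regular state backends.
		conf.setString("execution.sorted-shuffles.enabled", "false");

		StreamExecutionEnvironment env =
			StreamExecutionEnvironment.getExecutionEnvironment(conf);
		// ... build and execute the bounded job as usual
	}
}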

StateBackend

We will introduce a new implementation of a StateBackend which will create a KeyedStateBackend that works with a single key at a time. That KeyedStateBackend will create state objects (ValueState, ListState, MapState) that keep a single value in the object's field, i.e. ValueState is a very thin wrapper around a value, MapState around a HashMap, ListState around a List, etc. That way we get rid of the lookup into the StateTable for the value of the current key. When the StateBackend observes a new incoming key, it will reset all state objects acquired so far, setting the wrapped values to null. We will know that the key changed/finished because we keep the current key, and in setCurrentKey we check whether the new key is different from the current one.
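
A minimal sketch of this idea (class names are illustrative, not the actual Flink implementation):

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

import org.apache.flink.api.common.state.ValueState;

class SingleKeyStateBackendSketch<K> {

	/** ValueState backed by a plain mutable field: no StateTable lookup. */
	static final class SingleKeyValueState<T> implements ValueState<T> {
		private T value;

		@Override
		public T value() {
			return value;
		}

		@Override
		public void update(T newValue) {
			this.value = newValue;
		}

		@Override
		public void clear() {
			this.value = null;
		}
	}

	private final List<SingleKeyValueState<?>> acquiredStates = new ArrayList<>();
	private K currentKey;

	/** On a key change, reset every state object handed out so far. */
	public void setCurrentKey(K newKey) {
		if (!Objects.equals(currentKey, newKey)) {
			acquiredStates.forEach(SingleKeyValueState::clear);
			currentKey = newKey;
		}
	}
}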

TimerService

We will replace the InternalTimeServiceManager with a customized one which will keep timers only for the current key, and will fire those timers only at the end of each key. The value of InternalTimeService#getCurrentWatermark will be Watermark.MIN_VALUE while processing a key and will be set to Watermark.MAX_VALUE when firing the triggers.
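
A rough sketch of that behaviour (illustrative names, not Flink's actual internals):

import java.util.PriorityQueue;
import java.util.function.LongConsumer;

class SingleKeyTimerServiceSketch {

	// timers registered for the current key, ordered by timestamp
	private final PriorityQueue<Long> eventTimeTimers = new PriorityQueue<>();

	private long currentWatermark = Long.MIN_VALUE;

	void registerEventTimeTimer(long timestamp) {
		eventTimeTimers.add(timestamp);
	}

	long currentWatermark() {
		return currentWatermark;
	}

	/** Called when the current key is finished: fire all its timers in timestamp order. */
	void finishCurrentKey(LongConsumer onEventTime) {
		// jump the watermark to the maximum so every timer becomes eligible
		currentWatermark = Long.MAX_VALUE;
		while (!eventTimeTimers.isEmpty()) {
			onEventTime.accept(eventTimeTimers.poll());
		}
		// reset for the next key
		currentWatermark = Long.MIN_VALUE;
	}
}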

User-visible implications

The proposal does not introduce any new interfaces, though it has some user-visible implications.

  1. The user's chosen StateBackend will not be used in batch-style execution; instead, we will use the SingleKeyStateBackend.
  2. Checkpointing will not work in the bounded execution scenario, as we do not keep any state that can be checkpointed.
  3. We process each key separately, which has to be considered in custom implementations of StreamOperators. Some custom StreamOperators might not work in the bounded execution:
    1. the Watermark can jump from MAX_VALUE (at the end of a key) to MIN_VALUE (at the beginning of the next key). This might break operators that cache the value of the Watermark and assume it can never move back (see the sketch after this list).
    2. Timers will fire first in key order, and then in timestamp order within each key.
    3. Operators can no longer freely switch processing between different keys. Methods such as setCurrentKey or applyToAllKeys will not work correctly: switching the current key to a new value will result in pruning the current state and potentially even firing the timers.
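
As an example of the first point, logic along these lines would break under per-key execution (a hedged illustration, not Flink code):

class WatermarkCachingSketch {

	private long cachedWatermark = Long.MIN_VALUE;

	void processWatermark(long watermark) {
		// BAD under per-key execution: between two keys the watermark jumps
		// from Long.MAX_VALUE (end of key) back to Long.MIN_VALUE (next key),
		// so this monotonicity assumption no longer holds
		if (watermark < cachedWatermark) {
			throw new IllegalStateException("watermark moved backwards");
		}
		cachedWatermark = watermark;
	}
}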

Implementation details

This section contains some lower-level details. It lists a few alternatives for certain problems; we need to decide which of them we want to implement.

Changed PublicEvolving interfaces

We do need to change some interfaces that are marked as PublicEvolving. However, most of the changes apply to classes that, even though they are PublicEvolving, we do not expect users to actually use or implement.

In order to be able to create that KeyedStateBackend without the overhead of the AbstractKeyedStateBackend, we suggest extracting an interface CheckpointableKeyedStateBackend, similar to the current OperatorStateStore:

/**
 * An interface that extends user facing {@link KeyedStateBackend} with internal system specific features.
 */
public interface CheckpointableKeyedStateBackend<K> extends
		KeyedStateBackend<K>,
		SnapshotStrategy<SnapshotResult<KeyedStateHandle>>,
		Closeable,
		CheckpointListener {

	KeyGroupRange getKeyGroupRange();
}

Then the StateBackend interface would create this new interface instead of the AbstractKeyedStateBackend:

@PublicEvolving
public interface StateBackend extends java.io.Serializable {	

	// ...

	<K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(
		Environment env,
		JobID jobID,
		String operatorIdentifier,
		TypeSerializer<K> keySerializer,
		int numberOfKeyGroups,
		KeyGroupRange keyGroupRange,
		TaskKvStateRegistry kvStateRegistry,
		TtlTimeProvider ttlTimeProvider,
		MetricGroup metricGroup,
		@Nonnull Collection<KeyedStateHandle> stateHandles,
		CloseableRegistry cancelStreamRegistry) throws Exception;

}

How to sort/group keys

The end goal of sorting the incoming data is to group records with the same key together. The DataSet API requires that the key type is Comparable and uses the natural order for sorting. The DataStream API has so far not imposed this restriction on key types; it only requires that the key implements hashCode. Other systems, such as Beam, do not use the natural order, but use the binary representation of the key for sorting/grouping. Therefore we can:

  • Follow the Beam approach and use the binary representation of the key to perform sorting/grouping (a sketch follows below). This comes with additional requirements: for example, the binary representation of key types has to be deterministic/stable.

REJECTED ALTERNATIVE:

  • Support, in batch-style execution, only keys for which we can create a TypeComparator (for GenericTypeInfo they must implement Comparable), and fail hard if that is not the case.
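
To illustrate the chosen Beam-style approach, here is a rough sketch of comparing keys by their binary representation (assuming a deterministic key TypeSerializer; names are illustrative):

import java.io.IOException;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;

class BinaryKeyOrder {

	// Serialize the key; this is only a valid grouping criterion if the
	// serializer is deterministic/stable, as noted above.
	static <K> byte[] keyBytes(TypeSerializer<K> serializer, K key) throws IOException {
		DataOutputSerializer out = new DataOutputSerializer(64);
		serializer.serialize(key, out);
		return out.getCopyOfBuffer();
	}

	// Unsigned lexicographic comparison of the binary representations; equal
	// bytes mean equal keys, which is all that grouping requires.
	static int compare(byte[] a, byte[] b) {
		int len = Math.min(a.length, b.length);
		for (int i = 0; i < len; i++) {
			int cmp = (a[i] & 0xff) - (b[i] & 0xff);
			if (cmp != 0) {
				return cmp;
			}
		}
		return a.length - b.length;
	}
}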

Where to apply the sorting

The first question is where in the stack we want to apply the sorting/grouping of incoming elements. We identified at least three different approaches:

PREFERRED option in the discussion:

1. Build the sorting into org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput

Pros:

  • involves the fewest virtual calls
  • no topology changes

Cons:

  • implementation at a rather low level
  • passing the configuration is not trivial: the current implementation is hardcoded in the StreamTask, so we would have to pass a flag/provider down to the task level, whereas in the alternative approaches it is already embedded in the StreamGraph
  • we would need to extend the DataOutput to also cover the endInput signal

REJECTED alternatives:

  1. Create a wrapper operator

Pros:

  • simple changes in the StreamGraphGenerator, just wrap the original operator
  • no changes in the topology
  • simple method forwarding

Cons:

  • not the most modular approach
  • need to wrap all methods

  2. Create standalone sorting operators

Pros:

  • the most modular approach
  • theoretically can run the sorting in separate slots from the actual operator

Cons:

  • changes to the topology
  • non-trivial connection of two-input operators (via OutputTags)
  • in the case of a two-input operator, at least the merging step must be chained with it

Compatibility, Deprecation, and Migration Plan

Rejected Alternatives

  1. Making it configurable whether to use the single-key state backend plus sorting, or the regular state backends.
  2. Go through the whole open-max-watermark-close cycle for an operator for each key, thus "fixing" the zig-zagging problem of watermarks and timers. This comes with other problems, though, for example with broadcast inputs/two-input operators in general.