...

The DataStream API relies on StateBackends for performing reduce/aggregate-like operations. In streaming scenarios there is no single point in time at which we can perform the calculations and emit results downstream. The calculations are performed incrementally and we need to keep all of the intermediate results at hand. If the state does not fit into memory, users are forced to use RocksDB, which comes with the penalty of serializing records, often spilling to disk, costly metadata bookkeeping, etc. Additionally, StateBackends are organized in KeyGroups, which allow for rescaling. Support for KeyGroups comes with an additional cost in the state access methods (the key must be hashed to compute the group index, separate StateTables must be kept per key group, etc.). This is not necessarily efficient for bounded-style execution, where there is a single point in time after which the results will not change and there is no checkpointing (and, in turn, no rescaling).
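
To illustrate, here is a minimal sketch of this hash-based style (hypothetical user code, not part of this proposal): every key's intermediate count lives in keyed state, so the state backend must hold accumulators for all distinct keys at once.

Code Block
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class CountPerKey extends KeyedProcessFunction<String, String, Long> {

	private transient ValueState<Long> count;

	@Override
	public void open(Configuration parameters) {
		count = getRuntimeContext()
				.getState(new ValueStateDescriptor<>("count", Long.class));
	}

	@Override
	public void processElement(String word, Context ctx, Collector<Long> out)
			throws Exception {
		Long current = count.value();
		// The intermediate result is kept in the state backend; with many
		// distinct keys this is what forces the spilling to RocksDB.
		count.update(current == null ? 1L : current + 1);
	}
}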

The DataSet API, on the other hand, sorts/groups the incoming data by the key and performs the aggregation for one key at a time in memory. Because we only ever emit the results once, we do not need to keep any intermediate results of the aggregations across different keys.
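
As an illustration, here is a minimal, hypothetical sketch of that sort-based style: the input is assumed to be already sorted by key, so a single accumulator suffices and each key is emitted exactly once, when the key changes.

Code Block
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

public final class SortedAggregation {

	/** Counts occurrences per key in a key-sorted list, emitting each key exactly once. */
	public static <K> void countSorted(
			List<Map.Entry<K, Long>> sortedByKey, BiConsumer<K, Long> emit) {
		K currentKey = null;
		long accumulator = 0L;
		for (Map.Entry<K, Long> record : sortedByKey) {
			if (currentKey != null && !currentKey.equals(record.getKey())) {
				// Key changed: the previous key is complete, emit once and reset.
				emit.accept(currentKey, accumulator);
				accumulator = 0L;
			}
			currentKey = record.getKey();
			accumulator += record.getValue();
		}
		if (currentKey != null) {
			emit.accept(currentKey, accumulator);
		}
	}
}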

...

We performed some benchmarks to compare the results of the different execution styles and state backends. The program we ran the tests for is a version of WordCount, which in the case of the DataStream API is a WordCount in a global window. The job counts 40_000_000 incoming records with 4_000_000 different keys. An interesting observation is that the performance of the HeapStateBackend depends heavily on the number of keys. With a high number of keys it suffers a significant penalty and, for that particular case, becomes even less performant than the sorting approach. The results are as follows:

...

ValueStateBenchmark.valueUpdate      SINGLE_KEY      32883.440 ± 1573.605  ops/ms

Proposed Changes

The current behaviour of the DataStream API is equivalent to a hash-based aggregation algorithm where the StateBackends act as the HashTable.

We suggest leveraging two characteristics of the batch execution mode:

  • we do not have checkpoints
  • we have bounded data

and replace the hash-based aggregations with sort-based aggregations. Assuming that the accumulators for a single key fit in memory, we can discard the need for RocksDB in that case.

We want this change to be mostly transparent to user code. Therefore, we will introduce a simplified implementation of a StateBackend that is not organized in key groups and only ever keeps values for a single key. In order to be able to use such a StateBackend we must make sure that the incoming data is grouped/sorted by the key. Effectively, this means we want to switch from hash-based aggregations with costly StateBackends to sort-based aggregations with a lightweight one.
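
The following is a rough, hypothetical sketch of the single-key idea (the names are illustrative, not the actual classes introduced by this FLIP): because the input is sorted by key, a finished key can never reappear, so its state can simply be dropped when the key changes.

Code Block
import java.util.HashMap;
import java.util.Map;

public final class SingleKeyValueState<K, V> {

	private K currentKey;
	private final Map<String, V> valuesForCurrentKey = new HashMap<>();

	/** Called when the sorted input moves on to the next key. */
	public void setCurrentKey(K newKey) {
		if (currentKey != null && !currentKey.equals(newKey)) {
			// The input is sorted by key, so the old key can never reappear:
			// its state can simply be dropped, no hashing or key groups needed.
			valuesForCurrentKey.clear();
		}
		currentKey = newKey;
	}

	public V value(String stateName) {
		return valuesForCurrentKey.get(stateName);
	}

	public void update(String stateName, V value) {
		valuesForCurrentKey.put(stateName, value);
	}
}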

The single-key-at-a-time execution will be used for the batch-style execution, as decided by the algorithm described in FLIP-134: DataStream Semantics for Bounded Input.

...

Code Block
@PublicEvolving
public interface StateBackend extends java.io.Serializable {

	// ...

	<K> InternalKeyedStateBackend<K> createKeyedStateBackend(
		Environment env,
		JobID jobID,
		String operatorIdentifier,
		TypeSerializer<K> keySerializer,
		int numberOfKeyGroups,
		KeyGroupRange keyGroupRange,
		TaskKvStateRegistry kvStateRegistry,
		TtlTimeProvider ttlTimeProvider,
		MetricGroup metricGroup,
		@Nonnull Collection<KeyedStateHandle> stateHandles,
		CloseableRegistry cancelStreamRegistry) throws Exception;

}

How to sort/group keys

The end goal of sorting the incoming data is to group the keys together. The DataSet API requires that the key type is Comparable and uses the natural order for sorting. The DataStream API has so far not imposed this restriction on key types; it only requires that the key implements hashCode. Other systems, e.g. Beam, do not use the natural order but instead use the binary representation of the key for sorting/grouping. Therefore we can:

  1. Follow the Beam approach and use the binary representation of the key to perform sorting/grouping (see the sketch after this list). This comes with additional requirements; for example, the binary representation of key types has to be deterministic/stable.
  2. Support only keys for which we can create a TypeComparator (for GenericTypeInfo they must implement Comparable) in batch-style execution, and fail hard if that is not the case.
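
A minimal sketch of option 1, assuming the key serializer produces a stable encoding (the comparator class itself is illustrative, not part of this proposal): keys are serialized and compared byte-wise, which yields an arbitrary but consistent order that groups equal keys together.

Code Block
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;

import java.io.IOException;
import java.util.Comparator;

/** Compares keys by their binary representation. Not thread-safe; illustration only. */
public final class BinaryKeyComparator<K> implements Comparator<K> {

	private final TypeSerializer<K> keySerializer;
	private final DataOutputSerializer buffer = new DataOutputSerializer(64);

	public BinaryKeyComparator(TypeSerializer<K> keySerializer) {
		this.keySerializer = keySerializer;
	}

	@Override
	public int compare(K left, K right) {
		byte[] leftBytes = serialize(left);
		byte[] rightBytes = serialize(right);
		// Lexicographic, unsigned byte-wise comparison: the resulting order is
		// arbitrary but consistent, which is all that grouping requires.
		int len = Math.min(leftBytes.length, rightBytes.length);
		for (int i = 0; i < len; i++) {
			int cmp = (leftBytes[i] & 0xFF) - (rightBytes[i] & 0xFF);
			if (cmp != 0) {
				return cmp;
			}
		}
		return leftBytes.length - rightBytes.length;
	}

	private byte[] serialize(K key) {
		buffer.clear();
		try {
			keySerializer.serialize(key, buffer);
		} catch (IOException e) {
			throw new RuntimeException("Could not serialize key", e);
		}
		return buffer.getCopyOfBuffer();
	}
}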

Where to apply the sorting

...

  • the implementation sits at a rather low level
  • passing the configuration is not trivial: the current implementation is hardcoded in the StreamTask, so we would have to pass a flag/provider down to the task level, whereas in the previous approaches it is already embedded in the StreamGraph
  • we would need to extend the DataOutput to also cover the endInput signal

...

Compatibility, Deprecation, and Migration Plan

...