Status

Status

...


...

...

Jira: FLINK-19268 (ASF JIRA)

...

Release: 1.12


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

As described in FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet API), we intend to deprecate and eventually remove the DataSet API in favour of the DataStream API. Using the DataStream API for bounded applications should not result in degraded performance. Ideally, we can stay in the same performance ballpark as equivalent DataSet programs.

...

The DataSet API, on the other hand, sorts/groups the incoming data by key and keeps the aggregation for only one key in memory at a time. Because we only ever emit the results once, we do not need to keep any intermediate results of the aggregations across different keys.

Technical details

In the DataStream API, to support state that does not fit into memory, users must use the RocksDBStateBackend, which enables spilling data to disk.

...

ValueStateBenchmark.valueUpdate      SINGLE_KEY      32883.440 ± 1573.605  ops/ms

Proposed Changes

The current behaviour of the DataStream API is equivalent to a hash-based aggregation algorithm where the StateBackends act as the HashTable.
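The difference between the two aggregation styles can be illustrated with a minimal, self-contained sketch. It is not Flink code: the class AggregationStrategiesSketch, the Event type and both methods are hypothetical names introduced only for illustration, and the in-memory sort stands in for whatever sorting mechanism the runtime would actually use.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative sketch only; none of these names exist in Flink. */
final class AggregationStrategiesSketch {

    /** A keyed record: a key and a value to be summed per key. */
    static final class Event {
        final String key;
        final long value;

        Event(String key, long value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Hash-based: a running sum for every key stays in the table until the input ends. */
    static Map<String, Long> hashAggregate(List<Event> input) {
        Map<String, Long> table = new HashMap<>();
        for (Event e : input) {
            table.merge(e.key, e.value, Long::sum);
        }
        return table;
    }

    /** Sort-based: after sorting by key, only the running sum of the current key is held. */
    static Map<String, Long> sortAggregate(List<Event> input) {
        // Assumption: the input fits in memory here; a real runtime would use an external sort.
        List<Event> sorted = new ArrayList<>(input);
        sorted.sort(Comparator.comparing((Event e) -> e.key));

        Map<String, Long> emitted = new LinkedHashMap<>(); // stands in for downstream output
        String currentKey = null;
        long currentSum = 0L;
        for (Event e : sorted) {
            if (currentKey != null && !currentKey.equals(e.key)) {
                // The key changed: emit the finished result and drop its intermediate state.
                emitted.put(currentKey, currentSum);
                currentSum = 0L;
            }
            currentKey = e.key;
            currentSum += e.value;
        }
        if (currentKey != null) {
            emitted.put(currentKey, currentSum);
        }
        return emitted;
    }
}
```

Both methods produce the same per-key sums. The sort-based variant holds only one intermediate result at any time, at the cost of sorting the input first.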

...

Moreover, it will be possible to disable it through an execution.sorted-shuffles.enabled configuration option.

StateBackend

We will introduce a new implementation of a StateBackend which will create a KeyedStateBackend that works with a single key at a time. That KeyedStateBackend will create state objects (ValueState, ListState, MapState) that keep only a single value in the object's field, i.e. ValueState is a very thin wrapper for a value, MapState for a HashMap, ListState for a List, etc. That way we avoid a lookup in the StateTable for the current value of the current key. When the StateBackend observes a new incoming key, it will reset all state objects acquired so far, setting the wrapped values to null. We detect that the key has changed (i.e. the previous key is finished) because we keep the current key and check in setCurrentKey whether the new key is different from the current one.
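The reset-on-key-change behaviour described above can be sketched roughly as follows. This is a simplified stand-in rather than the proposed implementation: SingleKeyStateSketch, SingleValueState and SingleKeyBackend are hypothetical names, and the real classes would implement Flink's ValueState and KeyedStateBackend interfaces.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Illustrative sketch only; these classes are hypothetical and not part of Flink. */
final class SingleKeyStateSketch {

    /** Thin wrapper around a single value, standing in for a ValueState. */
    static final class SingleValueState<T> {
        private T value;

        T value() {
            return value;
        }

        void update(T newValue) {
            value = newValue;
        }

        void clear() {
            value = null;
        }
    }

    /** Backend that only ever holds state for the current key. */
    static final class SingleKeyBackend<K> {
        private final List<SingleValueState<?>> acquiredStates = new ArrayList<>();
        private K currentKey;

        /** Hands out a state object and remembers it so that it can be reset later. */
        <T> SingleValueState<T> createValueState() {
            SingleValueState<T> state = new SingleValueState<>();
            acquiredStates.add(state);
            return state;
        }

        /** Resets all acquired state objects when the incoming key differs from the current one. */
        void setCurrentKey(K newKey) {
            if (!Objects.equals(newKey, currentKey)) {
                for (SingleValueState<?> state : acquiredStates) {
                    state.clear();
                }
                currentKey = newKey;
            }
        }

        K getCurrentKey() {
            return currentKey;
        }
    }
}
```

Reads and writes go straight to a field, so no per-key table lookup is needed. This only gives correct results if all records of one key arrive contiguously, which is what the sorting of the input is meant to guarantee.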

TimerService

We will replace the InternalTimeServiceManager with a customized one which keeps timers only for the current key and fires timers only at the end of each key. The value of InternalTimeService#getCurrentWatermark will be Watermark.MIN_VALUE while processing a key and will be set to Watermark.MAX_VALUE when firing timers.
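A rough sketch of such a per-key timer service is shown below. It is an assumption-laden illustration, not the proposed class: SingleKeyTimerSketch, SingleKeyTimerService and the endOfInput hook are hypothetical names, the watermark is modelled as a plain long, and fired timers are recorded as strings instead of invoking a callback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Illustrative sketch only; these names are hypothetical and not part of Flink. */
final class SingleKeyTimerSketch {

    static final long MIN_WATERMARK = Long.MIN_VALUE;
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    /** Keeps timers only for the current key and fires them when that key is finished. */
    static final class SingleKeyTimerService<K> {
        private final PriorityQueue<Long> timers = new PriorityQueue<>();
        private final List<String> fired = new ArrayList<>();
        private K currentKey;
        private long currentWatermark = MIN_WATERMARK;

        void setCurrentKey(K newKey) {
            if (currentKey != null && !currentKey.equals(newKey)) {
                fireAllTimers(); // the previous key is finished
            }
            currentKey = newKey;
            currentWatermark = MIN_WATERMARK; // the watermark "jumps back" for the next key
        }

        void registerEventTimeTimer(long timestamp) {
            timers.add(timestamp);
        }

        long getCurrentWatermark() {
            return currentWatermark;
        }

        /** Must be called after the last record so that the timers of the final key fire too. */
        void endOfInput() {
            if (currentKey != null) {
                fireAllTimers();
                currentKey = null;
            }
        }

        /** Fired timers in firing order, formatted as "key@timestamp". */
        List<String> firedTimers() {
            return fired;
        }

        private void fireAllTimers() {
            currentWatermark = MAX_WATERMARK;
            while (!timers.isEmpty()) {
                fired.add(currentKey + "@" + timers.poll()); // timestamp order within the key
            }
        }
    }
}
```

With keys arriving in sorted order, timers fire in key order first and in timestamp order within each key, and the observed watermark alternates between the minimum and maximum values.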

User-visible implications

The proposal does not introduce any new interfaces, though it has some user-visible implications.

  1. The user's chosen StateBackend will not be used in bounded-style execution; instead, we will use the SingleKeyStateBackend.
  2. Checkpointing will not work in the bounded execution scenario, because we do not keep any state that can be checkpointed.
  3. Each key is processed separately, which has to be considered for custom implementations of StreamOperators. Some custom StreamOperators might not work in bounded execution:
    1. The Watermark can jump from MAX_VALUE (at the end of a key) to MIN_VALUE (at the beginning of the next key). This might break operators that cache the value of a Watermark and assume it can never move backwards.
    2. Timers will fire first in key order and then in timestamp order within each key.
    3. Operators can no longer freely switch processing between different keys. Methods such as setCurrentKey and applyToAllKeys will not work correctly. Switching the current key to a new value will prune the current state and potentially even fire the timers.

Implementation details

This section contains some lower-level details. For certain problems it lists a few alternatives, and we still need to decide which of them to implement.

Changed PublicEvolving interfaces

We do need to change some interfaces that are marked as PublicEvolving. However, most of the changes apply to classes that, even though they are PublicEvolving, we do not expect users to actually use or implement.

In order to be able to create that KeyedStateBackend without the overhead of the AbstractKeyedStateBackend, we suggest extracting an interface CheckpointableKeyedStateBackend, similarly to the current OperatorStateStore.

Code Block
/**
 * An interface that extends user facing {@link KeyedStateBackend} with internal system specific features.
 */
public interface CheckpointableKeyedStateBackend<K> extends
		KeyedStateBackend<K>,
		SnapshotStrategy<SnapshotResult<KeyedStateHandle>>,
		Closeable,
		CheckpointListener {

	KeyGroupRange getKeyGroupRange();

	boolean supportsAsynchronousSnapshots();

	// TODO remove this once heap-based timers are working with RocksDB incremental snapshots!
	boolean requiresLegacySynchronousTimerSnapshots();
}

The StateBackend interface would then return this new interface instead of the AbstractKeyedStateBackend:

Code Block
@PublicEvolving
public interface StateBackend extends java.io.Serializable {	

....

	<K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(
		Environment env,
		JobID jobID,
		String operatorIdentifier,
		TypeSerializer<K> keySerializer,
		int numberOfKeyGroups,
		KeyGroupRange keyGroupRange,
		TaskKvStateRegistry kvStateRegistry,
		TtlTimeProvider ttlTimeProvider,
		MetricGroup metricGroup,
		@Nonnull Collection<KeyedStateHandle> stateHandles,
		CloseableRegistry cancelStreamRegistry) throws Exception;

}

How to sort/group keys

The end goal of sorting the incoming data is to group the keys together. The DataSet API requires that the key type is Comparable and uses the natural order for sorting. The DataStream API has so far not imposed this restriction on key types; it only requires that the key implements hashCode. Other systems, such as Beam, do not use the natural order but use the binary representation for sorting/grouping. Therefore we can:

...

  • Support only keys that we can create a TypeComparator for (for GenericTypeInfo, they must implement Comparable) in batch-style execution, and fail hard if that is not the case.
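To illustrate the binary-representation approach mentioned above, the following sketch sorts integer keys by their serialized bytes. It assumes that equal keys always serialize to identical bytes. BinaryKeySortSketch and its methods are hypothetical names, and the big-endian ByteBuffer encoding merely stands in for a real key serializer.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative sketch only; these names are hypothetical and not part of Flink. */
final class BinaryKeySortSketch {

    /** Stand-in for a key serializer: four bytes, big-endian. */
    static byte[] serialize(int key) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(key).array();
    }

    /** Sorts keys by unsigned lexicographic comparison of their serialized form. */
    static List<Integer> sortByBinaryRepresentation(List<Integer> keys) {
        List<byte[]> serialized = new ArrayList<>();
        for (int key : keys) {
            serialized.add(serialize(key));
        }
        serialized.sort((a, b) -> Arrays.compareUnsigned(a, b));

        List<Integer> result = new ArrayList<>();
        for (byte[] bytes : serialized) {
            result.add(ByteBuffer.wrap(bytes).getInt());
        }
        return result;
    }
}
```

For the input [3, -1, 3, 1, -1] this yields [1, 3, 3, -1, -1]. Equal keys end up adjacent, which is all that grouping needs, but the resulting order is not the natural order because negative integers sort last under unsigned byte comparison.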

Where to apply the sorting

The first question is where in the stack do we want to apply the sorting/grouping of incoming elements. We could identify at least 3 different approaches:

...

  • changes to the topology
  • non-trivial connection of a two-input operator (via OutputTags)
  • in case of the two input operator at least the merging step must be chained with the two input operator

Compatibility, Deprecation, and Migration Plan

Rejected Alternatives

  1. Making it configurable whether to use the single key state backend and sorting or just use regular state backends
  2. Go through the whole open-max-watermark-close cycle for an operator for each key, thus "fixing" the zig-zagging problem of watermarks and timers. This comes with other problems, though, for example with broadcast inputs and two-input operators in general.