Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Current state"Under Discussion"

Design documentation: https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing

Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308

...

Code Block
languagejava
KeyedStream<T, Tuple> localKeyBy(int… fields);
KeyedStream<T, Tuple> localKeyBy(Keys<T> keys);
<K> KeyedStream<T, K> localKeyBy(KeySelector<T, K> keySelector);
<K> KeyedStream<T, K> localKeyBy(KeySelector<T, K> keySelector, TypeInformation<K> keyType);

Users can perform local aggregation with local keyed streams. Local keyed streams resemble keyed streams in many respects. They both partition elements according to keys and allow access to states affiliated with keys. But the partitioning in local keyed streams does not require shuffling and is performed locally.

Image Added

Figure 1. Local aggregation.

The difference in partitioning schema makes it non-trivial to save and restore local keyed states, especially when the degree of parallelism changes. In keyed streams, keys are assigned to a set of key groups and we can easily redistribute these key groups to tasks when the degree of parallelism changes. But in local keyed streams, as elements are partitioned locally, each task has a full range of key groups. When the degree of parallelism changes, we have to redistribute the key groups of old tasks and construct new key groups for new tasks.

Image Added

Figure 2. Saving and restoring of local keyed states

Figure 2 illustrates how local keyed states are saved and restored when the degree of parallelism changes. Similar to the operators on keyed streams, after materializing local keyed states in persistent storage, the operators on local keyed streams will response to the checkpoint coordinator with a state handle containing the meta information of its local key groups. Suppose the parallelism is 3 and the max parallelism is 12, then there exist 36 local key groups in the checkpoint.

When the degree of parallelism is changed, the checkpoint coordinator will assign these local key groups to new tasks in a roughly even manner. It will iterate over all state handles and assign state handles to new tasks according to the number of key groups. A state handle will be split if only a portion of its key groups is assigned.

In the illustrated example, the state handle of Task 2 is split into 2 parts, each of which contains the meta information of 6 local key groups. Task 1’ then restores its local keyed states with the state handle of Task 1 and the first part of the state handle of Task 2.

A task may be assigned multiple local key groups with the same id at restoring. These instances should be merged to construct the new key group.

The merging of list states is quite straight-forward. We can simply merge two list states of the same key by appending the elements of one list state to the other one.

Reducing states and aggregating states can also be merged with the help of user-defined functions. But currently these user-defined functions are not saved in checkpoints, making it impossible to perform merging at restoring.

A solution at first thought is to save these user-defined functions in checkpoints. To be backward compatible, this solution will require a different metadata format for the snapshots of local keyed states.

We can also perform lazy merging to avoid the need for the saving of user-defined functions. When restoring from checkpoints, we keep the values from different key group instances in a list and perform merging when they are accessed. That way, local keyed states, and keyed states can share the same metadata format.

Value states and map states can also be merged if users can provide user-defined merge functions in the state descriptors. But the need for the merging of value and map states is not urgent because in most cases users can replace value and map states with reducing and aggregating states if a user-defined merge function can be provided.

Implementation

In the keyed streams produced by localKeyBy, the partition transformation deploys LocalKeyGroupStreamPartitioner instead of KeyGroupStreamPartitioner to partition stream elements.

Since all operator on local keyed streams are performed locally, these operators must be chained with their inputs. Like ForwardPartitioner, we will check the parallelism of upstream and downstream nodes of LocalKeyGroupStreamPartitioners when generating stream graphs. Exceptions will be thrown if their parallelism are not equal.

When generating job graphs, the operators will be chained with the inputs if the partitioner is typed LocalKeyGroupStreamPartitioner.

Besides the selector and the serializer for keys, the scope for key partitioning will also be written in the operator’s configuration. The scope can help the operator know it is performed in a keyed stream or a local keyed stream.

The access to local keyed states is very similar to the access to keyed state except that

  • The key group range is full for all tasks, and
  • The access to value and map states is disallowed.

If the merging of reducing and aggregating states is performed lazily, we should use lists to store the values of reducing and aggregating states. When accessing reducing and aggregating states, we iterate over all the elements in the list and merge these elements to produce the result. The extra merging operation only happens at the first access after restoring. In other cases, there only exists one element in the list and no merging is needed.

The materializing of local keyed states is also similar to that of keyed states, hence can share the same code.

When recovering from failures, the checkpoint coordinator will distribute the state handles of local keyed states to tasks according to the method described in Section 2.1.

When restoring from local keyed state handles, we iterate all key-value pairs in assigned key groups. In the cases where the merging functions are saved in local keyed state handles, we merge the values of the same key with corresponding merge functions. While in the cases where the merging is performed lazily, we simply append values to the lists of their keys.

The total implementation can be split into a few subtasks(steps):

...

Code Block
languagejava
public class LocalKeyedStreamPartitioner<T, K> extends StreamPartitioner<T>
		implements ConfigurableStreamPartitioner {
	private static final long serialVersionUID = 1L;

	private final int[] returnArray = new int[] {0};

	private final KeySelector<T, K> keySelector;

	private int maxParallelism;

	public LocalKeyedStreamPartitioner(KeySelector<T, K> keySelector, int maxParallelism) {
		Preconditions.checkArgument(maxParallelism > 0, "Number of key-groups must be > 0!");
		this.keySelector = Preconditions.checkNotNull(keySelector);
		this.maxParallelism = maxParallelism;
	}

	public int getMaxParallelism() {
		return maxParallelism;
	}

	@Override
	public int[] selectChannels(
			SerializationDelegate<StreamRecord<T>> record,
			int numberOfOutputChannels) {
		return returnArray;
	}

	@Override
	public StreamPartitioner<T> copy() {
		return this;
	}

	@Override
	public String toString() {
		return "LOCAL_KEYED";
	}

	@Override
	public void configure(int maxParallelism) {
		KeyGroupRangeAssignment.checkParallelismPreconditions(maxParallelism);
		this.maxParallelism = maxParallelism;
	}
}
  • add new APIs for DataStream and KeyedStream:

...

  • Introduce LocalKeyedStateHandle interface and implement LocalKeyedKeyGroupsStateHandle / LocalKeyedIncrementalKeyedStateHandle
Code Block
languagejava
//marker interface
public interface LocalKeyedStateHandle extends KeyedStateHandle {}

public class LocalKeyedKeyGroupsStateHandle extends KeyGroupsStateHandle
		implements LocalKeyedStateHandle {

	private static final long serialVersionUID = -2194187691361383511L;

	public LocalKeyedKeyGroupsStateHandle(KeyGroupRangeOffsets groupRangeOffsets,
			StreamStateHandle streamStateHandle) {
		super(groupRangeOffsets, streamStateHandle);
	}

	@Override
	public LocalKeyedKeyGroupsStateHandle getIntersection(KeyGroupRange keyGroupRange) {
		return new LocalKeyedKeyGroupsStateHandle(
			getGroupRangeOffsets().getIntersection(keyGroupRange),
			getDelegateStateHandle());
	}

	@Override
	public String toString() {
		//...
	}

}

public class LocalKeyedIncrementalKeyedStateHandle extends IncrementalKeyedStateHandle
		implements LocalKeyedStateHandle {

	private static final long serialVersionUID = 4081913652254161489L;

	public LocalKeyedIncrementalKeyedStateHandle(
			UUID backendIdentifier,
			KeyGroupRange keyGroupRange,
			long checkpointId,
			Map<StateHandleID, StreamStateHandle> sharedState,
			Map<StateHandleID, StreamStateHandle> privateState,
			StreamStateHandle metaStateHandle) {
		super(backendIdentifier, keyGroupRange, checkpointId, sharedState, privateState,
			metaStateHandle);
	}

	@Override
	public String toString() {
		//...
	}

}
  • Support states of local aggregation mergence for RocksDB statebackend

...