Status

Current state: "Under Discussion"

Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308

JIRA:

Released: 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, keyed streams are widely used to perform aggregating operations (e.g., reduce, sum and window) on the elements that have the same key. When executed at runtime, the elements with the same key will be sent to and aggregated by the same task.

The performance of these aggregating operations is very sensitive to the distribution of keys. When the distribution of keys follows a power law, performance degrades significantly. Worse, increasing the degree of parallelism does not help when a task is overloaded by a single key.

Local aggregation is a widely adopted method to mitigate the performance degradation caused by data skew. We can decompose the aggregating operations into two phases. In the first phase, we aggregate the elements of the same key at the sender side to obtain partial results. In the second phase, these partial results are sent to receivers according to their keys and are combined to obtain the final result. Since the number of partial results received by each receiver is limited by the number of senders, the imbalance among receivers can be reduced. In addition, reducing the amount of transferred data can further improve performance.
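The two phases can be sketched outside of Flink with plain Java collections. This is only an illustration of the idea, assuming a per-key count as the aggregation; the class and method names (TwoPhaseCountSketch, localCount, combine) are hypothetical and are not part of the proposed API.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of two-phase aggregation; not Flink code. */
final class TwoPhaseCountSketch {

    /** Phase 1: a sender pre-aggregates its own elements into one partial count per key. */
    static Map<String, Long> localCount(List<String> keys) {
        Map<String, Long> partial = new HashMap<>();
        for (String key : keys) {
            partial.merge(key, 1L, Long::sum);
        }
        return partial;
    }

    /** Phase 2: the receiver combines the partial counts coming from all senders. */
    static Map<String, Long> combine(List<Map<String, Long>> partials) {
        Map<String, Long> result = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((key, count) -> result.merge(key, count, Long::sum));
        }
        return result;
    }
}
```

With two senders, the receiver responsible for a hot key combines at most two partial counts for that key, however many elements each sender saw.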

Note that to benefit from local aggregation, the aggregated result must be easy to obtain by decomposition and combination. Many common aggregating operations satisfy this condition, e.g., sum, count and topN. A few other aggregating operations, like cardinality, cannot be easily decomposed and combined, and hence will not benefit from local aggregation.
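A small sketch of why the decomposition condition matters, using plain Java and hypothetical names (DecomposabilitySketch, sum, distinctCount): partial sums can be combined by adding them, whereas partial distinct counts cannot, because the same value may have been seen by several senders.

```java
import java.util.HashSet;
import java.util.List;

/** Hypothetical illustration of decomposable vs. non-decomposable aggregates. */
final class DecomposabilitySketch {

    /** Sum is decomposable: the sum of partial sums equals the overall sum. */
    static long sum(List<Long> values) {
        long total = 0L;
        for (long value : values) {
            total += value;
        }
        return total;
    }

    /** Cardinality: the number of distinct values in the given list. */
    static long distinctCount(List<String> values) {
        return new HashSet<>(values).size();
    }
}
```

For example, if one sender sees the values a, b and another sees b, c, each partial cardinality is 2, but the overall cardinality is 3 rather than 4. Combining cardinalities exactly would require each sender to ship its full set of distinct values, which removes most of the benefit.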

Public Interfaces

A few APIs to support local aggregation need to be added to the DataStream class, listed below:

KeyedStream<T, Tuple> localKeyBy(int... fields);
KeyedStream<T, Tuple> localKeyBy(Keys<T> keys);
<K> KeyedStream<T, K> localKeyBy(KeySelector<T, K> keySelector);
<K> KeyedStream<T, K> localKeyBy(KeySelector<T, K> keySelector, TypeInformation<K> keyType);

The implementation can be split into a few subtasks (steps):

  • Introduce a KeyScope enum to distinguish whether a key comes from the localKeyBy API or the general keyBy API:
public enum KeyScope {

	GLOBAL(false),
	LOCAL(true);

	private final boolean local;

	KeyScope(boolean local) {
		this.local = local;
	}

	public boolean isLocal() {
		return local;
	}
}
  • Introduce a new stream partitioner named LocalKeyedStreamPartitioner. Its implementation is similar to ForwardPartitioner: it forwards the local keyed stream. Note: the code snippet is still based on the old interface.
public class LocalKeyedStreamPartitioner<T, K> extends StreamPartitioner<T>
		implements ConfigurableStreamPartitioner {
	private static final long serialVersionUID = 1L;

	private final int[] returnArray = new int[] {0};

	private final KeySelector<T, K> keySelector;

	private int maxParallelism;

	public LocalKeyedStreamPartitioner(KeySelector<T, K> keySelector, int maxParallelism) {
		Preconditions.checkArgument(maxParallelism > 0, "Number of key-groups must be > 0!");
		this.keySelector = Preconditions.checkNotNull(keySelector);
		this.maxParallelism = maxParallelism;
	}

	public int getMaxParallelism() {
		return maxParallelism;
	}

	@Override
	public int[] selectChannels(
			SerializationDelegate<StreamRecord<T>> record,
			int numberOfOutputChannels) {
		return returnArray;
	}

	@Override
	public StreamPartitioner<T> copy() {
		return this;
	}

	@Override
	public String toString() {
		return "LOCAL_KEYED";
	}

	@Override
	public void configure(int maxParallelism) {
		KeyGroupRangeAssignment.checkParallelismPreconditions(maxParallelism);
		this.maxParallelism = maxParallelism;
	}
}
  • Add new APIs to DataStream and KeyedStream:
//core internal method in DataStream
private KeyedStream<T, Tuple> localKeyBy(Keys<T> keys) {
        return KeyedStream.localKeyedStream(this,
        clean(KeySelectorUtil.getSelectorForKeys(keys, getType(), getExecutionConfig())));
}
//core internal method in KeyedStream
@Internal
static <T, KEY> KeyedStream<T, KEY> localKeyedStream(DataStream<T> dataStream,
		KeySelector<T, KEY> keySelector,
		TypeInformation<KEY> keyType) {
	PartitionTransformation<T> partitionTransformation = new PartitionTransformation<>(
		dataStream.getTransformation(),
		new LocalKeyedStreamPartitioner<>(keySelector,
			StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM));

	return new KeyedStream<>(dataStream,
		partitionTransformation,
		keySelector,
		keyType,
		KeyScope.LOCAL);
}
  • Introduce the LocalKeyedStateHandle interface and implement LocalKeyedKeyGroupsStateHandle / LocalKeyedIncrementalKeyedStateHandle:
public interface LocalKeyedStateHandle extends KeyedStateHandle {}

public class LocalKeyedKeyGroupsStateHandle extends KeyGroupsStateHandle
		implements LocalKeyedStateHandle {

	private static final long serialVersionUID = -2194187691361383511L;

	public LocalKeyedKeyGroupsStateHandle(KeyGroupRangeOffsets groupRangeOffsets,
			StreamStateHandle streamStateHandle) {
		super(groupRangeOffsets, streamStateHandle);
	}

	@Override
	public LocalKeyedKeyGroupsStateHandle getIntersection(KeyGroupRange keyGroupRange) {
		return new LocalKeyedKeyGroupsStateHandle(
			getGroupRangeOffsets().getIntersection(keyGroupRange),
			getDelegateStateHandle());
	}

	@Override
	public String toString() {
		//...
	}

}

public class LocalKeyedIncrementalKeyedStateHandle extends IncrementalKeyedStateHandle
		implements LocalKeyedStateHandle {

	private static final long serialVersionUID = 4081913652254161489L;

	public LocalKeyedIncrementalKeyedStateHandle(
			UUID backendIdentifier,
			KeyGroupRange keyGroupRange,
			long checkpointId,
			Map<StateHandleID, StreamStateHandle> sharedState,
			Map<StateHandleID, StreamStateHandle> privateState,
			StreamStateHandle metaStateHandle) {
		super(backendIdentifier, keyGroupRange, checkpointId, sharedState, privateState,
			metaStateHandle);
	}

	@Override
	public String toString() {
		//...
	}

}
  • Support merging local aggregation states in the RocksDB state backend:
//StateAssignmentOperation.java
//Add a new method named reDistributeLocalKeyedStates to StateAssignmentOperation.java to redistribute local keyed states. Its signature is:
public static List<KeyedStateHandle>[] reDistributeLocalKeyedStates(OperatorState operatorState, int newParallelism) {
    //...
}

//RocksDBAggregatingState.java
//When a job is rescaling and restoring from local keyed snapshots, we may encounter more than one accumulator, so they need to be merged.
//We introduce a mergeAccumulatorIfNeed method to do this:
	private ACC mergeAccumulatorIfNeed(List<ACC> list) {
		if (list == null || list.isEmpty()) {
			return null;
		}
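		ACC result = list.get(0);
		for (int i = 1; i < list.size(); i++) {
			ACC other = list.get(i);
			if (result == null) {
				// nothing merged so far: take the other accumulator as-is
				result = other;
			} else if (other != null) {
				// only call merge when both accumulators are present
				result = aggFunction.merge(result, other);
			}
		}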
		return result;
	}

//and override the getInternal method:
	@Override
	public ACC getInternal(byte[] key) {
		try {
			byte[] valueBytes = backend.db.get(columnFamily, key);
			if (valueBytes == null) {
				return null;
			}
			// When restoring from local keyed snapshots, we may encounter more than one accumulator
			// in case of rescaling.
			if (firstGet) {
				firstGet = false;
				List<ACC> list = deserializeList(valueBytes);
				return mergeAccumulatorIfNeed(list);
			}
			dataInputView.setBuffer(valueBytes);
			return valueSerializer.deserialize(dataInputView);
		} catch (IOException | RocksDBException e) {
			throw new FlinkRuntimeException("Error while retrieving data from RocksDB", e);
		}
	}
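
The merge step can also be sketched in isolation, independent of RocksDB. The following is a minimal illustration under stated assumptions: the class and method names (AccumulatorMergeSketch, mergeAll) are hypothetical, and a plain BinaryOperator stands in for the aggregate function's merge. It skips null accumulators so that the merge function is only ever called with two non-null arguments.

```java
import java.util.List;
import java.util.function.BinaryOperator;

/** Hypothetical standalone version of the accumulator merge; not Flink code. */
final class AccumulatorMergeSketch {

    /**
     * Folds all non-null accumulators into one using the given merge function.
     * Returns null if the list is null, empty, or contains only nulls.
     */
    static <ACC> ACC mergeAll(List<ACC> accumulators, BinaryOperator<ACC> merge) {
        if (accumulators == null) {
            return null;
        }
        ACC result = null;
        for (ACC acc : accumulators) {
            if (acc == null) {
                continue; // nothing to merge for this entry
            }
            result = (result == null) ? acc : merge.apply(result, acc);
        }
        return result;
    }
}
```

For a sum accumulator of type Long, mergeAll(list, Long::sum) would combine the accumulators restored from several local keyed snapshots into a single value.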

Compatibility, Deprecation, and Migration Plan

This feature is primarily an optimization, so no compatibility-related issues are expected.

Test Plan

All relevant changes will be verified by unit tests and, where possible, by integration tests as well.

Rejected Alternatives

