...

Since all operators on local keyed streams run locally, these operators must be chained with their inputs. We therefore disable the transformation APIs connect, join, intervalJoin, and coGroup on local keyed streams. As with ForwardPartitioner, we check the parallelism of the upstream and downstream nodes of each LocalKeyGroupStreamPartitioner when generating the stream graph, and throw an exception if the two values are not equal.
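
The parallelism check described above can be sketched as follows. This is a minimal, self-contained illustration rather than the actual stream graph code: the class `LocalKeyedEdgeValidator`, the method `checkEqualParallelism`, and the exception message are hypothetical names chosen for this example.

```java
// Hypothetical sketch of the parallelism check; names are illustrative, not Flink APIs.
class LocalKeyedEdgeValidator {

    /**
     * Rejects an edge whose upstream and downstream parallelism differ,
     * because a local keyed stream must stay chained with its input.
     */
    static void checkEqualParallelism(int upstreamParallelism, int downstreamParallelism) {
        if (upstreamParallelism != downstreamParallelism) {
            throw new UnsupportedOperationException(
                "Local key group partitioning does not allow a change of parallelism. "
                    + "Upstream parallelism: " + upstreamParallelism
                    + ", downstream parallelism: " + downstreamParallelism);
        }
    }
}
```

Calling `checkEqualParallelism(4, 4)` returns normally, while `checkEqualParallelism(4, 2)` throws, which mirrors the behaviour the text describes for mismatched nodes.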

...

```java
// StateAssignmentOperation.java
// Add a new method, reDistributeLocalKeyedStates, to StateAssignmentOperation
// to re-distribute local keyed states. Its signature is:
public static List<KeyedStateHandle>[] reDistributeLocalKeyedStates(OperatorState operatorState, int newParallelism) {
    //...
}

// RocksDBAggregatingState.java
// When a job is rescaled and restored from local keyed snapshots, a key may have
// more than one accumulator, so the accumulators need to be merged.
// We introduce a mergeAccumulatorIfNeed method to do this:
private ACC mergeAccumulatorIfNeed(List<ACC> list) {
    if (list == null || list.isEmpty()) {
        return null;
    }
    ACC result = list.get(0);
    for (int i = 1; i < list.size(); i++) {
        ACC other = list.get(i);
        if (other == null) {
            // Nothing to merge for this entry.
            continue;
        }
        // Never pass a null accumulator to merge(); adopt the first non-null one instead.
        result = (result == null) ? other : aggFunction.merge(result, other);
    }
    return result;
}

// We also override the getInternal method:
@Override
public ACC getInternal(byte[] key) {
    try {
        byte[] valueBytes = backend.db.get(columnFamily, key);
        if (valueBytes == null) {
            return null;
        }
        // When restoring from local keyed snapshots, we may encounter more than one
        // accumulator in case of rescaling.
        if (firstGet) {
            firstGet = false;
            List<ACC> list = deserializeList(valueBytes);
            return mergeAccumulatorIfNeed(list);
        }
        dataInputView.setBuffer(valueBytes);
        return valueSerializer.deserialize(dataInputView);
    } catch (IOException | RocksDBException e) {
        throw new FlinkRuntimeException("Error while retrieving data from RocksDB", e);
    }
}
```
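
To illustrate why the merge is needed, consider an average aggregation whose accumulator is a (sum, count) pair. After rescaling, two former subtasks may each contribute a partial accumulator for the same key. The sketch below is self-contained and does not use Flink classes: `AvgAcc` and `AccumulatorMerger` are hypothetical stand-ins, and the `BinaryOperator<ACC>` argument plays the role of the aggregate function's `merge`.

```java
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical accumulator for an average: running sum and element count.
class AvgAcc {
    final long sum;
    final long count;

    AvgAcc(long sum, long count) {
        this.sum = sum;
        this.count = count;
    }

    // Combines two partial accumulators that belong to the same key.
    static AvgAcc merge(AvgAcc a, AvgAcc b) {
        return new AvgAcc(a.sum + b.sum, a.count + b.count);
    }
}

// Hypothetical helper that folds a restored list of accumulators into one.
class AccumulatorMerger {
    // Merges all non-null accumulators; returns null if there is none.
    static <ACC> ACC mergeAll(List<ACC> accumulators, BinaryOperator<ACC> mergeFn) {
        if (accumulators == null) {
            return null;
        }
        ACC result = null;
        for (ACC acc : accumulators) {
            if (acc == null) {
                continue;
            }
            result = (result == null) ? acc : mergeFn.apply(result, acc);
        }
        return result;
    }
}
```

For example, merging the partial accumulators (sum 10, count 2) and (sum 5, count 3) gives sum 15 and count 5, so the restored key reports an average of 3 instead of either partial value.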

Alternative implementation

In section 3, we described our design based on Flink state. The discussion on the mailing list also produced some good suggestions. For example, introducing a stateless operator for local aggregation can provide better performance in some scenarios. The stateless operator would buffer the intermediate results and flush them during `StreamOperator::prepareSnapshotPreBarrier()`.
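
The buffer-and-flush idea can be sketched as below. This is a simplified, Flink-free model written under stated assumptions: `LocalSumBuffer` is a hypothetical class, it pre-aggregates `long` values per `String` key by summing, and its `prepareSnapshotPreBarrier` method only mimics the point in the operator lifecycle at which the real hook would run. A real operator would emit to its output collector rather than to a `BiConsumer`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical model of a stateless local aggregation operator (not a Flink class).
class LocalSumBuffer {
    // In-memory partial sums; these are never written to Flink state.
    private final Map<String, Long> buffer = new HashMap<>();
    // Stand-in for the operator's downstream output.
    private final BiConsumer<String, Long> output;

    LocalSumBuffer(BiConsumer<String, Long> output) {
        this.output = output;
    }

    // Pre-aggregates the incoming value into the partial sum of its key.
    void processElement(String key, long value) {
        buffer.merge(key, value, Long::sum);
    }

    // Runs before the checkpoint barrier is forwarded: emits every partial
    // result downstream and clears the buffer, so nothing is left to snapshot.
    void prepareSnapshotPreBarrier(long checkpointId) {
        buffer.forEach(output);
        buffer.clear();
    }

    int bufferedKeys() {
        return buffer.size();
    }
}
```

Because the buffer is emptied before the barrier moves on, the partial results travel ahead of the barrier and the operator itself has no state to persist, which is what makes the stateless variant possible.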

Supporting both kinds of implementation is valuable, because each one benefits different scenarios. Our original implementation reuses the window operator (a stateful operator) and the window-related APIs, so that we can reuse existing Flink concepts and flexible user interfaces. We therefore need a unified abstraction for both implementations.

A good way to stay compatible with both is to enhance the WindowOperator so that it also supports the stateless implementation of local aggregation. We can provide a config option in the WindowOperator that lets users choose between the two implementations.

Compatibility, Deprecation, and Migration Plan

...