...
```java
DataStream<Tuple3<String, Integer, Integer>> input = fromSource(...);

// If the input source stream is not a pre-KeyedStream:
// the two aggregation operators would be chained together.
DataStream<Tuple3<String, Integer, Integer>> stream = input.keyBy(value -> value.f0)
    .sum(1)
    .reinterpretAsKeyedStream(value -> value.f0)
    .sum(2);

// If the input source stream is a pre-KeyedStream:
// the source operator and the following aggregation operators would be chained together.
DataStream<Tuple3<String, Integer, Integer>> stream = input.reinterpretAsKeyedStream(value -> value.f0)
    .sum(1)
    .reinterpretAsKeyedStream(value -> value.f0)
    .sum(2);
```
Proposed Changes
First, we define a "pre-KeyedStream" as an input source stream that satisfies both of the following conditions:
- The stream has already been partitioned into different "SourceSplits" (e.g., files or partitions) by the external system.
- Each value of the selected key exists in only one "SourceSplit".
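The second condition can be illustrated with a small standalone check. This is only a sketch and not part of the proposal or of any Flink API: the class `PreKeyedCheck`, the method `isPreKeyed`, and the representation of splits as a map from a split id to its records are all hypothetical names chosen for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper, not a Flink class: checks the "one key, one split" condition.
public class PreKeyedCheck {

    /**
     * Returns true if every key produced by keySelector appears in at most one split.
     * The splits argument maps an assumed split id to the records that split contains.
     */
    public static <T, K> boolean isPreKeyed(
            Map<String, List<T>> splits, Function<T, K> keySelector) {
        // Remembers the first split in which each key was seen.
        Map<K, String> keyToSplit = new HashMap<>();
        for (Map.Entry<String, List<T>> split : splits.entrySet()) {
            for (T record : split.getValue()) {
                K key = keySelector.apply(record);
                String previous = keyToSplit.putIfAbsent(key, split.getKey());
                if (previous != null && !previous.equals(split.getKey())) {
                    return false; // the same key was found in two different splits
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, List<String>> disjoint = Map.of(
                "split-0", List.of("a", "a", "b"),
                "split-1", List.of("c"));
        Map<String, List<String>> overlapping = Map.of(
                "split-0", List.of("a", "b"),
                "split-1", List.of("b", "c"));
        System.out.println(isPreKeyed(disjoint, Function.identity()));    // true
        System.out.println(isPreKeyed(overlapping, Function.identity())); // false
    }
}
```

In the second data set the key "b" occurs in both splits, so that stream would not qualify as a pre-KeyedStream under the definition above.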
There are two options for solving the problem introduced above.
"Since the problem is caused by the pre-KeyedStream, all of the discussion and modifications proposed below are limited to the SourceTask."
Option 1. Re-define KeyGroupRangeAssignment
...