...

Code Block
DataStream<Tuple3<String, Integer, Integer>> input = fromSource(...);
// Case 1: the input source stream is not a pre-KeyedStream.
// The two aggregation operators would be chained together.
DataStream<Tuple3<String, Integer, Integer>> stream = input.keyBy(value -> value.f0)
        .sum(1)
        .reinterpretAsKeyedStream(value -> value.f0)
        .sum(2);
// Case 2: the input source stream is a pre-KeyedStream.
// The source operator and the following aggregation operators would be chained together.
DataStream<Tuple3<String, Integer, Integer>> preKeyedStream = input.reinterpretAsKeyedStream(value -> value.f0)
        .sum(1)
        .reinterpretAsKeyedStream(value -> value.f0)
        .sum(2);

Proposed Changes

First, let's define "pre-KeyedStream". An input source stream is a pre-KeyedStream when both of the following hold:

  • The input source stream has already been partitioned into separate "SourceSplits" (e.g., files or partitions) in the external system.
  • Every record with a given value of the selected key belongs to exactly one "SourceSplit".
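To make the second condition concrete, here is a minimal sketch that checks it on an in-memory description of the splits. The class `PreKeyedCheck`, the method `isPreKeyed`, and the split names are hypothetical and used only for illustration; they are not part of Flink or of this proposal.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Flink: verifies that no key value
// appears in more than one split.
final class PreKeyedCheck {

    // splitToKeys maps a split id (e.g. a file or partition name) to the
    // key values of the records stored in that split.
    static boolean isPreKeyed(Map<String, List<String>> splitToKeys) {
        // Remembers the first split in which each key value was seen.
        Map<String, String> ownerOfKey = new HashMap<>();
        for (Map.Entry<String, List<String>> split : splitToKeys.entrySet()) {
            for (String key : split.getValue()) {
                String previous = ownerOfKey.putIfAbsent(key, split.getKey());
                if (previous != null && !previous.equals(split.getKey())) {
                    // The same key value lives in two different splits.
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Key "a" only occurs in partition-0: prints true.
        System.out.println(isPreKeyed(Map.of(
                "partition-0", List.of("a", "a", "b"),
                "partition-1", List.of("c"))));
        // Key "a" occurs in both partitions: prints false.
        System.out.println(isPreKeyed(Map.of(
                "partition-0", List.of("a", "b"),
                "partition-1", List.of("a", "c"))));
    }
}
```

In the first call every key value stays within one split, so the input would qualify as a pre-KeyedStream; in the second call key "a" spans two splits, so it would not.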

There are two options for solving the problem introduced above.

Because the problem is caused by the pre-KeyedStream, the discussion and modifications proposed below are limited to the SourceTask.

Option 1. Re-define KeyGroupRangeAssignment

...