...

The end goal of sorting the incoming data is to group records with the same key together. The DataSet API requires the key type to be Comparable and sorts by its natural order. The DataStream API has so far not imposed this restriction on key types; it only requires that the key implements hashCode. Other systems, e.g. Beam, do not use the natural order but sort/group by the binary representation of the key. Therefore we can:

  • Follow the Beam approach and use the binary representation of the key to perform sorting/grouping. This comes with additional requirements. For example, the binary representation of key types has to be deterministic/stable.

...


REJECTED ALTERNATIVE:

  • In batch-style execution, support only keys for which we can create a TypeComparator (for GenericTypeInfo this means the key must implement Comparable), and fail hard if that is not the case.
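The Beam-style option (grouping by the binary representation of the key) can be illustrated with a minimal Java sketch. `Event`, `serializeKey` and `groupByBinaryKey` are hypothetical names, not Flink or Beam APIs, and `java.io.DataOutputStream` is used as an assumed stand-in for a real key serializer. The key type deliberately has no natural order, which is exactly the case the rejected TypeComparator approach would fail on.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

final class BinaryKeyGrouping {

    // Hypothetical record type; the composite key (user, session) has no natural
    // order, since Event does not implement Comparable.
    record Event(String user, int session, String payload) {}

    // Hypothetical key serializer. It must be deterministic: equal keys must always
    // produce identical bytes, otherwise grouping by bytes silently breaks.
    static byte[] serializeKey(Event event) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(event.user());
            out.writeInt(event.session());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    private record Keyed<T>(byte[] keyBytes, T value) {}

    // Sorts by the unsigned lexicographic order of the serialized keys, then starts a
    // new group wherever the key bytes change. Only byte equality matters for
    // grouping; the order between groups carries no meaning for the user.
    static <T> List<List<T>> groupByBinaryKey(List<T> items, Function<T, byte[]> keySerializer) {
        List<Keyed<T>> keyed = new ArrayList<>();
        for (T item : items) {
            keyed.add(new Keyed<>(keySerializer.apply(item), item));
        }
        // List.sort is stable, so items with the same key keep their input order.
        keyed.sort((a, b) -> Arrays.compareUnsigned(a.keyBytes(), b.keyBytes()));

        List<List<T>> groups = new ArrayList<>();
        byte[] currentKey = null;
        for (Keyed<T> entry : keyed) {
            if (currentKey == null || !Arrays.equals(currentKey, entry.keyBytes())) {
                groups.add(new ArrayList<>());
                currentKey = entry.keyBytes();
            }
            groups.get(groups.size() - 1).add(entry.value());
        }
        return groups;
    }
}
```

With the events ("bob", 1, "x"), ("alice", 2, "y"), ("bob", 1, "z"), ("alice", 2, "w"), the "bob" group [x, z] comes before the "alice" group [y, w], because writeUTF puts the string length first. The binary order thus differs from the natural String order, which is acceptable when the goal is only grouping.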

Where to apply the sorting

The first question is where in the stack we want to apply the sorting/grouping of incoming elements. We identified at least three different approaches:

PREFERRED option in the discussion:

1. Built-in sorting into org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput

Pros:

  • involves the fewest virtual calls
  • no topology changes

Cons:

  • implementation at a rather low level
  • passing the configuration is not trivial: the current implementation is hardcoded in the StreamTask, so we would have to pass a flag/provider down to the task level, whereas in the alternative approaches the configuration is already embedded in the StreamGraph
  • we would need to extend the DataOutput to also cover the endInput signal
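A rough sketch of the preferred option, using a hypothetical, heavily simplified `SimpleDataOutput` interface rather than Flink's actual `PushingAsyncDataInput.DataOutput`. It illustrates the last con: a sorting output can only emit once it knows the input is exhausted, hence the extra `endInput()` method. `SortingDataOutput` and `CollectingDataOutput` are illustrative names, and the sketch assumes all records fit in memory.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for a task's data output. It is NOT Flink's DataOutput; it
// only models records plus the endInput signal the proposal would have to add.
interface SimpleDataOutput<T> {
    void emitRecord(T element);

    void endInput();
}

// Sits between the input and the operator: buffers every record and, once the input
// ends, emits them sorted by serialized key so that equal keys arrive together.
// Assumption: everything fits in memory; a real implementation would have to spill.
final class SortingDataOutput<T> implements SimpleDataOutput<T> {
    private record Entry<R>(byte[] keyBytes, R element) {}

    private final SimpleDataOutput<T> downstream;
    private final Function<T, byte[]> keySerializer;
    private final List<Entry<T>> buffer = new ArrayList<>();

    SortingDataOutput(SimpleDataOutput<T> downstream, Function<T, byte[]> keySerializer) {
        this.downstream = downstream;
        this.keySerializer = keySerializer;
    }

    @Override
    public void emitRecord(T element) {
        // Serialize the key once per record instead of once per comparison.
        buffer.add(new Entry<>(keySerializer.apply(element), element));
    }

    @Override
    public void endInput() {
        // Stable sort on unsigned key bytes: equal keys end up adjacent and keep
        // their arrival order.
        buffer.sort((a, b) -> Arrays.compareUnsigned(a.keyBytes(), b.keyBytes()));
        for (Entry<T> entry : buffer) {
            downstream.emitRecord(entry.element());
        }
        buffer.clear();
        downstream.endInput();
    }
}

// Test helper: collects whatever reaches the downstream operator.
final class CollectingDataOutput<T> implements SimpleDataOutput<T> {
    final List<T> records = new ArrayList<>();
    boolean ended = false;

    @Override
    public void emitRecord(T element) {
        records.add(element);
    }

    @Override
    public void endInput() {
        ended = true;
    }
}
```

Because the wrapper implements the same interface as the output it wraps, the operator and the topology stay unchanged, which matches the "no topology changes" pro listed above.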

REJECTED alternatives:

  1.  Create a wrapper operator

...

  • requires changes to the topology
  • connecting a two-input operator is not trivial (it would have to go via OutputTags)
  • for a two-input operator, at least the merging step must be chained with the two-input operator

Compatibility, Deprecation, and Migration Plan

...