Status

Current state: "Under Discussion"

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

(This FLIP is still being edited; once it is done, a discussion thread will be opened.)

Motivation

According to FLINK-8571 ("Provide an enhanced KeyedStream implementation to use ForwardPartitioner"), a DataStream can be reinterpreted as a KeyedStream with the ForwardPartitioner in the following two use cases.

  • The input source stream is already partitioned in the external system.
    • For example, user data may have already been grouped by the user ID in the external system.
  • The DataStream has already been grouped by key with keyBy().
    • For example, a record with three fields, (key, val_1, val_2), can first be grouped by the key with the keyBy() operation. Then the data can be summed over val_1 and val_2 separately.
    • The job can be written as
      • fromSource().keyBy().sum().reinterpretAsKeyedStream().sum()
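
A minimal sketch of that pipeline with the existing, experimental DataStreamUtils.reinterpretAsKeyedStream API introduced by FLINK-8571 (the Tuple3 schema, element values, and job name are illustrative):

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamUtils;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ReinterpretAsKeyedStreamExample {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Records of the form (key, val_1, val_2); the values are illustrative.
            DataStream<Tuple3<String, Long, Long>> records =
                    env.fromElements(
                            Tuple3.of("a", 1L, 10L),
                            Tuple3.of("a", 2L, 20L),
                            Tuple3.of("b", 3L, 30L));

            // First aggregation: partition by key and sum val_1 (positional field 1).
            DataStream<Tuple3<String, Long, Long>> summedVal1 =
                    records.keyBy(value -> value.f0).sum(1);

            // The output is still partitioned by the same key, so it can be reinterpreted
            // as a KeyedStream (ForwardPartitioner, no extra shuffle) and summed on val_2.
            KeyedStream<Tuple3<String, Long, Long>, String> reinterpreted =
                    DataStreamUtils.reinterpretAsKeyedStream(
                            summedVal1, value -> value.f0, Types.STRING);

            reinterpreted.sum(2).print();

            env.execute("reinterpretAsKeyedStream example");
        }
    }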

...

In this FLIP, we would like to refactor this rein

Public Interfaces


Proposed Changes

Before everything else, we need to define what a pre-KeyedStream is.

...

There are two options for solving the limitation.

Option 1. Re-define KeyGroupRangeAssignment

The KeyGroupRangeAssignment would do two main things.

...
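
For reference, the two mappings as they are computed today can be sketched with the existing KeyGroupRangeAssignment utility (the parallelism values and the key below are illustrative); this is the logic that Option 1 would redefine:

    import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

    public class KeyGroupAssignmentSketch {

        public static void main(String[] args) {
            final int maxParallelism = 128; // number of key groups, fixed before the job starts
            final int parallelism = 4;      // current operator parallelism
            final String key = "user-42";   // illustrative key

            // Mapping 1: key -> key group (murmur hash of the key's hashCode, modulo maxParallelism).
            int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);

            // Mapping 2: key group -> operator (subtask) index.
            int subtask = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
                    maxParallelism, parallelism, keyGroup);

            System.out.printf("key=%s -> keyGroup=%d -> subtask=%d%n", key, keyGroup, subtask);
        }
    }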

  1. Users can only provide a function mapping from a key to a split; the mapping from a split to a key group has to be established by Flink itself, since that mapping is dynamic (a hypothetical sketch of such a user-provided function follows this list).
  2. However, the split assignment is hard to represent as a function across different sources. For example, the split assignment for the Kafka Source is push-based: new splits are assigned to source readers in a round-robin fashion. The split assignment for the File Source, on the other hand, is pull-based, so the assignment is irregular. Therefore, the mapping from a split to a key group is dynamic.
  3. The mapping from a key to a split could still be hard for the user to provide, since it may be determined automatically by the external system.
  4. The key group range needs to be at least as large as the number of currently existing splits. However, the number of existing splits can also be dynamic, while the number of key groups has to be defined before the job starts. Therefore, users would have to estimate the maximum number of splits that may exist at any moment during the whole lifecycle of the job. For a pre-partitioned stream, the number of splits should be fixed to a small range or even a specific value, since the number of keys should be far smaller than the number of records in most scenarios.
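
To make problems 1 and 3 more concrete, the user-facing hook that Option 1 would require might look roughly like the following. This is a purely hypothetical sketch; KeyToSplitMapper and its method do not exist in Flink:

    import java.io.Serializable;

    /**
     * Hypothetical sketch only (not an existing Flink interface): under Option 1 the user
     * would provide the key -> split mapping, while Flink itself would have to maintain
     * the dynamic split -> key group mapping described above.
     */
    @FunctionalInterface
    public interface KeyToSplitMapper<K> extends Serializable {

        /**
         * Returns the identifier of the split (e.g. a Kafka partition) that all records
         * with the given key belong to in the external system.
         */
        String mapKeyToSplit(K key);
    }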

Option 2. (preferred)

In this option, we would like to address problems 1 and 2 introduced above. The mapping from a key to a split, and from a split to a key group, can actually be established inside the Source Operator.

...

As a result, the mapping from a key to a key group can be established inside the Source Operator. The drawback is that we have to pass the key group of a record down to the downstream operator together with the record itself. I believe this is acceptable to the user, since the source operators and the downstream operators are now chained together: although memory usage could increase a bit, network consumption is saved.
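
As an illustration only, the record shipped from the source to the chained downstream operator could carry its pre-computed key group roughly like this (a hypothetical sketch; KeyGroupedRecord is not an existing Flink class, and the final design may attach the key group differently):

    /**
     * Hypothetical sketch only (not an existing Flink class): the source operator resolves
     * the key group of each record from the split it was read from and ships it together
     * with the record, so the chained downstream keyed operator can reuse it instead of
     * recomputing the assignment or shuffling the data over the network.
     */
    public final class KeyGroupedRecord<T> {

        private final T record;
        private final int keyGroup; // key group derived from the record's source split

        public KeyGroupedRecord(T record, int keyGroup) {
            this.record = record;
            this.keyGroup = keyGroup;
        }

        public T getRecord() {
            return record;
        }

        public int getKeyGroup() {
            return keyGroup;
        }
    }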

Rescaling (to be done)

Limitation

Test Plan

Describe in few sentences how the FLIP will be tested. We are mostly interested in system tests (since unit-tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.