Status

Current state: "Under Discussion"

Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)

JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

(This page is still being edited. A discussion thread will be opened once it is done.)

Motivation

According to FLINK-8571, "Provide an enhanced KeyedStream implementation to use ForwardPartitioner", a DataStream can be reinterpreted as a KeyedStream with a ForwardPartitioner in the following two use cases.

  • The input source stream is already partitioned in the external system.
    • For example, user data has already been grouped by user ID in the external system.
  • The DataStream has already been grouped by key through an earlier keyBy().
    • For example, a record with three fields, (key, val_1, val_2), can first be grouped by key with the keyBy() operation. Then val_1 and val_2 can be summed separately.
    • The job can be written as
      • fromSource().keyBy().sum().reinterpretAsKeyedStream().sum()

However, one limitation is that if the input source stream is NOT partitioned in exactly the same way as Flink's keyBy would partition the data, the reinterpretation causes a runtime error because of a KeyGroup mismatch.


For example, suppose that, according to the hash function provided by Flink, the key group of key_0 is 64. The record is nevertheless forwarded to a downstream operator that is only responsible for key groups 0 to 63, which does not include 64. The downstream operator then tries to read and update the state of KeyGroup 64, which it does not own, and a runtime error is triggered.
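The mismatch can be reproduced with plain integer arithmetic. The sketch below is a standalone mirror of the range computation as I understand it from Flink's KeyGroupRangeAssignment; it is not the Flink class itself. The class name KeyGroupMath, the max parallelism of 128, and the parallelism of 2 are assumptions chosen so that subtask 0 owns key groups 0 to 63, as in the example above.

```java
/** Standalone mirror of the key group arithmetic; an illustration, not the Flink class. */
final class KeyGroupMath {

    /** Index of the subtask that owns the given key group. */
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** First key group owned by the subtask (inclusive). */
    static int rangeStart(int maxParallelism, int parallelism, int operatorIndex) {
        return (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    /** Last key group owned by the subtask (inclusive). */
    static int rangeEnd(int maxParallelism, int parallelism, int operatorIndex) {
        return ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
    }

    /** True if the subtask is responsible for the given key group. */
    static boolean owns(int maxParallelism, int parallelism, int operatorIndex, int keyGroup) {
        return keyGroup >= rangeStart(maxParallelism, parallelism, operatorIndex)
                && keyGroup <= rangeEnd(maxParallelism, parallelism, operatorIndex);
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed value
        int parallelism = 2;      // assumed value
        int keyGroupOfKey0 = 64;  // key group of key_0 from the example
        int forwardedTo = 0;      // subtask that receives the record via forwarding

        System.out.println("owner subtask: "
                + operatorIndexForKeyGroup(maxParallelism, parallelism, keyGroupOfKey0));
        System.out.println("receiving subtask owns the key group: "
                + owns(maxParallelism, parallelism, forwardedTo, keyGroupOfKey0));
    }
}
```

Under these assumptions the record belongs to subtask 1, while forwarding delivers it to subtask 0, which is the situation that leads to the runtime error.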

In this FLIP, we would like to refactor this rein

Public Interfaces


Proposed Changes

Before everything else, we need to give a definition for pre-KeyedStream.

  • A key can appear in one and only one split during the lifecycle of a job. The lifecycle of a job is not only executed in 
  • A pre-KeyedStream is a data stream that the external system has already partitioned with respect to the selected key.

There are two options for solving the limitation.

Option 1. Re-define KeyGroupRangeAssignment

KeyGroupRangeAssignment does two main things.

  • Assigns a key group to a key.
  • Assigns the key to a downstream operator based on its key group and the downstream operator's predefined key group range.

The input source stream is already grouped by key, and the external system probably assigns keys to splits (which play a role similar to key groups) differently from Flink. Therefore, we need to substitute the key group assignment with a user-provided one. It should give us the following guarantees:

  • Records from the same split are assigned to the same key group.
  • Records from different splits are assigned to different key groups.

Overall, the user has to provide the following mapping:

Record → Key → SplitID → KeyGroup
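A minimal sketch of what such a user-provided mapping chain could look like is given below. Everything in it is hypothetical: SplitBasedKeyGroupAssigner is not a Flink API, splits are identified by plain strings, and key groups are handed out in first-seen order only to keep the example short.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical assigner for Option 1: Record -> Key -> SplitID -> KeyGroup. Not a Flink API. */
final class SplitBasedKeyGroupAssigner<R, K> {

    private final Function<R, K> keySelector;      // Record -> Key
    private final Function<K, String> keyToSplit;  // Key -> SplitID, provided by the user
    private final Map<String, Integer> splitToKeyGroup = new HashMap<>(); // SplitID -> KeyGroup
    private final int numberOfKeyGroups;           // fixed before the job starts

    SplitBasedKeyGroupAssigner(
            Function<R, K> keySelector, Function<K, String> keyToSplit, int numberOfKeyGroups) {
        this.keySelector = keySelector;
        this.keyToSplit = keyToSplit;
        this.numberOfKeyGroups = numberOfKeyGroups;
    }

    /** Returns the key group of the record; each split gets its own key group. */
    int assign(R record) {
        K key = keySelector.apply(record);
        String splitId = keyToSplit.apply(key);
        Integer existing = splitToKeyGroup.get(splitId);
        if (existing != null) {
            return existing; // same split, same key group
        }
        int next = splitToKeyGroup.size();
        if (next >= numberOfKeyGroups) {
            throw new IllegalStateException("More splits than key groups: " + splitId);
        }
        splitToKeyGroup.put(splitId, next); // new split, new key group
        return next;
    }
}
```

The exception in assign() corresponds to the case where more splits exist than key groups were configured.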

However, this could introduce the following problems.

  1. Users can only provide a function that maps a key to a split. The mapping from a split to a key group has to be established by Flink itself because that mapping is dynamic.
  2. The split assignment is hard to express as a function across different sources. For example, split assignment for the Kafka Source is push-based: new splits are assigned to source readers in a round-robin fashion. Split assignment for the File Source is pull-based, which makes the assignment irregular. Therefore, the mapping from a split to a key group is dynamic.
  3. The mapping from a key to a split can still be hard for the user to provide, since the external system may perform it automatically.
  4. The key group range needs to be at least as large as the number of currently existing splits. However, the number of existing splits can be dynamic, while the number of key groups must be defined before the job starts. Therefore, users have to estimate the largest number of splits that can exist at any moment during the lifecycle of the job. For a pre-partitioned stream, the number of splits should stay within a small range or even be a fixed value, since the number of keys is far smaller than the number of records in most scenarios.

Option 2. (preferred)

In this option, we address problems 1 and 2 above. The mapping from a key to a split and from a split to a key group can actually be established inside the Source Operator.

  • The mapping from a key to a split can be omitted because the Source Operator already knows which split each record comes from.
  • The mapping from a split to a key group can also be established inside the Source Operator. Since the parallelism of the source operators and the downstream operators is equal, the KeyGroupRange of each downstream operator can be inferred in the source operator. A new split added to a source operator then registers for an unused key group.

As a result, the mapping from a key to a KeyGroup can be established inside the Source Operator. The drawback is that the key group of a record has to be passed to the downstream operator together with the record itself. I believe this is acceptable to users because the source operators and downstream operators are now chained together, which means that although memory usage may increase a bit, the network cost is saved.
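The bookkeeping for Option 2 can be sketched as a small registry that lives inside one source subtask. This is only an illustration under stated assumptions: SourceKeyGroupRegistry and Tagged are hypothetical names, not Flink classes, and the key group range of the chained downstream subtask is assumed to be already known and passed to the constructor.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical per-subtask registry for Option 2; not a Flink class. */
final class SourceKeyGroupRegistry {

    /** A record together with the key group that must travel downstream with it. */
    static final class Tagged<T> {
        final T record;
        final int keyGroup;

        Tagged(T record, int keyGroup) {
            this.record = record;
            this.keyGroup = keyGroup;
        }
    }

    private final int rangeEnd; // last key group of the downstream subtask (inclusive)
    private final Map<String, Integer> splitToKeyGroup = new HashMap<>();
    private int nextUnused;     // next key group that no split has claimed yet

    SourceKeyGroupRegistry(int rangeStartInclusive, int rangeEndInclusive) {
        this.nextUnused = rangeStartInclusive;
        this.rangeEnd = rangeEndInclusive;
    }

    /** Registers a split for an unused key group, or returns the one it already holds. */
    int register(String splitId) {
        Integer existing = splitToKeyGroup.get(splitId);
        if (existing != null) {
            return existing;
        }
        if (nextUnused > rangeEnd) {
            throw new IllegalStateException("No unused key group left for split " + splitId);
        }
        int assigned = nextUnused++;
        splitToKeyGroup.put(splitId, assigned);
        return assigned;
    }

    /** Attaches the key group of the split to a record read from that split. */
    <T> Tagged<T> tag(String splitId, T record) {
        return new Tagged<>(record, register(splitId));
    }
}
```

Because every key group handed out lies inside the downstream subtask's own range, a forwarded record can never reference state the subtask does not own. The sketch does not cover releasing key groups of finished splits or rescaling.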

Rescaling (to be done)

Limitation

Test Plan

Describe in few sentences how the FLIP will be tested. We are mostly interested in system tests (since unit-tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
