...

  • The input source stream is already partitioned in the external system.
    • For example, user data have already been grouped by the user ID in the external system.
  • The DataStream has already been grouped by key with keyBy().
    • For example, a record with three fields, (key, val_1, val_2), can first be grouped by the key with the keyBy() operation. Then val_1 and val_2 can be summed separately.
  • Some use cases in the ML are case 1, case 2, and case 3.

However, one limitation is that for the first use case, if the input source stream is NOT partitioned exactly the same way Flink's keyBy would partition the data, the reinterpretation would cause a runtime error because of a KeyGroup mismatch.

...

In this example, according to the hash function used by Flink, the key group of key_0 is 64. However, the record is forwarded to a downstream operator that is only responsible for key groups 0 to 63, which does not include 64. The downstream operator would then try to extract and update the state of KeyGroup 64, for which it is not responsible, and a runtime error would be triggered.
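
The mismatch can be illustrated with a small, Flink-free sketch. It assumes a maximum parallelism of 128 and a downstream parallelism of 2, which is one configuration that yields the 0 to 63 range mentioned above; neither value is stated in this FLIP. The range arithmetic is written to resemble how Flink splits key groups across subtasks, but it should be read as an assumption and not as the actual implementation. The class and method names are hypothetical.

```java
// Hypothetical, self-contained sketch; it does not use any Flink classes.
final class KeyGroupMismatchSketch {

    // First key group (inclusive) owned by the subtask with the given index.
    static int rangeStart(int maxParallelism, int parallelism, int operatorIndex) {
        return (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    // Last key group (inclusive) owned by the subtask with the given index.
    static int rangeEnd(int maxParallelism, int parallelism, int operatorIndex) {
        return ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
    }

    // True if the subtask is responsible for the given key group.
    static boolean owns(int maxParallelism, int parallelism, int operatorIndex, int keyGroup) {
        return keyGroup >= rangeStart(maxParallelism, parallelism, operatorIndex)
                && keyGroup <= rangeEnd(maxParallelism, parallelism, operatorIndex);
    }

    // Subtask that a keyBy()-style partitioning would route the key group to.
    static int expectedOperator(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumption, not taken from the FLIP
        int parallelism = 2;      // assumption, not taken from the FLIP
        int keyGroupOfKey0 = 64;  // key group of key_0, as stated in the text
        int receivingSubtask = 0; // subtask the record is forwarded to

        System.out.println("subtask 0 owns key groups "
                + rangeStart(maxParallelism, parallelism, receivingSubtask) + ".."
                + rangeEnd(maxParallelism, parallelism, receivingSubtask));
        System.out.println("subtask 0 owns key group 64: "
                + owns(maxParallelism, parallelism, receivingSubtask, keyGroupOfKey0));
        System.out.println("subtask expected by keyBy-style routing: "
                + expectedOperator(maxParallelism, parallelism, keyGroupOfKey0));
    }
}
```

Under these assumptions, the record for key_0 arrives at subtask 0 although its key group belongs to subtask 1, which is the situation that leads to the runtime error.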

...

This FLIP aims to address this problem and, possibly, to introduce reinterpretAsKeyedStream() to DataStream formally so that more users become aware of this API.

Public Interfaces

...

The input source stream is already grouped by key, but the external system probably assigns keys to splits differently from Flink. For example, the first 100 keys seen are assigned to the first split. Therefore, we could replace the key group assignment with a user-provided one to avoid the KeyGroup mismatch problem introduced above. We also have to make sure that keys from different splits are not assigned to the same key group because, during rescaling, the state of the source operator has to keep pace with the keyed state of downstream operators.
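
As a rough illustration of the constraint above, the following sketch shows one possible user-provided assignment and a check that no key group receives keys from more than one split. Everything in it is hypothetical: integer key IDs, two splits of 100 keys each, a maximum parallelism of 128, and the method names are assumptions made for the example and are not part of the proposed interface.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.IntUnaryOperator;

// Hypothetical, self-contained sketch; it does not use any Flink classes.
final class UserKeyGroupAssignmentSketch {
    static final int KEYS_PER_SPLIT = 100;  // follows the "first 100 keys" example
    static final int NUM_SPLITS = 2;        // assumption
    static final int MAX_PARALLELISM = 128; // assumption
    static final int GROUPS_PER_SPLIT = MAX_PARALLELISM / NUM_SPLITS;

    // Split chosen by the external system (valid for key IDs in [0, 200)).
    static int splitOf(int keyId) {
        return keyId / KEYS_PER_SPLIT;
    }

    // User-provided assignment: each split gets its own block of key groups.
    static int assignKeyGroup(int keyId) {
        return splitOf(keyId) * GROUPS_PER_SPLIT + keyId % GROUPS_PER_SPLIT;
    }

    // Returns true if no key group receives keys from more than one split.
    static boolean splitsAreDisjoint(List<Integer> keyIds, IntUnaryOperator assigner) {
        Map<Integer, Integer> splitByKeyGroup = new HashMap<>();
        for (int keyId : keyIds) {
            int keyGroup = assigner.applyAsInt(keyId);
            Integer previousSplit = splitByKeyGroup.putIfAbsent(keyGroup, splitOf(keyId));
            if (previousSplit != null && previousSplit != splitOf(keyId)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Integer> keys = List.of(0, 99, 100, 150, 199);
        System.out.println("block assignment keeps splits disjoint: "
                + splitsAreDisjoint(keys, UserKeyGroupAssignmentSketch::assignKeyGroup));
        // A plain modulo assignment ignores the split, so keys 0 and 128 collide.
        System.out.println("modulo assignment keeps splits disjoint: "
                + splitsAreDisjoint(List.of(0, 128), k -> k % MAX_PARALLELISM));
    }
}
```

Giving each split a contiguous block of key groups is only one way to meet the constraint; any assignment that passes a check like splitsAreDisjoint would serve the same purpose in this sketch.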

...

One more modification: because key groups are currently computed and allocated statically, KeyedStateBackend has no method for setting a key group, so this FLIP adds one.

...

Option 2 is just the opposite of option 1. The pros are as follows:

  • Users need not provide any information other than the KeySelector, which keeps the API clean to use.
  • The wrapping StreamRecord could help to reduce duplicated extraction and computation of key and keyGroup.

...

Rejected Alternatives

No option is rejected yet.