...


Discussion thread: https://lists.apache.org/thread/gtpswl42jzv0c9o3clwqskpllnw8rh87
Vote thread: https://lists.apache.org/thread/bzlcw28jn0xx1hh45q0ry8wnxf0xoptg
JIRA: FLINK-33261
Release: 1.19


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Currently, every table exec node (except the sink exec node, which can configure its own parallelism) inherits the parallelism of its input. As a result, adjusting the parallelism of a source changes the parallelism of all downstream operators accordingly, which contradicts our initial goal of providing more flexibility for performance tuning of Table/SQL pipelines.
We propose to add a new subclass of Transformation named SourceTransformationWrapper to solve this issue. Specifically, when the parallelism of a source is set to a value different from the default parallelism, the source transformation is wrapped in a SourceTransformationWrapper. The wrapper's getWrapped() method returns the true source transformation, so that the real operator and its parallelism are used when building the StreamGraph. The parallelism of the SourceTransformationWrapper itself is set to the default parallelism, so that the parallelism of downstream operators does not change. Note that if the source and the downstream operator have the same parallelism, SourceTransformationWrapper is not used. Wrapping a LegacySourceTransformation is not supported.
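The wrapping rule described above can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions: the classes SketchTransformation, SketchSource, SketchSourceWrapper and SourceWrapperSketch are hypothetical stand-ins written for this example and are not Flink's Transformation API. Only the method name getWrapped() is taken from the proposal.

```java
// Hypothetical, simplified stand-in for a transformation node; NOT Flink's Transformation class.
abstract class SketchTransformation {
    final String name;
    final int parallelism;

    SketchTransformation(String name, int parallelism) {
        this.name = name;
        this.parallelism = parallelism;
    }
}

// Hypothetical stand-in for the real source transformation.
final class SketchSource extends SketchTransformation {
    SketchSource(String name, int parallelism) {
        super(name, parallelism);
    }
}

// Hypothetical stand-in for the proposed wrapper: it reports the default
// parallelism to downstream nodes and exposes the real source via getWrapped().
final class SketchSourceWrapper extends SketchTransformation {
    private final SketchSource wrapped;

    SketchSourceWrapper(SketchSource wrapped, int defaultParallelism) {
        super(wrapped.name + "-wrapper", defaultParallelism);
        this.wrapped = wrapped;
    }

    SketchSource getWrapped() {
        return wrapped;
    }
}

final class SourceWrapperSketch {
    // Wrap the source only when its parallelism differs from the default.
    static SketchTransformation planSource(SketchSource source, int defaultParallelism) {
        if (source.parallelism == defaultParallelism) {
            return source;
        }
        return new SketchSourceWrapper(source, defaultParallelism);
    }

    // Parallelism a downstream node inherits from its input.
    static int downstreamParallelism(SketchTransformation input) {
        return input.parallelism;
    }

    // Parallelism used for the actual source operator when the graph is built.
    static int operatorParallelism(SketchTransformation input) {
        if (input instanceof SketchSourceWrapper) {
            return ((SketchSourceWrapper) input).getWrapped().parallelism;
        }
        return input.parallelism;
    }

    public static void main(String[] args) {
        int defaultParallelism = 4;
        SketchTransformation planned = planSource(new SketchSource("orders", 2), defaultParallelism);
        System.out.println("downstream inherits: " + downstreamParallelism(planned));
        System.out.println("source operator runs with: " + operatorParallelism(planned));
    }
}
```

In this model the downstream node keeps the default parallelism while the source operator runs with its own setting, which is the decoupling the proposal aims for.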

Deal with changelog messages

...

  1. When the source generates insert-only data, the current default behavior is kept: the shuffle mode between the source and downstream operators is rebalance.

  2. When the source generates update and delete data (determined by checking the changelog mode of the source), the source uses a hash partitioner to send data. Note that the hash partitioner requires a primary key, and an exception is thrown if no primary key is defined in the source schema.

This is also in alignment with the implementation when setting parallelism for sinks.
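The two cases above can be sketched as a small decision function. This is a simplified illustration, not the planner's code: SourcePartitioningSketch, Partitioning, choosePartitioning and targetSubtask are hypothetical names, the exception type is an assumption, and the modulo-based routing stands in for whatever hashing the runtime actually uses. It only shows why hashing by primary key keeps all changes for one key on the same downstream subtask.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class SourcePartitioningSketch {
    // Hypothetical labels for the two shuffle modes described in the text.
    enum Partitioning { REBALANCE, HASH_BY_PRIMARY_KEY }

    // insertOnly: whether the source's changelog mode contains only inserts.
    // primaryKey: primary key columns of the source schema (may be empty).
    static Partitioning choosePartitioning(boolean insertOnly, List<String> primaryKey) {
        if (insertOnly) {
            return Partitioning.REBALANCE;
        }
        if (primaryKey.isEmpty()) {
            // Exception type chosen for this sketch only.
            throw new IllegalStateException(
                "A primary key is required when the source emits updates or deletes.");
        }
        return Partitioning.HASH_BY_PRIMARY_KEY;
    }

    // Simplified routing: rows with the same key value always map to the same subtask.
    static int targetSubtask(Object keyValue, int downstreamParallelism) {
        return Math.floorMod(keyValue.hashCode(), downstreamParallelism);
    }

    public static void main(String[] args) {
        System.out.println(choosePartitioning(true, Collections.<String>emptyList()));
        System.out.println(choosePartitioning(false, Arrays.asList("order_id")));
        // An update and a delete for the same key land on the same subtask.
        System.out.println(targetSubtask("order-42", 4) == targetSubtask("order-42", 4));
    }
}
```

With rebalance, consecutive changes for one key could reach different subtasks and be applied out of order, which is why the keyed case needs hash partitioning.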

...

Compatibility, Deprecation, and Migration Plan

...

For users who want to upgrade an existing job so that its sources run with a parallelism different from the default parallelism, the following incompatibilities should be noted:

  1. Old state may be incompatible because breaking the source chaining changes the job topology.
  2. For CDC sources, the primary key constraint is mandatory in this case to ensure data correctness.

Test Plan

The changes will be covered by unit tests (UTs).