...

  • Support setting parallelism for Table/SQL sources to provide more flexibility for performance tuning for Table/SQL pipelines.

Non-Goals

We deliberately exclude the following from our goals:

  • Support setting parallelism for individual Table/SQL operators.

    • Unlike sources, whose parallelism can easily be set via a connector option, setting parallelism for each individual operator would inevitably require extensive changes to public interfaces.

    • Setting parallelism for individual operators is less profitable and has been shown to attract little public interest in previous discussions (see [FLINK-31821] FlinkSQL set parallelism for each operator - ASF JIRA). It is also more complex, so we prefer to leave it for future work to keep this FLIP focused.

  • Support parallelism inference for Table/SQL sources.

    • It requires support from sources; we will consider adding it after that support is ready.

...

  • Make all classes that implement the ScanRuntimeProvider interface also implement the ParallelismProvider interface, to allow configuring source parallelism.

  • Adapt CommonExecTableSourceScan to obtain parallelism from ParallelismProvider where possible and, after validation, configure the source parallelism on the source transformation.

  • Integrate with the Kafka and datagen connectors to support the new connector option scan.parallelism for setting source parallelism.
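The steps above hinge on the ParallelismProvider contract: a provider may report an explicit parallelism, and the planner falls back to the default otherwise. The following is a minimal sketch of that resolution logic; the interface here is a simplified stand-in that mirrors Flink's ParallelismProvider (its real getParallelism() also returns an Optional), not the actual class.

```java
import java.util.Optional;

// Simplified stand-in for Flink's ParallelismProvider contract:
// an empty Optional means "no explicit parallelism; use the default".
interface ParallelismProvider {
    Optional<Integer> getParallelism();
}

public class SourceParallelismDemo {
    // Planner-side sketch: prefer the provider's configured value,
    // otherwise fall back to the pipeline's default parallelism.
    static int resolveParallelism(ParallelismProvider provider, int defaultParallelism) {
        return provider.getParallelism().orElse(defaultParallelism);
    }

    public static void main(String[] args) {
        ParallelismProvider configured = () -> Optional.of(4); // e.g. scan.parallelism = 4
        ParallelismProvider unconfigured = Optional::empty;    // option not set

        System.out.println(resolveParallelism(configured, 8));   // 4
        System.out.println(resolveParallelism(unconfigured, 8)); // 8
    }
}
```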

Other Concerns

Calc operator's parallelism changes with source's

...

Specifically, two subtleties need to be taken care of when configuring the source parallelism, and they are detailed below.

Keep downstream operators' parallelism unchanged by wrapping the source transformation in a phantom transformation

Currently, all table exec nodes (except for the sink exec node, which can configure its own parallelism) inherit the parallelism of their input. As a result, adjusting the source parallelism would change the parallelism of all downstream operators accordingly, which contradicts our initial goal of providing more flexibility for performance tuning of Table/SQL pipelines.
We propose adding a new subclass of Transformation, named WrapperTransformation, to solve this issue. Specifically, when the source parallelism is set to a value different from the default parallelism, the source transformation is wrapped in a WrapperTransformation. The WrapperTransformation's getWrapped() method returns the actual source transformation, so that the real operator and its parallelism are used when building the StreamGraph. The parallelism of the WrapperTransformation is set to the default parallelism, so the parallelism of downstream operators does not change. Note that if the source and downstream operators have the same parallelism, no WrapperTransformation is used.
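The wrapping idea can be sketched as follows. The Transformation and WrapperTransformation classes below are simplified stand-ins for illustration (the real Transformation hierarchy lives in flink-core and carries much more state); the point is only that the wrapper reports the default parallelism to downstream operators while getWrapped() still exposes the real source with its configured parallelism.

```java
// Simplified stand-in for a Flink Transformation: just a name and a parallelism.
class Transformation {
    private final String name;
    private final int parallelism;
    Transformation(String name, int parallelism) {
        this.name = name;
        this.parallelism = parallelism;
    }
    int getParallelism() { return parallelism; }
    String getName() { return name; }
}

// The wrapper advertises the default parallelism (which downstream operators
// inherit), while getWrapped() returns the real source transformation that is
// used when building the StreamGraph.
class WrapperTransformation extends Transformation {
    private final Transformation wrapped;
    WrapperTransformation(Transformation wrapped, int defaultParallelism) {
        super(wrapped.getName() + "-wrapper", defaultParallelism);
        this.wrapped = wrapped;
    }
    Transformation getWrapped() { return wrapped; }
}

public class WrapperDemo {
    public static void main(String[] args) {
        int defaultParallelism = 8;
        Transformation source = new Transformation("kafka-source", 2); // scan.parallelism = 2
        WrapperTransformation wrapper = new WrapperTransformation(source, defaultParallelism);

        // Downstream operators derive their parallelism from the wrapper (8),
        // while the real source still runs with parallelism 2.
        System.out.println(wrapper.getParallelism());              // 8
        System.out.println(wrapper.getWrapped().getParallelism()); // 2
    }
}
```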

Deal with changelog messages

A source can generate update and delete records, e.g. when the Upsert Kafka connector is used to consume changelog messages. If the parallelism of the source and its downstream operators differs, a rebalance partitioner is used by default, which may break the per-key ordering of the changelog.
Specifically, when the source and downstream operators have different parallelism, two cases arise depending on the kind of data the source generates:

  1. When the source generates only insert-only data, maintain the current default behavior: the shuffle mode between the source and downstream operators is rebalance.

  2. When the source generates update and delete data (determined by checking for the existence of a primary key in the source schema), the source uses a hash partitioner to send data.

This also aligns with the existing implementation for setting sink parallelism.
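The two cases above can be sketched as a partitioner choice plus the key-routing property that makes hash partitioning safe for changelogs. This is an illustrative model only: the method names and the use of Object.hashCode are hypothetical stand-ins, not Flink's actual key-group assignment logic.

```java
import java.util.Objects;

public class PartitionerChoiceDemo {

    enum ShuffleMode { REBALANCE, HASH }

    // Case 1 vs. case 2 from the text: a primary key in the source schema
    // signals changelog data, so a hash partitioner is chosen; otherwise the
    // default rebalance behavior is kept.
    static ShuffleMode chooseShuffleMode(boolean sourceHasPrimaryKey) {
        return sourceHasPrimaryKey ? ShuffleMode.HASH : ShuffleMode.REBALANCE;
    }

    // Hash partitioning routes all records with the same key to the same
    // downstream subtask, preserving per-key changelog ordering. (Illustrative
    // hashing only; Flink's real routing goes through key groups.)
    static int targetSubtask(Object key, int downstreamParallelism) {
        return Math.floorMod(Objects.hashCode(key), downstreamParallelism);
    }

    public static void main(String[] args) {
        System.out.println(chooseShuffleMode(true));  // HASH
        System.out.println(chooseShuffleMode(false)); // REBALANCE

        // Same key -> same subtask, regardless of which source subtask emitted it.
        System.out.println(
            targetSubtask("user-42", 4) == targetSubtask("user-42", 4)); // true
    }
}
```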

Other Concerns

Setting parallelism for sources may break implicitly keyed streams

Some users may implicitly reinterpret normal data streams as keyed streams, relying on the fact that all operators use the same parallelism to maintain the keyed property of the streams. For example, a Kafka-to-Kafka ETL job may have its source Kafka topics partitioned by key before entering the job, and would expect the data to remain keyed during processing. Specifying the source parallelism may break this by introducing shuffles among tasks, leading to unexpected behavior. However, Flink does not offer such guarantees; users are free to explicitly set a primary key to enforce keyed semantics, or simply not to specify the source parallelism.

...