Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Discussion thread: here (<- link to https://lists.apache.org/list.html?dev@flink.apache.org)
Vote thread: here (<- link to https://lists.apache.org/list.html?dev@flink.apache.org)
JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)
Release: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, Flink Table/SQL jobs do not expose fine-grained control of operator parallelism to users. FLIP-146 added support for setting the parallelism of sinks, but apart from that, users can only set a global default parallelism that all other operators share. However, in many cases it is preferable to set the parallelism of sources individually:

  • Many connectors have an upper bound on the parallelism at which they can efficiently ingest data. For example, the parallelism of a Kafka source is bounded by the number of partitions; any extra tasks would be idle.

  • Other operators may involve intensive computation and need a larger parallelism.

Goals

  • Support setting parallelism for Table/SQL sources.

Non-Goals

We deliberately exclude the following from our goals:

  • Support setting parallelism for individual Table/SQL operators.

    • Unlike sources, whose parallelism can easily be set via a connector option, setting parallelism for each individual operator would inevitably require extensive changes to the public interfaces.

    • Setting parallelism for individual operators brings less benefit, and previous discussions showed little public interest in it (see [FLINK-31821] FlinkSQL set parallelism for each operator. - ASF JIRA).

  • Support parallelism inference for Table/SQL sources.

    • It requires support from the sources themselves; we will consider adding it once that support is ready.

Public Interfaces

Make all classes that implement the ScanRuntimeProvider interface also implement the ParallelismProvider interface:

  • SourceProvider

  • SourceFunctionProvider

  • InputFormatProvider

  • DataStreamScanProvider

  • TransformationScanProvider


For each of them, add a new provider creation helper method that takes an extra parameter to specify parallelism. Take SourceProvider for example:

/** Helper method for creating a Source provider with a provided source parallelism. */
static SourceProvider of(Source<RowData, ?, ?> source, @Nullable Integer sourceParallelism) {
    return new SourceProvider() {

        @Override
        public Source<RowData, ?, ?> createSource() {
            return source;
        }

        @Override
        public boolean isBounded() {
            return Boundedness.BOUNDED.equals(source.getBoundedness());
        }

        @Override
        public Optional<Integer> getParallelism() {
            return Optional.ofNullable(sourceParallelism);
        }
    };
}

Users specify a custom source parallelism through a new connector option:

Option           | Type    | Default value
scan.parallelism | Integer | None (use the global parallelism setting)
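As a rough illustration of how a connector factory might read this option, the sketch below parses scan.parallelism from a plain options map. The class ScanParallelismOption and its parse method are hypothetical stand-ins and not Flink API; a real connector would more likely declare the option through Flink's ConfigOption mechanism.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper, not part of Flink: reads 'scan.parallelism' from table options. */
final class ScanParallelismOption {
    static final String KEY = "scan.parallelism";

    private ScanParallelismOption() {}

    /** Returns the configured source parallelism, or empty if the option is not set. */
    static Optional<Integer> parse(Map<String, String> tableOptions) {
        String raw = tableOptions.get(KEY);
        if (raw == null) {
            // Option absent: the caller falls back to the global parallelism setting.
            return Optional.empty();
        }
        try {
            return Optional.of(Integer.parseInt(raw.trim()));
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    "Invalid value for '" + KEY + "': " + raw, e);
        }
    }
}
```

The resulting Optional<Integer> maps directly onto the nullable sourceParallelism argument of the new helper methods, for example via orElse(null).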

Proposed Changes

  • Make all classes that implement the ScanRuntimeProvider interface also implement the ParallelismProvider interface, so that the source parallelism can be configured.

  • Adapt CommonExecTableSourceScan to obtain the parallelism from ParallelismProvider where available and, after validation, set it on the source transformation.

  • Integrate with the Kafka and datagen connectors so that they support the new connector option scan.parallelism for setting the source parallelism.
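The second step above can be sketched as follows. This is a minimal illustration and not the actual CommonExecTableSourceScan code: SourceParallelismResolver is a hypothetical name, and the validation rule (rejecting zero and negative values) is an assumption, since the proposal does not spell out what validation is performed.

```java
import java.util.Optional;

/** Hypothetical planner-side helper, not the actual CommonExecTableSourceScan code. */
final class SourceParallelismResolver {

    private SourceParallelismResolver() {}

    /**
     * Picks the parallelism for the source transformation.
     *
     * @param provided the value returned by ParallelismProvider#getParallelism()
     * @param defaultParallelism the job's global default parallelism
     */
    static int resolve(Optional<Integer> provided, int defaultParallelism) {
        if (!provided.isPresent()) {
            // No source-specific setting: keep the existing behavior.
            return defaultParallelism;
        }
        int parallelism = provided.get();
        // Assumed validation rule: a configured parallelism must be positive.
        if (parallelism <= 0) {
            throw new IllegalArgumentException(
                    "Source parallelism must be positive, but was " + parallelism);
        }
        return parallelism;
    }
}
```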

Other Concerns

The Calc operator's parallelism follows the source's

CalcCodeGenerator makes a Calc operator use the same parallelism as its input operator, so the Calc operator's parallelism changes with the source's. This may lead to unexpected behavior in certain cases: Calc operators are usually computationally cheap, but they can be heavy when they contain UDFs, and users may want to control their parallelism through the global default parallelism. We choose to leave the behavior unchanged for now because, in most cases, it is desirable: it encourages chaining for better performance.
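To make the effect concrete, here is a toy model of a linear pipeline. It is only a sketch under simplifying assumptions: CalcParallelismSketch and the operator names are hypothetical, and every operator other than Calc is assumed to use the global default parallelism.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model, not planner code: parallelism of each operator in a linear pipeline. */
final class CalcParallelismSketch {

    private CalcParallelismSketch() {}

    /** Returns the parallelism of the source followed by that of each downstream operator. */
    static List<Integer> plan(
            int sourceParallelism, int defaultParallelism, List<String> operators) {
        List<Integer> result = new ArrayList<>();
        result.add(sourceParallelism);
        int inputParallelism = sourceParallelism;
        for (String operator : operators) {
            // A Calc inherits its input's parallelism; others use the global default (assumed).
            int parallelism = "Calc".equals(operator) ? inputParallelism : defaultParallelism;
            result.add(parallelism);
            inputParallelism = parallelism;
        }
        return result;
    }
}
```

With a source parallelism of 4 and a default of 16, a Calc placed directly after the source runs with parallelism 4, even if it evaluates an expensive UDF.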

Setting parallelism for sources may break implicitly keyed streams

Some users may implicitly reinterpret normal data streams as keyed streams and rely on all operators using the same parallelism to preserve the keyed property of the streams. For example, a Kafka-to-Kafka ETL job may have its source Kafka topics partitioned by key before the data enters the job, and the user would expect the stream to remain keyed during processing. Specifying the parallelism of sources may break this by introducing shuffles among tasks and lead to unexpected behavior. However, we do not think Flink offers this kind of guarantee, and users are free not to specify a source parallelism.
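The following toy model illustrates why the keyed property can be lost. It assumes that records are redistributed round-robin when the source and its downstream operator have different parallelism; that redistribution strategy, as well as the class and method names, are assumptions made for illustration only.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model, not Flink code: which downstream subtasks see records of one source subtask. */
final class KeyedStreamSketch {

    private KeyedStreamSketch() {}

    /** Equal parallelism, forward connection: records stay on the same subtask index. */
    static Set<Integer> forwardTargets(int sourceSubtask, int records) {
        Set<Integer> targets = new HashSet<>();
        for (int i = 0; i < records; i++) {
            targets.add(sourceSubtask);
        }
        return targets;
    }

    /** Different parallelism, assumed round-robin: records spread over downstream subtasks. */
    static Set<Integer> roundRobinTargets(int records, int downstreamParallelism) {
        Set<Integer> targets = new HashSet<>();
        for (int i = 0; i < records; i++) {
            targets.add(i % downstreamParallelism);
        }
        return targets;
    }
}
```

If one source subtask reads a Kafka partition that holds all records of a key, the forward case keeps those records on a single downstream subtask, whereas the round-robin case scatters them across several subtasks.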

Compatibility, Deprecation, and Migration Plan

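No compatibility issues are involved: the new option is unset by default, in which case sources keep using the global parallelism setting.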

Test Plan

The changes will be covered by unit tests.
