Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
[This FLIP is joint work by Zhanghao Chen and Dewei Su.]
Motivation
Currently, Flink Table/SQL jobs do not expose fine-grained control of operator parallelism to users. FLIP-146 added support for setting the parallelism of sinks, but beyond that, users can only set a global default parallelism that all other operators share. However, in many cases, setting the parallelism of sources individually is preferable:
Many connectors have an upper bound on the parallelism at which they can efficiently ingest data. For example, the parallelism of a Kafka source is bounded by the number of partitions; any extra tasks would be idle.
Other operators may involve intensive computation and need a larger parallelism.
Goals
Support setting parallelism for Table/SQL sources.
Non-Goals
We deliberately exclude the following from our goals:
Support setting parallelism for individual Table/SQL operators.
Unlike sources, whose parallelism can be easily set via a connector option, setting parallelism for each individual operator would inevitably require extensive changes to the public interfaces.
Setting parallelism for individual operators offers less benefit and attracted little community interest in previous discussions (see FLINK-31821, "FlinkSQL set parallelism for each operator").
Support parallelism inference for Table/SQL sources.
This requires support from the sources themselves; we will consider adding it once that support is ready.
Public Interfaces
Make all classes that implement the ScanRuntimeProvider interface also implement the ParallelismProvider interface:
SourceProvider
SourceFunctionProvider
InputFormatProvider
DataStreamScanProvider
TransformationScanProvider
For each of them, add a new provider creation helper method that takes an extra parameter to specify the parallelism. Take SourceProvider as an example:
/** Helper method for creating a Source provider with a provided source parallelism. */
static SourceProvider of(Source<RowData, ?, ?> source, @Nullable Integer sourceParallelism) {
    return new SourceProvider() {
        @Override
        public Source<RowData, ?, ?> createSource() {
            return source;
        }

        @Override
        public boolean isBounded() {
            return Boundedness.BOUNDED.equals(source.getBoundedness());
        }

        @Override
        public Optional<Integer> getParallelism() {
            // An empty Optional means "use the global default parallelism".
            return Optional.ofNullable(sourceParallelism);
        }
    };
}
The user specifies the customized parallelism for sources through a new connector option:
Option | Type | Default value
scan.parallelism | Integer | None (uses the global parallelism setting)
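As a rough illustration of how a connector might pick up this option, the sketch below reads scan.parallelism from a table's option map and returns a nullable Integer that could then be passed to a helper such as SourceProvider.of(source, parallelism). The class ScanParallelismOption and its parse method are hypothetical stand-ins written for this example; a real connector would more likely declare the option through Flink's configuration classes.

```java
import java.util.Map;

/** Hypothetical helper for illustration only; not part of Flink. */
final class ScanParallelismOption {
    static final String KEY = "scan.parallelism";

    /** Returns the configured parallelism, or null when the option is not set. */
    static Integer parse(Map<String, String> tableOptions) {
        String raw = tableOptions.get(KEY);
        if (raw == null) {
            // Option absent: the source should use the global default parallelism.
            return null;
        }
        try {
            return Integer.valueOf(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    "Invalid value for '" + KEY + "': " + raw, e);
        }
    }

    public static void main(String[] args) {
        // Table options as they might appear in a WITH (...) clause.
        System.out.println(parse(Map.of("connector", "kafka", KEY, "4")));
        System.out.println(parse(Map.of("connector", "kafka")));
    }
}
```

Returning null rather than a sentinel number mirrors the @Nullable Integer parameter of the helper method above, so "not set" stays distinguishable from any concrete parallelism.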
Proposed Changes
Make all classes that implement the ScanRuntimeProvider interface also implement the ParallelismProvider interface to allow configuring source parallelism.
Adapt CommonExecTableSourceScan to get the parallelism from ParallelismProvider if possible and, after validation, configure it on the source transformation.
Integrate with the Kafka and datagen connectors to support the new connector option scan.parallelism for setting source parallelism.
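The planner-side step (reading the parallelism from ParallelismProvider and otherwise falling back to the default) could be sketched roughly as follows. This is a simplified, self-contained model, not the actual CommonExecTableSourceScan code: the class SourceParallelismResolver and its resolve method are hypothetical, and the positivity check is only an assumed form of the validation, which this proposal does not spell out.

```java
import java.util.Optional;

/** Simplified stand-in for the planner-side logic; not the actual Flink code. */
final class SourceParallelismResolver {

    /**
     * Picks the parallelism for the source transformation.
     *
     * @param providerParallelism value returned by the provider's getParallelism()
     * @param defaultParallelism the job-wide default parallelism
     */
    static int resolve(Optional<Integer> providerParallelism, int defaultParallelism) {
        if (!providerParallelism.isPresent()) {
            // scan.parallelism was not set: keep the current behavior.
            return defaultParallelism;
        }
        int parallelism = providerParallelism.get();
        // Assumed validation rule: a configured parallelism must be positive.
        if (parallelism <= 0) {
            throw new IllegalArgumentException(
                    "Source parallelism must be positive, but was " + parallelism);
        }
        return parallelism;
    }

    public static void main(String[] args) {
        System.out.println(resolve(Optional.of(4), 8));
        System.out.println(resolve(Optional.empty(), 8));
    }
}
```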
Other Concerns
Calc operators' parallelism changes with the source's
CalcCodeGenerator makes each Calc operator use the same parallelism as its input operator, so a Calc operator's parallelism changes with that of the source. This may lead to unexpected behavior in certain cases: Calc operators are usually computationally cheap, but they can be heavy when they contain UDFs, and users may want to control their parallelism through the global default parallelism. We choose to leave the behavior unchanged for now, because in most cases it is desirable: matching parallelism encourages operator chaining, which improves performance.
Setting parallelism for sources may break implicitly keyed streams
Some users may implicitly reinterpret normal data streams as keyed streams and rely on the fact that all operators use the same parallelism to preserve the keyed property of the streams. For example, a Kafka-to-Kafka ETL job may have its source Kafka topics partitioned by key before entering the job, and would expect the data to remain keyed during processing. Specifying the parallelism of sources may break this assumption by introducing shuffles among tasks, leading to unexpected behavior. However, we don't think Flink offers this kind of guarantee, and users are free not to specify source parallelism.
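The effect can be illustrated with a small simulation. It is a toy model under stated assumptions, not Flink's actual network stack: each source subtask reads exactly one key-partitioned input partition, records are forwarded one-to-one when the source and downstream parallelism match, and they are redistributed round-robin when the two differ. The class KeyedStreamBreakageDemo and its route method are hypothetical names used only for this example.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Toy model of record routing between a source and its downstream operator. */
final class KeyedStreamBreakageDemo {

    /** Returns, for each key, the set of downstream subtasks that receive its records. */
    static Map<Integer, Set<Integer>> route(
            int sourceParallelism, int downstreamParallelism, int numKeys, int recordsPerKey) {
        Map<Integer, Set<Integer>> subtasksPerKey = new TreeMap<>();
        // One round-robin cursor per source subtask (used only when parallelisms differ).
        int[] nextChannel = new int[sourceParallelism];
        for (int round = 0; round < recordsPerKey; round++) {
            for (int key = 0; key < numKeys; key++) {
                // The input is already partitioned by key before entering the job.
                int sourceSubtask = key % sourceParallelism;
                int target;
                if (sourceParallelism == downstreamParallelism) {
                    target = sourceSubtask; // one-to-one forwarding keeps keys together
                } else {
                    target = nextChannel[sourceSubtask]; // round-robin redistribution
                    nextChannel[sourceSubtask] = (target + 1) % downstreamParallelism;
                }
                subtasksPerKey.computeIfAbsent(key, k -> new TreeSet<>()).add(target);
            }
        }
        return subtasksPerKey;
    }

    public static void main(String[] args) {
        System.out.println("same parallelism:      " + route(2, 2, 4, 3));
        System.out.println("different parallelism: " + route(2, 4, 4, 3));
    }
}
```

With equal parallelism every key stays on a single downstream subtask; once the source parallelism differs from the downstream parallelism, records of one key are spread over several subtasks, which is what invalidates the implicit keyed assumption.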
Compatibility, Deprecation, and Migration Plan
No compatibility issues involved.
Test Plan
The changes will be covered by unit tests (UTs).