...
For each of them, add a new provider creation helper method that takes an extra parameter specifying the parallelism. Taking SourceProvider as an example:
```java
/** Helper method for creating a Source provider with a provided source parallelism. */
// Declared as a static factory method on the SourceProvider interface.
static SourceProvider of(Source<RowData, ?, ?> source, @Nullable Integer sourceParallelism) {
    return new SourceProvider() {

        @Override
        public Source<RowData, ?, ?> createSource() {
            return source;
        }

        @Override
        public boolean isBounded() {
            // Boundedness is derived from the wrapped source.
            return Boundedness.BOUNDED.equals(source.getBoundedness());
        }

        @Override
        public Optional<Integer> getParallelism() {
            // A null parallelism yields Optional.empty(), i.e. use the global setting.
            return Optional.ofNullable(sourceParallelism);
        }
    };
}
```
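The following is a minimal, self-contained sketch of the same factory pattern, included only to show the intended semantics: a configured value is reported as a present Optional, while null means "not configured". The interfaces ParallelismProvider and SimpleSourceProvider below are hypothetical stand-ins written for this example, not Flink's real table connector types.

```java
import java.util.Optional;

/**
 * Simplified model of the helper pattern above. These nested interfaces are
 * hypothetical stand-ins, not Flink's ScanRuntimeProvider/ParallelismProvider.
 */
final class ParallelismHelperSketch {

    /** Stand-in for a parallelism-aware provider: empty means "use the global default". */
    interface ParallelismProvider {
        Optional<Integer> getParallelism();
    }

    /** Stand-in for a scan provider that wraps an arbitrary source object. */
    interface SimpleSourceProvider<S> extends ParallelismProvider {
        S createSource();

        /** Mirrors the proposed of(source, parallelism) helper. */
        static <S> SimpleSourceProvider<S> of(S source, Integer sourceParallelism) {
            return new SimpleSourceProvider<S>() {
                @Override
                public S createSource() {
                    return source;
                }

                @Override
                public Optional<Integer> getParallelism() {
                    // null maps to Optional.empty(), i.e. "not configured".
                    return Optional.ofNullable(sourceParallelism);
                }
            };
        }
    }
}
```

With this shape, existing callers that never pass a parallelism keep the old behavior, because the provider simply reports an empty Optional.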
Users specify a custom parallelism for sources through a new connector option:

| Option | Type | Default value |
|---|---|---|
| scan.parallelism | Integer | None (uses the global parallelism setting) |
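As an illustration of the "no default, fall back to the global setting" semantics, here is a hypothetical sketch of how a connector factory might resolve the option. Real connectors would declare a Flink ConfigOption; a plain Map of string table options and the class name ScanParallelismOption are assumptions used only to keep the example self-contained.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper resolving the optional scan.parallelism table option. */
final class ScanParallelismOption {

    static final String KEY = "scan.parallelism";

    /** Returns the configured parallelism, or empty when the option is absent. */
    static Optional<Integer> read(Map<String, String> tableOptions) {
        String raw = tableOptions.get(KEY);
        if (raw == null) {
            // No default value: the caller falls back to the global parallelism.
            return Optional.empty();
        }
        // Integer.parseInt throws NumberFormatException for non-integer values.
        return Optional.of(Integer.parseInt(raw.trim()));
    }

    /** Effective source parallelism: the option if set, otherwise the global default. */
    static int effective(Map<String, String> tableOptions, int globalParallelism) {
        return read(tableOptions).orElse(globalParallelism);
    }
}
```

In a SQL table definition, the option would appear as an entry in the WITH clause, for example 'scan.parallelism' = '3'.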
Proposed Changes
1. Make all classes that implement the ScanRuntimeProvider interface also implement the ParallelismProvider interface, so that source parallelism can be configured.
2. Adapt CommonExecTableSourceScan to obtain the parallelism from ParallelismProvider when available and, after validation, set it on the source transformation.
3. Integrate with the Kafka and datagen connectors to support the new connector option scan.parallelism for setting source parallelism.
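The planner-side step in item 2 can be sketched roughly as follows. This is a simplified model under stated assumptions: ParallelismProvider and Transformation here are hypothetical stand-ins rather than Flink classes, and the only validation rule shown (parallelism must be positive) is an assumption of this sketch; the actual checks in CommonExecTableSourceScan may differ.

```java
import java.util.Optional;

/** Hypothetical model of applying a provider-supplied parallelism to a source. */
final class SourceScanParallelismSketch {

    /** Stand-in for the parallelism-reporting interface. */
    interface ParallelismProvider {
        Optional<Integer> getParallelism();
    }

    /** Stand-in for a source transformation whose parallelism can be set. */
    static final class Transformation {
        int parallelism;

        Transformation(int defaultParallelism) {
            this.parallelism = defaultParallelism;
        }
    }

    /** Applies the provider's parallelism, if any, after validating it. */
    static void applyParallelism(Object provider, Transformation source) {
        if (!(provider instanceof ParallelismProvider)) {
            return; // Provider cannot report a parallelism: keep the default.
        }
        Optional<Integer> parallelism = ((ParallelismProvider) provider).getParallelism();
        if (!parallelism.isPresent()) {
            return; // Option not set: keep the global parallelism.
        }
        int p = parallelism.get();
        // Assumed validation rule for this sketch only.
        if (p <= 0) {
            throw new IllegalArgumentException("Invalid source parallelism: " + p);
        }
        source.parallelism = p;
    }
}
```

The instanceof check reflects the "if possible" wording of item 2: providers that do not implement ParallelismProvider, or that report no value, leave the transformation at its default parallelism.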
...