...


For each of them, add a new provider creation helper method that takes an extra parameter specifying the parallelism. Take SourceProvider as an example:

...

    /** Helper method for creating a Source provider with a provided source parallelism. */
    static SourceProvider of(Source<RowData, ?, ?> source, @Nullable Integer sourceParallelism) {
        return new SourceProvider() {

            @Override
            public Source<RowData, ?, ?> createSource() {
                return source;
            }

            @Override
            public boolean isBounded() {
                return Boundedness.BOUNDED.equals(source.getBoundedness());
            }

            @Override
            public Optional<Integer> getParallelism() {
                return Optional.ofNullable(sourceParallelism);
            }
        };
    }

Users specify a custom source parallelism through a new connector option:

Option            | Type    | Default value
scan.parallelism  | Integer | None (Use global parallelism setting)
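As a rough illustration of how a connector might read this option, the sketch below parses scan.parallelism from a plain map of table options. The class ScanParallelismOption and its read method are hypothetical names invented for this example and are not part of Flink. A real connector would presumably go through Flink's own option handling instead of a raw map.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper (not part of Flink): reads the optional scan.parallelism option. */
final class ScanParallelismOption {
    /** Option key taken from the table above. */
    static final String KEY = "scan.parallelism";

    private ScanParallelismOption() {}

    /** Returns the configured parallelism, or empty when the option is not set. */
    static Optional<Integer> read(Map<String, String> tableOptions) {
        String raw = tableOptions.get(KEY);
        if (raw == null) {
            // No default value: the caller falls back to the global parallelism setting.
            return Optional.empty();
        }
        try {
            return Optional.of(Integer.parseInt(raw.trim()));
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    "Option '" + KEY + "' must be an integer, got: " + raw, e);
        }
    }
}
```

The Optional return value mirrors the "None" default: an absent option stays absent rather than being replaced by a made-up number.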

Proposed Changes

  • Make all classes that implement the ScanRuntimeProvider interface also implement the ParallelismProvider interface, so that the source parallelism can be configured.

  • Adapt CommonExecTableSourceScan to obtain the parallelism from ParallelismProvider where available, validate it, and set it on the source transformation.

  • Add support for the new connector option scan.parallelism to the Kafka and datagen connectors.
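The planner-side step in the second bullet can be sketched roughly as follows. This is a simplified model, not the actual CommonExecTableSourceScan code: ParallelismProviderSketch stands in for ParallelismProvider, SourceParallelismResolver is a hypothetical helper, and the validation rule (the value must be positive) is an assumption made for illustration.

```java
import java.util.Optional;

/** Simplified stand-in for the ParallelismProvider interface named above. */
interface ParallelismProviderSketch {
    Optional<Integer> getParallelism();
}

/** Hypothetical planner-side helper; the real planner logic may differ. */
final class SourceParallelismResolver {
    private SourceParallelismResolver() {}

    /**
     * Picks the parallelism for a source transformation: the provider's value if it
     * supplies one, otherwise the given default.
     */
    static int resolve(Object runtimeProvider, int defaultParallelism) {
        if (!(runtimeProvider instanceof ParallelismProviderSketch)) {
            // The provider does not expose a parallelism at all.
            return defaultParallelism;
        }
        Optional<Integer> configured =
                ((ParallelismProviderSketch) runtimeProvider).getParallelism();
        if (!configured.isPresent()) {
            // scan.parallelism was not set by the user.
            return defaultParallelism;
        }
        int parallelism = configured.get();
        // Assumed validation rule: reject zero and negative values.
        if (parallelism <= 0) {
            throw new IllegalArgumentException(
                    "Source parallelism must be positive, got: " + parallelism);
        }
        return parallelism;
    }
}
```

Keeping the fallback in one place means sources without the option behave exactly as before, which matches the "None" default of scan.parallelism.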

...