...

  • When deciding the parallelism of a source vertex, the adaptive batch scheduler obtains the SourceCoordinator from the ExecutionJobVertex and invokes the SourceCoordinator::dynamicInferParallelism() method.
  • The scheduler exposes runtime information to assist in determining the parallelism of source vertices: (1) Max source parallelism, the upper limit for the inferred parallelism, calculated as the minimum of the default source parallelism (`execution.batch.adaptive.auto-parallelism.default-source-parallelism`) and JobVertex#maxParallelism. If default-source-parallelism is not set, the global default parallelism is used as the default source parallelism. (2) dataVolumePerTask, the average data volume that each task should consume, taken from `execution.batch.adaptive.auto-parallelism.avg-data-volume-per-task`. (3) DynamicFilteringInfo, provided by the SourceCoordinator. It is available because the adaptive batch scheduler determines the parallelism only after the upstream vertices finish, at which point the dynamic partition pruning information has already been generated.
  • When a job vertex has more than one automatically inferred parallelism, either because (1) it contains multiple sources, or (2) it acts as both a source and a downstream vertex of other job vertices, the scheduler takes the maximum of these parallelisms.
  • Because SourceCoordinator::dynamicInferParallelism() is asynchronous, the initializeVertices process of the adaptive batch scheduler must also be transformed into an asynchronous implementation.
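
The rules in the bullets above can be sketched in plain Java as follows. This is only an illustration under stated assumptions, not the scheduler's actual code: the class `SourceParallelismSketch` and both method names are hypothetical, and the lower bound of 1 and the final cap at the upper limit are assumptions added for the example.

```java
import java.util.List;
import java.util.OptionalInt;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch; none of these names exist in Flink.
class SourceParallelismSketch {

    // Upper limit for the inferred parallelism:
    // min(default source parallelism, JobVertex#maxParallelism).
    // If default-source-parallelism is not set, the global default parallelism is used.
    static int maxSourceParallelism(
            OptionalInt defaultSourceParallelism,
            int globalDefaultParallelism,
            int vertexMaxParallelism) {
        int effectiveDefault = defaultSourceParallelism.orElse(globalDefaultParallelism);
        return Math.min(effectiveDefault, vertexMaxParallelism);
    }

    // Combines several asynchronously inferred parallelisms for one job vertex
    // by taking their maximum once all of them have completed.
    static CompletableFuture<Integer> decideVertexParallelism(
            List<CompletableFuture<Integer>> inferred, int upperBound) {
        CompletableFuture<Void> all =
                CompletableFuture.allOf(inferred.toArray(new CompletableFuture<?>[0]));
        return all.thenApply(ignored -> {
            int max = 1; // assumption: a vertex always gets at least one subtask
            for (CompletableFuture<Integer> future : inferred) {
                max = Math.max(max, future.join()); // already completed, does not block
            }
            return Math.min(max, upperBound); // assumption: cap at the upper limit
        });
    }
}
```

Because the combined result is itself a CompletableFuture, the caller has to continue vertex initialization in a callback rather than block, which is why initializeVertices needs to become asynchronous.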

...

Code Block: HiveSource (Java)
class HiveSource implements Source<T, SplitT, EnumChkT>, DynamicParallelismInference {
    ...
    @Override
    public int inferParallelism(DynamicParallelismContext context) {
        FileEnumerator fileEnumerator = enumeratorFactory.create();

        // apply the dynamic filtering info
        Optional<DynamicFilteringInfo> dynamicFilteringInfo = context.getDynamicFilter();
        if (dynamicFilteringInfo.isPresent()) {
            fileEnumerator.setDynamicFilteringData(
                    ((DynamicFilteringEvent) dynamicFilteringInfo.get()).getData());
        }

        // apply the max source parallelism and enumerate splits
        int maxSourceParallelism = context.getMaxSourceParallelism();
        Collection<FileSourceSplit> splits =
                fileEnumerator.enumerateSplits(new Path[1], maxSourceParallelism);

        // return the inferred source parallelism, capped at the upper limit
        return Math.min(splits.size(), maxSourceParallelism);
    }
    ...
}
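
The HiveSource example above uses only the number of splits. A source could also take dataVolumePerTask into account. The following is a minimal, self-contained sketch of that idea, not part of the proposal: the `InferenceContext` interface and its `getDataVolumePerTask()` accessor are hypothetical stand-ins for the real context type, and the rounding and lower bound of 1 are assumptions.

```java
import java.util.List;

// Hypothetical sketch; the context interface below is a stand-in, not a Flink API.
class DataVolumeInferenceSketch {

    interface InferenceContext {
        int getMaxSourceParallelism();

        long getDataVolumePerTask(); // assumed to be in bytes
    }

    // Infers a parallelism so that each task reads roughly dataVolumePerTask bytes,
    // never exceeding the number of splits or the max source parallelism.
    static int inferParallelism(List<Long> splitSizesInBytes, InferenceContext context) {
        long totalBytes = 0;
        for (long size : splitSizesInBytes) {
            totalBytes += size;
        }
        long perTask = Math.max(1L, context.getDataVolumePerTask());
        long tasksByVolume = (totalBytes + perTask - 1) / perTask; // ceiling division
        long upperBound = Math.min(splitSizesInBytes.size(), context.getMaxSourceParallelism());
        // assumption: always return at least 1, even for an empty input
        return (int) Math.max(1L, Math.min(tasksByVolume, upperBound));
    }
}
```

Capping by the split count reflects that a subtask without any split would have nothing to read.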


Implementation Plan

...