...
- When deciding the parallelism of a source vertex, the adaptive batch scheduler can obtain the SourceCoordinator from the ExecutionJobVertex and invoke the SourceCoordinator::dynamicInferParallelism() method.
- Runtime information is provided to help determine the parallelism of source vertices. The scheduler exposes the following: (1) The max source parallelism, which is the upper limit for the inferred parallelism. It is calculated as the minimum of the default source parallelism (`execution.batch.adaptive.auto-parallelism.default-source-parallelism`) and JobVertex#maxParallelism. If the default source parallelism is not set, the global default parallelism is used in its place. (2) The dataVolumePerTask, the average data volume that each task is expected to consume, as configured by `execution.batch.adaptive.auto-parallelism.avg-data-volume-per-task`. (3) The DynamicFilteringInfo, provided by the SourceCoordinator. This information is available because the adaptive batch scheduler determines a vertex's parallelism only after its upstream vertices have finished, by which point the dynamic partition pruning information has already been generated.
- When a job vertex has multiple sources of automatically inferred parallelism, namely (1) multiple sources within one job vertex, or (2) a job vertex that is both a source and a downstream vertex of other job vertices, the scheduler takes the maximum of the inferred parallelisms.
- Because SourceCoordinator::dynamicInferParallelism() is implemented asynchronously, the initializeVertices process of the adaptive batch scheduler must also be made asynchronous.
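The scheduler-side steps in the list above can be sketched as follows. This is a minimal sketch under stated assumptions, not the FLIP's implementation: `AsyncParallelismInference` is a hypothetical stand-in for the asynchronous SourceCoordinator::dynamicInferParallelism() call, a `null` default source parallelism models the unset option, and `upstreamInferred` is a hypothetical parameter for the parallelism already decided from upstream inputs (pass 1 for a pure source vertex). The sketch computes the max source parallelism, runs each source's inference as a `CompletableFuture`, and combines the results by taking the maximum.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class SourceParallelismSketch {

    /** Hypothetical stand-in for one source's asynchronous dynamicInferParallelism() call. */
    public interface AsyncParallelismInference {
        CompletableFuture<Integer> inferParallelism(int maxSourceParallelism);
    }

    /**
     * Upper bound for inferred parallelism: min(default source parallelism, vertex max parallelism).
     * A null defaultSourceParallelism models the unset option; the global default is used instead.
     */
    public static int computeMaxSourceParallelism(
            Integer defaultSourceParallelism, int globalDefaultParallelism, int vertexMaxParallelism) {
        int base = defaultSourceParallelism != null ? defaultSourceParallelism : globalDefaultParallelism;
        return Math.min(base, vertexMaxParallelism);
    }

    /**
     * Starts every source inference of a vertex and completes with the maximum of all results,
     * including the (hypothetical) parallelism already inferred from upstream inputs.
     */
    public static CompletableFuture<Integer> inferVertexParallelism(
            List<AsyncParallelismInference> sources, int maxSourceParallelism, int upstreamInferred) {
        CompletableFuture<Integer> result = CompletableFuture.completedFuture(upstreamInferred);
        for (AsyncParallelismInference source : sources) {
            // thenCombine keeps the chain non-blocking; Integer::max merges the two results.
            result = result.thenCombine(source.inferParallelism(maxSourceParallelism), Integer::max);
        }
        return result;
    }

    public static void main(String[] args) {
        // Default source parallelism unset, global default 128, vertex max parallelism 64.
        int max = computeMaxSourceParallelism(null, 128, 64);
        List<AsyncParallelismInference> sources = List.of(
                m -> CompletableFuture.supplyAsync(() -> Math.min(10, m)),
                m -> CompletableFuture.supplyAsync(() -> Math.min(200, m)));
        // join() is only for the demo; a scheduler would attach a callback instead.
        int parallelism = inferVertexParallelism(sources, max, 5).join();
        System.out.println(parallelism); // prints 64
    }
}
```

Chaining the futures instead of blocking on each one is what allows the scheduler to continue other work while sources enumerate their inputs, and it is the reason initializeVertices itself has to return a future.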
...
```java
class HiveSource implements Source<T, SplitT, EnumChkT>, DynamicParallelismInference {

    ...

    @Override
    public int inferParallelism(DynamicParallelismContext context) {
        FileEnumerator fileEnumerator = enumeratorFactory.create();

        // apply the dynamic filtering info
        Optional<DynamicFilteringInfo> dynamicFilteringInfo = context.getDynamicFilter();
        if (dynamicFilteringInfo.isPresent()) {
            fileEnumerator.setDynamicFilteringData(
                    ((DynamicFilteringEvent) dynamicFilteringInfo.get()).getData());
        }

        // apply the max source parallelism and enumerate splits
        int maxSourceParallelism = context.getMaxSourceParallelism();
        Collection<FileSourceSplit> splits;
        try {
            splits = fileEnumerator.enumerateSplits(new Path[1], maxSourceParallelism);
        } catch (IOException e) {
            // enumerateSplits declares IOException; inferParallelism does not
            throw new UncheckedIOException(e);
        }

        // return inferred source parallelism, bounded by the max source parallelism
        return Math.min(splits.size(), maxSourceParallelism);
    }

    ...
}
```
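The HiveSource example infers parallelism from the number of enumerated splits. A source that can cheaply compute its total input size could instead use the dataVolumePerTask runtime information. The following is a hypothetical sketch of that alternative, not part of the proposed API: `inferFromDataVolume` and its list of file sizes are illustrative assumptions, and a real source would obtain the data volume per task and the max source parallelism from the context passed to `inferParallelism`.

```java
import java.util.List;

public class DataVolumeInferenceSketch {

    /**
     * Hypothetical helper: parallelism = ceil(totalBytes / dataVolumePerTask),
     * clamped to the range [1, maxSourceParallelism].
     */
    public static int inferFromDataVolume(
            List<Long> fileSizes, long dataVolumePerTask, int maxSourceParallelism) {
        if (dataVolumePerTask <= 0) {
            throw new IllegalArgumentException("dataVolumePerTask must be positive");
        }
        long totalBytes = 0;
        for (long size : fileSizes) {
            totalBytes += size;
        }
        // ceiling division: number of tasks needed so no task exceeds dataVolumePerTask on average
        long needed = (totalBytes + dataVolumePerTask - 1) / dataVolumePerTask;
        return (int) Math.max(1, Math.min(needed, maxSourceParallelism));
    }

    public static void main(String[] args) {
        long mib = 1024L * 1024L;
        List<Long> files = List.of(100 * mib, 100 * mib, 100 * mib, 100 * mib, 100 * mib);
        // 500 MiB of input at 64 MiB per task -> ceil(7.8125) = 8 tasks, below the cap of 128
        System.out.println(inferFromDataVolume(files, 64 * mib, 128)); // prints 8
    }
}
```

Clamping to at least 1 keeps an empty input (for example, after dynamic partition pruning removed every partition) from producing a parallelism of zero.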
Implementation Plan
...