...

  • When deciding the parallelism of a source vertex, the adaptive batch scheduler can obtain the SourceCoordinator from the ExecutionJobVertex and invoke the SourceCoordinator::dynamicInferParallelism() method.
  • Runtime information is provided to assist in determining the parallelism of source vertices. The scheduler exposes the following runtime information: (1) Max source parallelism, which serves as the upper limit for the inferred parallelism. It is calculated as the minimum of the default source parallelism (in general, the configured value of `execution.batch.adaptive.auto-parallelism.default-source-parallelism`) and JobVertex#maxParallelism. If default-source-parallelism is not configured, the global default parallelism is used in its place, and the result is still capped by the JobVertex's maxParallelism. (2) dataVolumePerTask, the average data volume that each task is expected to consume, determined by `execution.batch.adaptive.auto-parallelism.avg-data-volume-per-task`. (3) DynamicFilteringInfo, provided by the SourceCoordinator. This information is available because the adaptive batch scheduler decides a vertex's parallelism only after its upstream vertices have finished, by which point dynamic partition pruning information has been generated.
  • When a job vertex has multiple automatically inferred parallelism values, for example (1) a job vertex containing multiple sources, or (2) a job vertex that is both a source and a downstream consumer of other job vertices, the scheduler takes the maximum of these values.
  • Because SourceCoordinator::dynamicInferParallelism() is implemented asynchronously, the initializeVertices process of the adaptive batch scheduler also needs to be made asynchronous.
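The arithmetic behind the runtime information and the multi-candidate rule can be sketched as below. This is a minimal illustration under stated assumptions, not Flink code: the class and all method names (`maxSourceParallelism`, `inferFromDataVolume`, `combine`) are hypothetical, and `inferFromDataVolume` is just one way a source *might* use dataVolumePerTask (ceiling of total bytes divided by bytes per task, clamped to the upper bound); real sources are free to infer differently.

```java
import java.util.List;
import java.util.OptionalInt;

/** Hypothetical sketch of source parallelism inference; none of these names are Flink APIs. */
class SourceParallelismSketch {

    /**
     * Upper bound for inference: the configured default source parallelism, or the global
     * default parallelism when it is not configured, capped by JobVertex#maxParallelism.
     */
    static int maxSourceParallelism(OptionalInt defaultSourceParallelism,
                                    int globalDefaultParallelism,
                                    int vertexMaxParallelism) {
        int configured = defaultSourceParallelism.orElse(globalDefaultParallelism);
        return Math.min(configured, vertexMaxParallelism);
    }

    /**
     * Hypothetical per-source strategy: ceil(totalBytes / dataVolumePerTask),
     * clamped to the range [1, upperBound].
     */
    static int inferFromDataVolume(long totalBytes, long dataVolumePerTask, int upperBound) {
        long tasks = (totalBytes + dataVolumePerTask - 1) / dataVolumePerTask;
        return (int) Math.max(1, Math.min(tasks, upperBound));
    }

    /** Several inferred values for one job vertex: the scheduler keeps the maximum. */
    static int combine(List<Integer> candidates) {
        // Falling back to 1 for an empty list is an assumption of this sketch.
        return candidates.stream().mapToInt(Integer::intValue).max().orElse(1);
    }

    public static void main(String[] args) {
        long gib = 1L << 30;
        // Not configured: global default 8, vertex max 128 -> 8.
        int bound = maxSourceParallelism(OptionalInt.empty(), 8, 128);
        System.out.println(bound); // 8
        // 10 GiB at 1 GiB per task would need 10 tasks, clamped to the bound of 8.
        System.out.println(inferFromDataVolume(10 * gib, gib, bound)); // 8
        // Two sources in one vertex inferred 3 and 8; the vertex gets 8.
        System.out.println(combine(List.of(3, 8))); // 8
    }
}
```

Taking the minimum with JobVertex#maxParallelism keeps the inferred value valid even when the configured default is larger than the vertex allows.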
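Making vertex initialization asynchronous means it must wait for every inference result before it can proceed. The sketch below shows one way to express that with `java.util.concurrent.CompletableFuture`; it assumes each source's inference is available as a future, and the names `inferAsync`, `decideParallelism`, and `initializeVertexAsync` are hypothetical stand-ins, not the scheduler's actual methods. Initialization is chained onto the combined future instead of blocking the caller.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

/** Hypothetical sketch of asynchronous vertex initialization; not Flink's implementation. */
class AsyncVertexInitSketch {

    /** Stand-in for one source's asynchronous parallelism inference. */
    static CompletableFuture<Integer> inferAsync(Supplier<Integer> inference, ExecutorService executor) {
        return CompletableFuture.supplyAsync(inference, executor);
    }

    /** Completes once every candidate has completed, yielding their maximum (1 if there are none). */
    static CompletableFuture<Integer> decideParallelism(List<CompletableFuture<Integer>> candidates) {
        return CompletableFuture.allOf(candidates.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> candidates.stream()
                        .mapToInt(CompletableFuture::join) // all futures are already done here
                        .max()
                        .orElse(1));
    }

    /** Hypothetical asynchronous initialization: runs only after the parallelism is decided. */
    static CompletableFuture<String> initializeVertexAsync(String vertexName,
                                                           List<CompletableFuture<Integer>> candidates) {
        return decideParallelism(candidates)
                .thenApply(p -> vertexName + " initialized with parallelism " + p);
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            List<CompletableFuture<Integer>> candidates = List.of(
                    inferAsync(() -> 4, executor),
                    inferAsync(() -> 6, executor));
            // join() is used only to print the result in this demo.
            System.out.println(initializeVertexAsync("source-vertex", candidates).join());
        } finally {
            executor.shutdown();
        }
    }
}
```

Because `allOf` completes only after all inputs complete, calling `join` inside `thenApply` never blocks, so the combining step stays non-blocking.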

...