
Discussion thread

https://lists.apache.org/thread/ocftkqy5d2x4n58wzprgm5qqrzzkbmb8

Vote thread

https://lists.apache.org/thread/g03m2r8dodz6gn8jgf36mvq60h1tsnqg

JIRA

Release

1.19

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In general, there are three main ways to set source parallelism for batch jobs:

  1. User-defined source parallelism. Source parallelism can be configured via the global parallelism, or assigned per source through DataStream or Table/SQL (FLIP-367).
  2. Connector static parallelism inference (e.g. the Hive source).
  3. Dynamic parallelism inference. For batch jobs that use the adaptive batch scheduler (FLIP-187), the current implementation uses a global default source parallelism as the inferred parallelism for sources.

Compared with setting parallelism manually, automatic parallelism inference is easier to use and adapts better to data volumes that vary from day to day. However, static parallelism inference cannot use runtime information, so its result can be inaccurate. For example, with the dynamic partition pruning of FLIP-248, the amount of data a source actually needs to consume can only be determined at runtime. For batch jobs, dynamic parallelism inference is therefore the best fit, but the adaptive batch scheduler's current support for it is limited.

Therefore, we aim to introduce a general interface that enables the adaptive batch scheduler to dynamically infer the source parallelism at runtime.

Public Interfaces

Introduce DynamicParallelismInference interface for Source

We plan to introduce a new interface, DynamicParallelismInference, which can be implemented together with the Source interface. A nested Context interface is also introduced to provide the runtime information that assists in inferring source parallelism.

DynamicParallelismInference

DynamicParallelismInference
import org.apache.flink.annotation.PublicEvolving;

import java.util.Optional;

@PublicEvolving
public interface DynamicParallelismInference {
    /**
     * A context that provides dynamic parallelism decision infos.
     * Currently, this interface is only effective for batch jobs in adaptive batch scheduler.
     */
    interface Context {
        /**
         * Get the dynamic filtering info of the source vertex.
         *
         * @return the dynamic filter instance.
         */
        Optional<DynamicFilteringInfo> getDynamicFilteringInfo();

        /**
         * Get the upper bound for the inferred parallelism.
         *
         * @return the upper bound for the inferred parallelism.
         */
        int getParallelismInferenceUpperBound();

        /**
         * Get the average size of data volume to expect each task instance to process.
         *
         * @return the data volume per task in bytes.
         */
        long getDataVolumePerTask();
    }

    /**
     * The method is invoked on the master (JobManager) before the initialization of the source
     * vertex.
     *
     * @param context The context to get dynamic parallelism decision infos.
     * @return the inferred parallelism of the source.
     */
    int inferParallelism(Context context);
}
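
To illustrate how a connector might use the three values exposed by Context, here is a minimal sketch. It assumes a hypothetical source (SizeBasedSource) that already knows the total number of bytes it has to read, and it redeclares the proposed interfaces locally so that it compiles without Flink. SizeBasedSource and FixedContext are illustrative names, not part of this proposal.

```java
import java.util.Optional;

/** Local stand-ins mirroring the proposed interfaces, so the sketch compiles without Flink. */
interface DynamicFilteringInfo {}

interface DynamicParallelismInference {
    interface Context {
        Optional<DynamicFilteringInfo> getDynamicFilteringInfo();

        int getParallelismInferenceUpperBound();

        long getDataVolumePerTask();
    }

    int inferParallelism(Context context);
}

/** Hypothetical source that knows how many bytes it has to read in total. */
class SizeBasedSource implements DynamicParallelismInference {
    private final long totalBytes;

    SizeBasedSource(long totalBytes) {
        this.totalBytes = totalBytes;
    }

    @Override
    public int inferParallelism(Context context) {
        // Guard against a non-positive per-task volume to avoid division by zero.
        long perTask = Math.max(1L, context.getDataVolumePerTask());
        // Ceiling division: number of tasks needed so that each reads at most perTask bytes.
        long wanted = (totalBytes + perTask - 1) / perTask;
        // Clamp into [1, upper bound].
        long capped = Math.min(wanted, context.getParallelismInferenceUpperBound());
        return (int) Math.max(1L, capped);
    }
}

/** Hypothetical fixed context, standing in for the one the scheduler would provide. */
class FixedContext implements DynamicParallelismInference.Context {
    private final int upperBound;
    private final long dataVolumePerTask;

    FixedContext(int upperBound, long dataVolumePerTask) {
        this.upperBound = upperBound;
        this.dataVolumePerTask = dataVolumePerTask;
    }

    @Override
    public Optional<DynamicFilteringInfo> getDynamicFilteringInfo() {
        return Optional.empty(); // no dynamic partition pruning in this sketch
    }

    @Override
    public int getParallelismInferenceUpperBound() {
        return upperBound;
    }

    @Override
    public long getDataVolumePerTask() {
        return dataVolumePerTask;
    }
}
```

A connector that supports dynamic partition pruning would additionally inspect getDynamicFilteringInfo() to shrink the amount of data it expects to read before dividing by the per-task volume.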

The DynamicPartitionPruning introduced by FLIP-248 improves performance of batch jobs by avoiding reading large amounts of irrelevant data.

Currently, the filtering data is wrapped in the class DynamicFilteringEvent, which can only be accessed in the table layer. We plan to introduce DynamicFilteringInfo as a decorative (marker) interface that represents DynamicFilteringEvent in the runtime layer.

DynamicFilteringInfo
/**
 * A decorative interface that indicates it holds the dynamic partition filtering data. The actual 
 * information needs to be obtained from the implementing class 
 * org.apache.flink.table.connector.source.DynamicFilteringEvent.
 */
public interface DynamicFilteringInfo {

}

Configuration

We intend to remove the default value of `execution.batch.adaptive.auto-parallelism.default-source-parallelism`. The current default of 1 is not reasonable: once dynamic source parallelism inference is introduced, this option serves as the upper bound for the inferred parallelism, and 1 is clearly too low for most cases. The new behavior of the option is explained in the section "Proposed Changes - General Idea - Configuration Behavior Changes".

Proposed Changes

General Idea

Configuration Behavior Changes

Currently, for JobVertices without parallelism configured, the AdaptiveBatchScheduler dynamically infers the vertex parallelism based on the volume of input data. For Source vertices specifically, it uses the value of `execution.batch.adaptive.auto-parallelism.default-source-parallelism` as a fixed parallelism. If the user does not set it, the default value of 1 is used as the source parallelism, which is only a temporary solution.

With the change proposed in this FLIP, if the user has configured `execution.batch.adaptive.auto-parallelism.default-source-parallelism`, the configured value serves as the upper bound for dynamic source parallelism inference. If it is not configured, the upper bound falls back to the configured value of `execution.batch.adaptive.auto-parallelism.max-parallelism` (the global upper bound for adaptive parallelism inference). If that is not configured either, it falls back to `parallelism.default` as the final upper bound for dynamic source parallelism inference. For most users it is therefore sufficient to configure only `parallelism.default` to control the upper bound of parallelism inference, which keeps the configuration concise.

In terms of priority: user-defined parallelism > static parallelism inference > dynamic parallelism inference. Dynamic source parallelism inference takes effect at runtime, and only when (1) the current ExecutionGraph is a dynamic graph and (2) the parallelism of the source vertex is not specified (i.e., the parallelism is -1). Sources with user-defined parallelism or with static parallelism inference enabled are therefore not affected by dynamic parallelism inference.
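
The fallback order for the upper bound can be sketched as follows. This is only an illustration of the lookup order: the class name SourceParallelismUpperBound and the use of a plain Map for the explicitly configured options are assumptions, and the real implementation would read Flink's Configuration instead.

```java
import java.util.Map;

/** Hypothetical helper illustrating the fallback order of the inference upper bound. */
class SourceParallelismUpperBound {
    static final String DEFAULT_SOURCE_PARALLELISM =
            "execution.batch.adaptive.auto-parallelism.default-source-parallelism";
    static final String MAX_PARALLELISM =
            "execution.batch.adaptive.auto-parallelism.max-parallelism";

    /**
     * @param explicitlyConfigured only the options the user has set explicitly
     * @param parallelismDefault the effective value of parallelism.default
     */
    static int resolve(Map<String, Integer> explicitlyConfigured, int parallelismDefault) {
        // 1. The source-specific option wins if the user configured it.
        Integer sourceBound = explicitlyConfigured.get(DEFAULT_SOURCE_PARALLELISM);
        if (sourceBound != null) {
            return sourceBound;
        }
        // 2. Otherwise use the global adaptive inference upper bound, if configured.
        Integer globalBound = explicitlyConfigured.get(MAX_PARALLELISM);
        if (globalBound != null) {
            return globalBound;
        }
        // 3. Finally fall back to parallelism.default.
        return parallelismDefault;
    }
}
```

With only `parallelism.default` set to 16, for example, the sketch resolves the upper bound to 16; setting either of the two adaptive options overrides it in the order shown.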
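
The two validity conditions amount to a simple guard. The sketch below is an assumption about how such a check could look; the class and method names are hypothetical and do not correspond to scheduler code.

```java
/** Hypothetical guard mirroring the two validity conditions for dynamic inference. */
class DynamicInferenceGuard {
    /** Value used in this document for "parallelism not specified". */
    static final int PARALLELISM_UNSET = -1;

    static boolean applies(boolean isDynamicGraph, int sourceVertexParallelism) {
        // A user-defined or statically inferred parallelism is already a positive value,
        // so such sources never reach dynamic inference.
        return isDynamicGraph && sourceVertexParallelism == PARALLELISM_UNSET;
    }
}
```

Because user-defined and statically inferred parallelism are both fixed before the job runs, the priority order follows from this guard without any extra bookkeeping.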

Structural Change

  1. Enable the source connector (FLIP-27 sources only) to access runtime information (such as the dynamic filtering info and the max source parallelism) and to infer the source parallelism from it.
  2. Enable the adaptive batch scheduler to invoke the dynamic parallelism inference method of the source connector, set the parallelism of the source job vertices, and initialize these job vertices.
  3. Most of the source management in the JM is handled by the SourceCoordinator. The SourceCoordinator must therefore be created before the adaptive batch scheduler decides on the parallelism (currently, it is created only after the ExecutionJobVertex's parallelism is determined and the vertex is initialized). This requires redesigning the lifecycle of the SourceCoordinator and lazily initializing the modules that rely on parallelism.

The final timing diagram for the dynamic parallelism inference of source vertex is illustrated below.

SourceCoordinator

The SourceCoordinator adds a dynamicInferParallelism method that exposes the custom parallelism inference method defined in the source connector.

Because parallelism inference in a source connector often involves interactions with external systems, we plan to implement this logic asynchronously.

SourceCoordinator
class SourceCoordinator {
    ...
    
    public CompletableFuture<Integer> dynamicInferParallelism(DynamicSourceParallelismInfo dynamicSourceParallelismInfo);
    ...
}


ExecutionJobVertex

ExecutionJobVertex adds a getSourceCoordinators() method.


ExecutionJobVertex
public Collection<SourceCoordinator<?, ?>> getSourceCoordinators();

AdaptiveBatchScheduler

  • When deciding the parallelism of a source vertex, the adaptive batch scheduler obtains the SourceCoordinator from the ExecutionJobVertex and invokes the SourceCoordinator::dynamicInferParallelism() method.
  • Runtime information is provided to assist in determining the parallelism of source vertices. The scheduler exposes: (1) The parallelism upper bound, which caps the inferred parallelism. In general, the configured value of `execution.batch.adaptive.auto-parallelism.default-source-parallelism` is used as the max source parallelism; if it is not configured, the fallback described in "Configuration Behavior Changes" applies. Note that this upper bound cannot exceed the JobVertex's maxParallelism, so the final value is the lower of the two. (2) The dataVolumePerTask, the average data volume that each task is expected to consume, which is taken from `execution.batch.adaptive.auto-parallelism.avg-data-volume-per-task`. (3) The DynamicFilteringInfo, which is provided by the SourceCoordinator (the adaptive batch scheduler determines the parallelism after the upstream vertices finish, at which point the dynamic partition pruning information has been generated).
  • When the parallelism of a job vertex is inferred from multiple inputs, i.e. (1) there are multiple sources within one job vertex, or (2) the job vertex acts as both a source and a downstream vertex of other job vertices, the scheduler takes the maximum of the inferred parallelisms.
  • Because SourceCoordinator::dynamicInferParallelism() is asynchronous, the initializeVertices process of the adaptive batch scheduler also needs to become asynchronous.
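
The combination of asynchronous inference, the maximum over several sources and the maxParallelism cap can be sketched with plain CompletableFuture composition. The class and method names below are hypothetical, and the final defensive clamp is an assumption; the proposal only states that the upper bound is passed to the connector.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch of how a scheduler could combine asynchronously inferred parallelisms. */
class SourceVertexParallelismDecider {

    /** The bound handed to the sources: the configured bound, capped by the vertex maxParallelism. */
    static int capUpperBound(int configuredUpperBound, int vertexMaxParallelism) {
        return Math.min(configuredUpperBound, vertexMaxParallelism);
    }

    /** Completes with the maximum of all inferred values once every source has answered. */
    static CompletableFuture<Integer> decide(
            List<CompletableFuture<Integer>> inferredPerSource, int upperBound) {
        CompletableFuture<Void> allDone =
                CompletableFuture.allOf(inferredPerSource.toArray(new CompletableFuture<?>[0]));
        return allDone.thenApply(
                ignored -> {
                    int max = 1;
                    for (CompletableFuture<Integer> inferred : inferredPerSource) {
                        // Every future is complete at this point, so join() does not block.
                        max = Math.max(max, inferred.join());
                    }
                    // Assumption: clamp defensively in case a source ignored the bound.
                    return Math.min(max, upperBound);
                });
    }
}
```

The caller would continue vertex initialization in a callback on the returned future rather than blocking the scheduler thread, which is why initializeVertices has to become asynchronous as well.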

OperatorCoordinator

The current lifecycle of the OperatorCoordinator in ExecutionJobVertex is shown in the following diagram.

In order for the adaptive batch scheduler to obtain the dynamically inferred parallelism from the source connector before deciding the parallelism of the source, we need to create the OperatorCoordinator in advance (this operation does not affect the jobs with non-dynamic graphs as the parallelism is already determined before the creation of ExecutionJobVertex). Then, after the scheduler determines the final parallelism and the vertex is initialized, the parallelism can be lazily initialized. Currently, all invocations related to parallelism are made after the ExecutionJobVertex is initialized (e.g. the creation of subtask gateways), so it is safe to create the OperatorCoordinator in advance.
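
One way to picture lazy initialization of parallelism is a holder that is created together with the coordinator but filled in only once the scheduler has decided. The LazyParallelism class below is a hypothetical illustration under that assumption and is not part of the proposed changes.

```java
/** Hypothetical holder for a parallelism that is decided after the coordinator is created. */
class LazyParallelism {
    private static final int UNSET = -1;

    private int parallelism = UNSET;

    boolean isSet() {
        return parallelism != UNSET;
    }

    /** Called once, after the scheduler has decided the vertex parallelism. */
    void set(int value) {
        if (value <= 0) {
            throw new IllegalArgumentException("Parallelism must be positive, got " + value);
        }
        if (isSet()) {
            throw new IllegalStateException("Parallelism is already set to " + parallelism);
        }
        parallelism = value;
    }

    /** Parallelism-dependent code (e.g. creating subtask gateways) reads the value here. */
    int get() {
        if (!isSet()) {
            throw new IllegalStateException("Parallelism has not been decided yet");
        }
        return parallelism;
    }
}
```

Failing fast in get() makes any parallelism-dependent call that runs before vertex initialization visible immediately, instead of silently operating on a placeholder value.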

DynamicParallelismInference interface implementation

We will take FileSource as an example, and the following pseudocode describes the process of parallelism inference. 

FileSource
class FileSource implements Source<T, SplitT, EnumChkT>, DynamicParallelismInference {
    ...
    @Override
    public int inferParallelism(Context context) {
        FileEnumerator fileEnumerator = enumeratorFactory.create();

        // Enumerate the splits, passing the parallelism upper bound to the enumerator.
        // new Path[1] is a placeholder; a real implementation would pass the source's input paths.
        int parallelismUpperBound = context.getParallelismInferenceUpperBound();
        Collection<FileSourceSplit> splits =
                fileEnumerator.enumerateSplits(new Path[1], parallelismUpperBound);

        // The inferred parallelism is the number of splits, capped by the upper bound.
        return Math.min(splits.size(), parallelismUpperBound);
    }
    ...
}


Implementation Plan

  1. Support lazy initialization of parallelism in OperatorCoordinator and related components.
  2. Introduce DynamicParallelismInference and DynamicFilteringInfo interfaces. Add preparation and invocation of methods with DynamicParallelismInference interface parameters in SourceCoordinator, and expose SourceCoordinator in ExecutionJobVertex.
  3. Improve the logic of AdaptiveBatchScheduler for dynamic source parallelism inference.
  4. Support dynamic parallelism inference in file sources.

Compatibility, Deprecation, and Migration Plan

For streaming jobs and jobs with pre-defined source parallelism (user-defined or connector parallelism inference), there will be no impact.

Limitations

Dynamic source parallelism inference only works for batch jobs that use the AdaptiveBatchScheduler.

Test Plan

This will be tested by unit and integration tests.

The integration test will use a source connector that implements the DynamicParallelismInference interface to perform end-to-end testing and verify if the dynamic parallelism is functioning as expected.
