Current state: [One of "Under Discussion", "Accepted", "Rejected"]

Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)

JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In general, there are three main ways to set source parallelism for batch jobs:

  1. User-defined source parallelism. Source parallelism can be configured via the global parallelism, or assigned per source through DataStream or Table/SQL (FLIP-367).
  2. Connector static parallelism inference (e.g., the Hive source).
  3. Dynamic parallelism inference. For batch jobs that use the adaptive batch scheduler, the current implementation uses a global default source parallelism as the inferred parallelism for all sources.

Compared to manually setting parallelism, automatic parallelism inference is easier to use and adapts better to data volumes that vary from day to day. However, static parallelism inference cannot leverage runtime information, which makes the inferred parallelism inaccurate (e.g., with the dynamic partition pruning of FLIP-248, the actual amount of data the source needs to consume can only be determined at runtime). Dynamic parallelism inference is therefore the best fit for batch jobs, but the adaptive batch scheduler's current support for it is limited.

Therefore, we aim to introduce a general interface that enables the adaptive batch scheduler to dynamically infer the source parallelism at runtime.

Public Interfaces

Introduce DynamicParallelismInference interface for Source

We plan to introduce a new interface, DynamicParallelismInference, which can be implemented together with the Source interface. A nested Context interface is introduced to provide the runtime information that assists in inferring the source parallelism.

DynamicParallelismInference

DynamicParallelismInference
@PublicEvolving
public interface DynamicParallelismInference {
    /**
     * A context that provides dynamic parallelism decision infos.
     * Currently, this interface is only effective for batch jobs in adaptive batch scheduler.
     */
    interface Context {
        /**
         * Get the dynamic filtering info of the source vertex.
         *
         * @return the dynamic filter instance.
         */
        Optional<DynamicFilteringInfo> getDynamicFilteringInfo();

        /**
         * Get the max source parallelism.
         *
         * @return the max source parallelism.
         */
        int getMaxSourceParallelism();

        /**
         * Get the average size of data volume to expect each task instance to process.
         *
         * @return the data volume per task in bytes.
         */
        long getDataVolumePerTask();
    }

    /**
     * The method is invoked on the master (JobManager) before the initialization of the source
     * vertex.
     *
     * @param context The context to get dynamic parallelism decision infos.
     * @return the inferred parallelism of the source.
     */
    int inferParallelism(Context context);
}

The DynamicPartitionPruning introduced by FLIP-248 improves performance of batch jobs by avoiding reading large amounts of irrelevant data.

Currently, the filtering data is wrapped by class DynamicFilteringEvent, which can only be accessed at the table layer. We plan to introduce the DynamicFilteringInfo as a decorative interface to represent DynamicFilteringEvent in the runtime layer.

DynamicFilteringInfo
/**
 * A decorative interface that indicates it holds the dynamic partition filtering data. The actual 
 * information needs to be obtained from the implementing class 
 * org.apache.flink.table.connector.source.DynamicFilteringEvent.
 */
public interface DynamicFilteringInfo {

}


Proposed Changes

General Idea

  1. Enable source connectors (FLIP-27 sources only) to access runtime information (such as the dynamic filtering info and the max source parallelism) and to infer the source parallelism from it.
  2. The adaptive batch scheduler has the ability to invoke the dynamic parallelism inference method of the source connector, set the parallelism of source job vertices, and initialize these job vertices.
  3. Most of the source management in the JM is handled by the SourceCoordinator. We need to create the SourceCoordinator in advance before the adaptive batch scheduler decides on the parallelism (currently, it is created only after the ExecutionJobVertex's parallelism is determined and initialized). We need to redesign the lifecycle of the SourceCoordinator and lazily initialize the modules which rely on parallelism.

The final timing diagram for the dynamic parallelism inference of source vertex by the AdaptiveBatchScheduler is illustrated below.

SourceCoordinator

The SourceCoordinator needs to add the dynamicInferParallelism method that can expose the custom parallelism inference method defined in the source connector.

Because parallelism inference in a source connector often involves interactions with external systems, we plan to implement the logic asynchronously.

SourceCoordinator
class SourceCoordinator {
    ...
    
    public CompletableFuture<Integer> dynamicInferParallelism(DynamicSourceParallelismInfo dynamicSourceParallelismInfo);
    ...
}
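To illustrate why the method returns a future, here is a minimal, self-contained sketch of wrapping a potentially blocking inference call so that the caller's thread is not blocked. It is not the SourceCoordinator implementation: the class `AsyncInferenceSketch`, the method `inferAsync`, and the `ioExecutor` parameter are hypothetical names introduced only for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.IntSupplier;

/** Hypothetical helper, not Flink code: runs a blocking inference call asynchronously. */
final class AsyncInferenceSketch {

    private AsyncInferenceSketch() {}

    /**
     * Runs the (possibly slow) inference on the given executor and returns a future
     * that completes with the inferred parallelism.
     */
    static CompletableFuture<Integer> inferAsync(
            IntSupplier blockingInference, Executor ioExecutor) {
        // The inference may talk to an external system, so it runs on ioExecutor
        // instead of the calling thread.
        return CompletableFuture.supplyAsync(() -> blockingInference.getAsInt(), ioExecutor);
    }
}
```

A caller could then continue with `thenAccept` or `thenCompose` once the parallelism is known, rather than waiting on the result.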


ExecutionJobVertex

ExecutionJobVertex adds a getSourceCoordinators() method.


ExecutionJobVertex
public Collection<SourceCoordinator<?, ?>> getSourceCoordinators();

AdaptiveBatchScheduler

  • When deciding the parallelism of a source vertex, the adaptive batch scheduler can obtain the SourceCoordinator from the ExecutionJobVertex and invoke the SourceCoordinator::dynamicInferParallelism() method.
  • Runtime information is provided to assist in determining the parallelism of source vertices. The scheduler exposes: (1) The max source parallelism, calculated as the minimum of the default source parallelism (`execution.batch.adaptive.auto-parallelism.default-source-parallelism`) and JobVertex#maxParallelism. If default-source-parallelism is not set, the global default parallelism is used as the default source parallelism. (2) The dataVolumePerTask, the average data volume that each task needs to consume, taken from `execution.batch.adaptive.auto-parallelism.avg-data-volume-per-task`. (3) The DynamicFilteringInfo, provided by the SourceCoordinator. It is available because the adaptive batch scheduler determines the parallelism after the upstream vertices finish, at which point the dynamic partition pruning information has been generated.
  • When a job vertex has more than one automatically inferred parallelism, because (1) it contains multiple sources, or (2) it acts as both a source and a downstream vertex of other job vertices, the scheduler takes the maximum of these parallelisms.
  • Because SourceCoordinator::dynamicInferParallelism() is asynchronous, the initializeVertices process of the adaptive batch scheduler also needs to be transformed into an asynchronous implementation.
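The two numeric rules in the list above (how the max source parallelism is derived, and how several inferred parallelisms for one job vertex are combined) can be sketched as follows. This is only an illustration of the arithmetic, not scheduler code: the class `SourceParallelismLimits` and both method names are hypothetical, and the configuration values are passed in as plain parameters.

```java
import java.util.List;
import java.util.OptionalInt;

/** Hypothetical helper, not Flink code: the parallelism rules described in the list above. */
final class SourceParallelismLimits {

    private SourceParallelismLimits() {}

    /** Upper bound exposed to the source: min(default source parallelism, vertex max parallelism). */
    static int maxSourceParallelism(
            OptionalInt defaultSourceParallelism,
            int globalDefaultParallelism,
            int vertexMaxParallelism) {
        // Fall back to the global default parallelism when default-source-parallelism is not set.
        int defaultParallelism = defaultSourceParallelism.orElse(globalDefaultParallelism);
        return Math.min(defaultParallelism, vertexMaxParallelism);
    }

    /** When one job vertex has several inferred parallelisms, the largest one is used. */
    static int mergeInferredParallelisms(List<Integer> inferredParallelisms) {
        return inferredParallelisms.stream()
                .mapToInt(Integer::intValue)
                .max()
                .orElseThrow(() -> new IllegalArgumentException("no inferred parallelism"));
    }
}
```

For instance, with default-source-parallelism set to 64 and a vertex max parallelism of 128, the bound is 64; with the option unset and a global default parallelism of 8, the bound is 8.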

OperatorCoordinator

The current lifecycle of the OperatorCoordinator in ExecutionJobVertex is shown in the following diagram.

For the adaptive batch scheduler to obtain the dynamically inferred parallelism from the source connector before deciding the parallelism of the source, we need to create the OperatorCoordinator in advance (this does not affect jobs with non-dynamic graphs, as their parallelism is already determined before the ExecutionJobVertex is created). Then, after the scheduler determines the final parallelism and the vertex is initialized, the parallelism of the coordinator can be lazily initialized. Currently, all invocations related to parallelism are made after the ExecutionJobVertex is initialized (e.g., the creation of subtask gateways), so it is safe to create the OperatorCoordinator in advance.
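The idea of creating a component first and supplying its parallelism later can be sketched with a small holder class. This is an assumption-laden illustration, not the OperatorCoordinator design: `LazyParallelism`, `decide`, `get`, and `isDecided` are hypothetical names chosen for this example.

```java
/** Hypothetical holder, not Flink code: a parallelism value that is set once, after creation. */
final class LazyParallelism {

    private static final int UNDECIDED = -1;

    private int parallelism = UNDECIDED;

    /** Called once, after the scheduler has decided the parallelism of the vertex. */
    void decide(int value) {
        if (value <= 0) {
            throw new IllegalArgumentException("parallelism must be positive: " + value);
        }
        if (parallelism != UNDECIDED) {
            throw new IllegalStateException("parallelism was already decided");
        }
        parallelism = value;
    }

    boolean isDecided() {
        return parallelism != UNDECIDED;
    }

    /** Fails fast if a parallelism-dependent code path runs before the decision was made. */
    int get() {
        if (parallelism == UNDECIDED) {
            throw new IllegalStateException("parallelism has not been decided yet");
        }
        return parallelism;
    }
}
```

Failing fast in `get` makes any code path that touches the parallelism too early visible immediately, which matches the observation that all parallelism-related invocations happen after the vertex is initialized.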

DynamicParallelismInference interface implementation

We will take HiveSource as an example, and the following pseudocode describes the process of parallelism inference.

HiveSource
class HiveSource implements Source<T, SplitT, EnumChkT>, DynamicParallelismInference {
    ...
    @Override
    public int inferParallelism(Context context) {
        FileEnumerator fileEnumerator = enumeratorFactory.create();

        // apply the dynamic filtering info
        Optional<DynamicFilteringInfo> dynamicFilteringInfo = context.getDynamicFilteringInfo();
        if (dynamicFilteringInfo.isPresent()) {
            fileEnumerator.setDynamicFilteringData(
                    ((DynamicFilteringEvent) dynamicFilteringInfo.get()).getData());
        }

        // apply the max source parallelism
        int maxSourceParallelism = context.getMaxSourceParallelism();
        fileEnumerator.setMinNumSplits(maxSourceParallelism);

        // return inferred source parallelism
        return fileEnumerator.getInferredSourceParallelism();
    }
    ...
}
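The pseudocode above does not use getDataVolumePerTask(). As a separate illustration, one way a connector might combine that value with the max source parallelism is to divide the total input size by the per-task volume and cap the result. This strategy is an assumption made for the example, not something this proposal prescribes; `DataVolumeInference` and its parameters are hypothetical names.

```java
/** Hypothetical helper, not Flink code: a size-based parallelism estimate. */
final class DataVolumeInference {

    private DataVolumeInference() {}

    /**
     * Returns ceil(totalBytes / dataVolumePerTask), clamped to the range
     * [1, maxSourceParallelism].
     */
    static int inferParallelism(long totalBytes, long dataVolumePerTask, int maxSourceParallelism) {
        if (dataVolumePerTask <= 0 || maxSourceParallelism <= 0) {
            throw new IllegalArgumentException("limits must be positive");
        }
        // Integer ceiling division; an empty input still gets one task.
        long needed = totalBytes <= 0 ? 1 : (totalBytes - 1) / dataVolumePerTask + 1;
        return (int) Math.max(1L, Math.min(needed, (long) maxSourceParallelism));
    }
}
```

With 100 MiB of input and 16 MiB per task, this yields 7 tasks; with 10 GiB of input and a max source parallelism of 128, the result is capped at 128.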


Implementation Plan

  1. Support lazy initialization of parallelism in OperatorCoordinator and related components.
  2. Introduce the DynamicParallelismInference and DynamicFilteringInfo interfaces. In SourceCoordinator, prepare the inputs for and invoke the method defined by DynamicParallelismInference, and expose the SourceCoordinator in ExecutionJobVertex.
  3. Improve the logic of AdaptiveBatchScheduler for dynamic source parallelism inference.
  4. Support dynamic parallelism inference in the Hive and File sources.

Compatibility, Deprecation, and Migration Plan

For streaming jobs and jobs with pre-defined source parallelism (user-defined or connector parallelism inference), there will be no impact.

For batch jobs that rely on the adaptive batch scheduler to infer the parallelism of sources, `execution.batch.adaptive.auto-parallelism.default-source-parallelism` now serves as an upper limit for the inferred parallelism rather than as the final parallelism. Additionally, if `execution.batch.adaptive.auto-parallelism.default-source-parallelism` is not set, the global default parallelism is used as the upper limit for the inferred parallelism.

Limitations

This feature only works for batch jobs that use the AdaptiveBatchScheduler.

Test Plan

This will be tested by unit and integration tests.

The integration test will use a source connector that implements the DynamicParallelismInference interface to run end-to-end and verify that dynamic parallelism inference works as expected.
