Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Discussion thread: here (<- link to https://lists.apache.org/thread/ocftkqy5d2x4n58wzprgm5qqrzzkbmb8)
Vote thread: here (<- link to
JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)
Release: 1.19

...

Taking HiveSource as an example, the following pseudocode describes the parallelism inference process. For the Hive source, dynamic parallelism inference in batch scenarios is a superset of static parallelism inference. As a follow-up task, we will consider changing the default value of 'table.exec.hive.infer-source-parallelism' to false.

Code Block (Java): HiveSource
class HiveSource implements Source<T, SplitT, EnumChkT>, DynamicParallelismInference {
    ...
    @Override
    public int inferParallelism(DynamicParallelismContext context) {
        FileEnumerator fileEnumerator = enumeratorFactory.create();

        // apply the dynamic filtering info, if present
        Optional<DynamicFilteringInfo> dynamicFilteringInfo = context.getDynamicFilter();
        if (dynamicFilteringInfo.isPresent()) {
            fileEnumerator.setDynamicFilteringData(
                    ((DynamicFilteringEvent) dynamicFilteringInfo.get()).getData());
        }

        // apply the max source parallelism
        int maxSourceParallelism = context.getMaxSourceParallelism();
        fileEnumerator.setMinNumSplits(maxSourceParallelism);

        // return inferred source parallelism
        return fileEnumerator.getInferredSourceParallelism();
    }
    ...
}
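
The pseudocode above leaves the enumerator details out. As a rough illustration only, the following self-contained sketch mimics the same flow outside of Flink. The interface shapes follow the pseudocode, but ToyFileSource, ToyContext, the retainedPartitions() accessor, and the rule "one subtask per remaining split, at least 1, capped by the max source parallelism" are assumptions made for this example, not the actual Hive source logic.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

/** Hypothetical stand-in for dynamic filtering data: the partitions that survive filtering. */
interface DynamicFilteringInfo {
    Set<String> retainedPartitions();
}

/** Hypothetical stand-in for the context handed to the source at inference time. */
interface DynamicParallelismContext {
    Optional<DynamicFilteringInfo> getDynamicFilter();

    int getMaxSourceParallelism();
}

interface DynamicParallelismInference {
    int inferParallelism(DynamicParallelismContext context);
}

/** Simple immutable context used only by this sketch. */
final class ToyContext implements DynamicParallelismContext {
    private final Optional<DynamicFilteringInfo> filter;
    private final int maxSourceParallelism;

    ToyContext(Optional<DynamicFilteringInfo> filter, int maxSourceParallelism) {
        this.filter = filter;
        this.maxSourceParallelism = maxSourceParallelism;
    }

    @Override
    public Optional<DynamicFilteringInfo> getDynamicFilter() {
        return filter;
    }

    @Override
    public int getMaxSourceParallelism() {
        return maxSourceParallelism;
    }
}

/** Toy source: every split is represented by the name of the partition it belongs to. */
final class ToyFileSource implements DynamicParallelismInference {
    private final List<String> splitPartitions;

    ToyFileSource(List<String> splitPartitions) {
        this.splitPartitions = splitPartitions;
    }

    @Override
    public int inferParallelism(DynamicParallelismContext context) {
        // Count the splits that survive dynamic filtering; without a filter, all splits remain.
        Optional<DynamicFilteringInfo> filter = context.getDynamicFilter();
        long remainingSplits = splitPartitions.stream()
                .filter(p -> filter.map(f -> f.retainedPartitions().contains(p)).orElse(true))
                .count();

        // Assumed rule: one subtask per split, never above the upper bound, never below 1.
        long capped = Math.min(remainingSplits, context.getMaxSourceParallelism());
        return (int) Math.max(1, capped);
    }
}
```

For instance, with six splits spread over partitions p1, p1, p2, p3, p3, p3 and a max source parallelism of 4, this sketch returns 4 when no filter is present and 3 when the filter retains only p3.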

...

  1. Support lazy initialization of parallelism in OperatorCoordinator and related components.
  2. Introduce the DynamicParallelismInference and DynamicFilteringInfo interfaces. In SourceCoordinator, prepare the parameters for the DynamicParallelismInference methods and invoke them, and expose SourceCoordinator in ExecutionJobVertex.
  3. Improve the logic of AdaptiveBatchScheduler for dynamic source parallelism inference.
  4. Support dynamic parallelism inference in the Hive/File sources and change the default value of 'table.exec.hive.infer-source-parallelism' to false in batch scenarios.
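
Steps 1 and 3 can be pictured with a small sketch of how a scheduler might decide a source vertex's parallelism lazily, right before scheduling. This is not Flink code: ToyJobVertex, ToyScheduler, the UNDECIDED marker, and the clamping of the inferred value to the range [1, max source parallelism] are all hypothetical and only illustrate the idea that an explicitly configured parallelism wins and inference runs at most once.

```java
import java.util.function.IntSupplier;

/** Hypothetical job vertex whose parallelism can stay undecided until scheduling time. */
final class ToyJobVertex {
    static final int UNDECIDED = -1;

    private int parallelism;

    /** Pass UNDECIDED to defer the decision, or a positive value to fix it up front. */
    ToyJobVertex(int configuredParallelism) {
        this.parallelism = configuredParallelism;
    }

    boolean isParallelismDecided() {
        return parallelism != UNDECIDED;
    }

    int getParallelism() {
        if (!isParallelismDecided()) {
            throw new IllegalStateException("parallelism not decided yet");
        }
        return parallelism;
    }

    /** Lazily fixes the parallelism; it can be set only once. */
    void setParallelism(int value) {
        if (isParallelismDecided()) {
            throw new IllegalStateException("parallelism already decided");
        }
        if (value < 1) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        parallelism = value;
    }
}

/** Hypothetical scheduler helper, standing in for the adaptive batch scheduling logic. */
final class ToyScheduler {
    static int decideSourceParallelism(
            ToyJobVertex vertex, IntSupplier sourceInference, int maxSourceParallelism) {
        if (vertex.isParallelismDecided()) {
            // An already decided parallelism wins; the source is not asked again.
            return vertex.getParallelism();
        }
        // Ask the source (for example via its coordinator) and clamp to [1, max].
        int inferred = sourceInference.getAsInt();
        int clamped = Math.max(1, Math.min(inferred, maxSourceParallelism));
        vertex.setParallelism(clamped);
        return clamped;
    }
}
```

In this sketch, a vertex created with UNDECIDED and an inference result of 10 under an upper bound of 4 ends up with parallelism 4, and later calls return that value without invoking the inference again.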

Compatibility, Deprecation, and Migration Plan

...