Discussion thread | here (<- link to https://lists.apache.org/thread/ocftkqy5d2x4n58wzprgm5qqrzzkbmb8) |
---|---|
Vote thread | here (<- link to |
JIRA | here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX) |
Release | 1.19 |
...
We take HiveSource as an example; the following pseudocode describes its parallelism inference process. For the Hive source, dynamic parallelism inference in batch scenarios is a superset of static parallelism inference, because it can also take runtime information such as dynamic filtering data into account. As a follow-up task, we will consider changing the default value of 'table.exec.hive.infer-source-parallelism' to false.
```java
class HiveSource implements Source<T, SplitT, EnumChkT>, DynamicParallelismInference {
    ...

    @Override
    public int inferParallelism(DynamicParallelismContext context) {
        FileEnumerator fileEnumerator = enumeratorFactory.create();

        // apply the dynamic filtering info
        Optional<DynamicFilteringInfo> dynamicFilteringInfo = context.getDynamicFilter();
        if (dynamicFilteringInfo.isPresent()) {
            fileEnumerator.setDynamicFilteringData(
                    ((DynamicFilteringEvent) dynamicFilteringInfo.get()).getData());
        }

        // apply the max source parallelism
        int maxSourceParallelism = context.getMaxSourceParallelism();
        fileEnumerator.setMinNumSplits(maxSourceParallelism);

        // return inferred source parallelism
        return fileEnumerator.getInferredSourceParallelism();
    }

    ...
}
```
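To make the flow of the pseudocode above concrete, here is a minimal, Flink-independent sketch of the same idea: apply an optional runtime filter, then cap the result at the maximum source parallelism. Everything in it is an assumption made for illustration: `ParallelismInferenceSketch`, `InferenceContext`, and the rule "one split per remaining partition" are hypothetical and are not part of the Flink or Hive connector API.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class ParallelismInferenceSketch {

    /** Hypothetical stand-in for the context handed to the source at inference time. */
    static final class InferenceContext {
        final Optional<Predicate<String>> dynamicFilter;
        final int maxSourceParallelism;

        InferenceContext(Optional<Predicate<String>> dynamicFilter, int maxSourceParallelism) {
            this.dynamicFilter = dynamicFilter;
            this.maxSourceParallelism = maxSourceParallelism;
        }
    }

    /**
     * Infers the source parallelism, assuming (for this sketch only) that each
     * remaining partition yields exactly one split.
     */
    static int inferParallelism(List<String> partitions, InferenceContext context) {
        // Apply the dynamic filter if the runtime provided one.
        List<String> remaining = context.dynamicFilter
                .map(filter -> partitions.stream().filter(filter).collect(Collectors.toList()))
                .orElse(partitions);

        // Never go below 1, never exceed the configured maximum.
        int splits = Math.max(1, remaining.size());
        return Math.min(splits, context.maxSourceParallelism);
    }

    public static void main(String[] args) {
        List<String> partitions = List.of("dt=2023-01-01", "dt=2023-01-02", "dt=2023-01-03");

        // Static case: no runtime filter, so only the cap applies.
        InferenceContext noFilter = new InferenceContext(Optional.empty(), 2);
        System.out.println(inferParallelism(partitions, noFilter));

        // Dynamic case: the runtime filter prunes partitions before inference.
        InferenceContext withFilter =
                new InferenceContext(Optional.of(p -> p.endsWith("-01")), 8);
        System.out.println(inferParallelism(partitions, withFilter));
    }
}
```

The static case is simply the dynamic case with an empty filter, which is one way to read the statement that dynamic inference is a superset of static inference.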
...
- Support lazy initialization of parallelism in OperatorCoordinator and related components.
- Introduce the DynamicParallelismInference and DynamicFilteringInfo interfaces. In SourceCoordinator, prepare the arguments for the DynamicParallelismInference methods and invoke them, and expose SourceCoordinator in ExecutionJobVertex.
- Improve the logic of AdaptiveBatchScheduler for dynamic source parallelism inference.
- Add dynamic parallelism inference support to the Hive/File sources, and change the default value of 'table.exec.hive.infer-source-parallelism' to false in batch scenarios.
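The interplay between lazily initialized parallelism and a scheduler that decides it at runtime can be sketched as follows. This is a simplified illustration under stated assumptions, not the actual AdaptiveBatchScheduler or OperatorCoordinator code: `LazyParallelismSketch`, `SourceVertex`, `PARALLELISM_UNSET`, and `decideParallelism` are hypothetical names, and clamping the inferred value to `[1, upperBound]` is an assumed policy.

```java
import java.util.function.IntSupplier;

public class LazyParallelismSketch {

    /** Hypothetical marker meaning "parallelism has not been decided yet". */
    static final int PARALLELISM_UNSET = -1;

    /** Hypothetical stand-in for a source vertex whose parallelism may be set late. */
    static final class SourceVertex {
        private int parallelism;

        SourceVertex(int configuredParallelism) {
            this.parallelism = configuredParallelism;
        }

        boolean isParallelismDecided() {
            return parallelism > 0;
        }

        int getParallelism() {
            return parallelism;
        }

        void setParallelism(int newParallelism) {
            // Parallelism may be initialized lazily, but only once.
            if (isParallelismDecided()) {
                throw new IllegalStateException("Parallelism is already decided.");
            }
            this.parallelism = newParallelism;
        }
    }

    /**
     * Returns the user-configured parallelism if present; otherwise asks the
     * source to infer one and clamps it to [1, upperBound] (assumed policy).
     */
    static int decideParallelism(SourceVertex vertex, IntSupplier inference, int upperBound) {
        if (vertex.isParallelismDecided()) {
            return vertex.getParallelism();
        }
        int inferred = inference.getAsInt();
        int clamped = Math.max(1, Math.min(inferred, upperBound));
        vertex.setParallelism(clamped);
        return clamped;
    }

    public static void main(String[] args) {
        SourceVertex undecided = new SourceVertex(PARALLELISM_UNSET);
        // The source suggests 500, but the scheduler's upper bound is 128.
        System.out.println(decideParallelism(undecided, () -> 500, 128));

        SourceVertex configured = new SourceVertex(4);
        // An explicitly configured parallelism is kept; inference is not consulted.
        System.out.println(decideParallelism(configured, () -> 500, 128));
    }
}
```

Passing the inference as a supplier keeps it from running at all when the user has already fixed the parallelism, which mirrors the intent of deciding source parallelism only when it is left open.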
Compatibility, Deprecation, and Migration Plan
...