Status
Discussion thread | - | ||||||||
---|---|---|---|---|---|---|---|---|---|
Vote thread | - | ||||||||
JIRA |
| ||||||||
Release | - |
Motivation
Flink creates DynamicTableSource
for each source table independently when it generates plan for a SQL job, sources are not aware of each other. In some situation we need to generate unified parameters for all sources in
...
the same job, such as consistency requirements mentioned in FLIP-276 [1]. We would like to introduce customized provider in planner for Flink SQL job to
...
generate dynamic parameters for all source tables, then Flink job can create sources based on these parameters.
Public Interfaces
SourceDynamicParameterProvider
...
Code Block |
---|
table.plan.source-dynamic-parameter-provider-factory: {provider factory class} |
Proposed Changes
We will add the following steps in PlannerBase.translateToExecNodeGraph
to support dynamic parameters for sources
...
Code Block |
---|
public abstract class CommonExecTableSourceScan extends ExecNodeBase<RowData> implements MultipleTransformationTranslator<RowData> { @Nullable protected Map<String, String> dynamicParameters; @Override protected Transformation<RowData> translateToPlanInternal( PlannerBase planner, ExecNodeConfig config) { ....; // Create table source with dynamic parameters. final ScanTableSource tableSource = tableSourceSpec.getScanTableSource( planner.getFlinkContext(), ShortcutUtils.unwrapTypeFactory(planner), dynamicParameters); ....; } } |
Compatibility, Deprecation, and Migration Plan
This is a newly added feature, so there will be no compatibility issues
Test Plan
UT & IT