Status

Discussion thread: -
Vote thread: -
JIRA: FLINK-32042
Release: -

Motivation

When Flink generates the plan for a SQL job, it creates a DynamicTableSource for each source table independently, so the sources are not aware of each other. In some situations we need to generate unified parameters for all sources in the same job, for example to meet the consistency requirements described in FLIP-276 [1]. We would like to introduce a customized provider in the planner for Flink SQL jobs that generates dynamic parameters for all source tables; the Flink job can then create its sources based on these parameters.

Public Interfaces

SourceDynamicParameterProvider

...

Code Block
table.plan.source-dynamic-parameter-provider-factory: {provider factory class}
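
To illustrate how a factory class named in this option might be resolved, here is a minimal, self-contained sketch. It is not the interface proposed by this FLIP: `ParameterProvider`, `ParameterProviderFactory`, `FixedSnapshotFactory`, and the `snapshot-id` key are hypothetical stand-ins invented for the example. It also assumes plain reflection with a no-argument constructor, whereas the planner may load the class differently.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ProviderLoadingSketch {

    /** Hypothetical stand-in for the provider; NOT the FLIP's actual interface. */
    public interface ParameterProvider {
        /** Returns one parameter map per source table, keyed by table name. */
        Map<String, Map<String, String>> generate(List<String> sourceTables);
    }

    /** Hypothetical stand-in for the factory class named by the config option. */
    public interface ParameterProviderFactory {
        ParameterProvider create();
    }

    /** Example factory: every source table in the job gets the same snapshot id. */
    public static class FixedSnapshotFactory implements ParameterProviderFactory {
        @Override
        public ParameterProvider create() {
            return tables -> {
                Map<String, Map<String, String>> result = new LinkedHashMap<>();
                for (String table : tables) {
                    // "snapshot-id" is an invented key used only for illustration.
                    result.put(table, Map.of("snapshot-id", "42"));
                }
                return result;
            };
        }
    }

    /** Instantiates the configured factory class via its no-argument constructor. */
    public static ParameterProviderFactory loadFactory(String className) throws Exception {
        Class<?> clazz = Class.forName(className);
        return (ParameterProviderFactory) clazz.getDeclaredConstructor().newInstance();
    }

    public static void main(String[] args) throws Exception {
        // In a real job this string would come from the configuration option above.
        String configured = FixedSnapshotFactory.class.getName();
        ParameterProvider provider = loadFactory(configured).create();
        System.out.println(provider.generate(List.of("orders", "customers")));
    }
}
```

The point of the sketch is that one provider instance sees all source tables at once, so it can hand out parameters that are consistent across the whole job.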

Proposed Changes

We will add the following steps in PlannerBase.translateToExecNodeGraph to support dynamic parameters for sources:

...

Code Block
public abstract class CommonExecTableSourceScan extends ExecNodeBase<RowData>
        implements MultipleTransformationTranslator<RowData> {
    // Dynamic parameters generated for this source; may be null.
    @Nullable protected Map<String, String> dynamicParameters;

    @Override
    protected Transformation<RowData> translateToPlanInternal(
            PlannerBase planner, ExecNodeConfig config) {
        // ...
        // Create the table source with the dynamic parameters.
        final ScanTableSource tableSource =
                tableSourceSpec.getScanTableSource(
                        planner.getFlinkContext(),
                        ShortcutUtils.unwrapTypeFactory(planner),
                        dynamicParameters);
        // ...
    }
    }
}
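
The excerpt above passes the nullable `dynamicParameters` map into source creation. How those parameters are combined with the table's own options is not spelled out here. One plausible approach, shown as a sketch only, is to overlay them on the static options. The class `DynamicOptionsSketch`, the method `mergeOptions`, the option keys, and the rule that dynamic parameters win on key conflicts are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class DynamicOptionsSketch {

    /**
     * Returns a new map holding the static table options overlaid with the
     * dynamic parameters. Assumption: dynamic parameters override static
     * options with the same key, and null means there is nothing to overlay.
     */
    public static Map<String, String> mergeOptions(
            Map<String, String> tableOptions, Map<String, String> dynamicParameters) {
        Map<String, String> merged = new HashMap<>(tableOptions);
        if (dynamicParameters != null) {
            merged.putAll(dynamicParameters);
        }
        return merged;
    }

    public static void main(String[] args) {
        // Invented option keys, used only to show the overlay behavior.
        Map<String, String> tableOptions =
                Map.of("connector", "example", "scan.mode", "latest");
        Map<String, String> dynamic =
                Map.of("scan.mode", "snapshot", "snapshot-id", "42");

        System.out.println(mergeOptions(tableOptions, dynamic));
        System.out.println(mergeOptions(tableOptions, null));
    }
}
```

Copying into a new map leaves the original table options untouched, so the same catalog table can be planned again with different dynamic parameters.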

Compatibility, Deprecation, and Migration Plan

This is a newly added feature, so there will be no compatibility issues.

Test Plan

Unit tests (UT) and integration tests (IT).


[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-276%3A+Data+Consistency+of+Streaming+and+Batch+ETL+in+Flink+and+Table+Store