Status
Discussion thread | - |
---|---|
Vote thread | - |
JIRA | |
Release | - |
Motivation
When Flink generates the plan for a SQL job, it creates a DynamicTableSource for each source table independently, so sources are not aware of each other. In some situations we need to generate unified parameters for all sources in one job, such as the consistency requirements mentioned in FLIP-276 [1]. We would like to introduce a customizable provider in the planner for Flink SQL jobs that generates dynamic parameters for all source tables; Flink can then create the sources based on these parameters.
Public Interfaces
SourceDynamicParameterProvider
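We add SourceDynamicParameterProvider, created by a SourceDynamicParameterProviderFactory, to provide unified dynamic parameters for all sources. The provider returns a SourceDynamicParameter that holds per-source parameters and common parameters shared by all sources.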
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.catalog.ObjectIdentifier;

import javax.annotation.Nullable;

import java.util.Map;
import java.util.Set;

/** Dynamic parameter provider. */
@PublicEvolving
public interface SourceDynamicParameterProvider {
    /**
     * Provide dynamic parameters for sources in {@link SourceDynamicParameter}.
     *
     * @param context The context for provider.
     * @return The source dynamic parameters.
     */
    SourceDynamicParameter getDynamicParameter(Context context);

    /** Context for parameter provider. */
    interface Context {
        /**
         * Sources for the given job.
         *
         * @return Source identifiers.
         */
        Set<ObjectIdentifier> sources();

        /**
         * Catalog options that help the provider get dynamic parameters, for example, the
         * location where the parameters are stored in the catalog.
         *
         * @param catalog The name of catalog.
         * @return The catalog options.
         */
        @Nullable
        Map<String, String> catalogOptions(String catalog);
    }
}

/** Factory to create dynamic parameter provider. */
@PublicEvolving
public interface SourceDynamicParameterProviderFactory {
    SourceDynamicParameterProvider createProvider(Context context);

    /** Context for dynamic parameter provider factory. */
    interface Context {
        Map<String, String> config();

        ClassLoader classLoader();
    }
}

/** Dynamic parameters for sources. */
@PublicEvolving
public class SourceDynamicParameter {
    private final Map<ObjectIdentifier, Map<String, String>> dynamicParameters;
    private final Map<String, String> commonParameters;

    public SourceDynamicParameter(
            Map<ObjectIdentifier, Map<String, String>> dynamicParameters,
            Map<String, String> commonParameters) {
        this.dynamicParameters = dynamicParameters;
        this.commonParameters = commonParameters;
    }

    /**
     * Get dynamic parameters for each source; different sources may have different parameters.
     *
     * @return dynamic parameters for each source.
     */
    public Map<ObjectIdentifier, Map<String, String>> dynamicParameters() {
        return dynamicParameters;
    }

    /**
     * Get common parameters for all sources.
     *
     * @return the common parameters.
     */
    public Map<String, String> commonParameters() {
        return commonParameters;
    }
}
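For illustration, a provider for the consistency use case in the Motivation could look roughly like the sketch below. It is a minimal sketch under stated assumptions, not part of the proposal: to run without Flink it replaces the proposed types with simplified stand-ins (ProviderContext, DynamicParameters), models ObjectIdentifier as a "catalog.database.table" string, and uses hypothetical option keys (consistency.snapshot-id, snapshot-id, consistent-read).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Simplified stand-ins for the proposed types so the sketch compiles without Flink:
// ObjectIdentifier is modeled as a "catalog.database.table" String.
interface ProviderContext {
    Set<String> sources();

    /** Returns null if the catalog has no options to offer. */
    Map<String, String> catalogOptions(String catalog);
}

final class DynamicParameters {
    final Map<String, Map<String, String>> perSource;
    final Map<String, String> common;

    DynamicParameters(Map<String, Map<String, String>> perSource, Map<String, String> common) {
        this.perSource = perSource;
        this.common = common;
    }
}

/**
 * Hypothetical provider: pins every source whose catalog records a snapshot id to that
 * snapshot. All option keys used here are invented for illustration.
 */
final class ConsistentSnapshotProvider {
    DynamicParameters getDynamicParameter(ProviderContext context) {
        Map<String, Map<String, String>> perSource = new HashMap<>();
        for (String source : context.sources()) {
            String catalog = source.substring(0, source.indexOf('.'));
            Map<String, String> catalogOptions = context.catalogOptions(catalog);
            if (catalogOptions == null || !catalogOptions.containsKey("consistency.snapshot-id")) {
                continue; // this catalog does not take part in consistent reads
            }
            Map<String, String> params = new HashMap<>();
            params.put("snapshot-id", catalogOptions.get("consistency.snapshot-id"));
            perSource.put(source, params);
        }
        // Parameters shared by all sources of the job.
        Map<String, String> common = new HashMap<>();
        common.put("consistent-read", "true");
        return new DynamicParameters(perSource, common);
    }
}
```

Sources whose catalog offers no snapshot id simply get no per-source entry in this sketch and only receive the common parameters.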
Options
We add a new option to TableConfigOptions that specifies the provider factory class:
table.plan.source-dynamic-parameter-provider-factory: {provider factory class}
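One plausible way for the planner to turn this option into a factory instance is plain reflection with the user class loader. The sketch below assumes the option value is a fully qualified class name with a no-argument constructor; ProviderFactory, ProviderFactoryLoader and MyProviderFactory are hypothetical stand-ins, and the factory returns a string instead of a real provider so the example runs without Flink.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for SourceDynamicParameterProviderFactory (simplified to return a String).
interface ProviderFactory {
    String createProvider(Map<String, String> config);
}

final class ProviderFactoryLoader {
    static final String OPTION_KEY = "table.plan.source-dynamic-parameter-provider-factory";

    /** Returns null when the option is not set, so no provider is created. */
    static ProviderFactory load(Map<String, String> config, ClassLoader classLoader) {
        String className = config.get(OPTION_KEY);
        if (className == null || className.isEmpty()) {
            return null;
        }
        try {
            Class<?> clazz = Class.forName(className, true, classLoader);
            if (!ProviderFactory.class.isAssignableFrom(clazz)) {
                throw new IllegalArgumentException(className + " does not implement ProviderFactory");
            }
            return (ProviderFactory) clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate " + className, e);
        }
    }
}

// Hypothetical user factory that would be named in the option.
final class MyProviderFactory implements ProviderFactory {
    @Override
    public String createProvider(Map<String, String> config) {
        return "my-provider";
    }
}
```

Returning null for an unset option keeps the feature opt-in in this sketch; a wrong class name fails fast with a descriptive error.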
Proposed Changes
We will add the following steps to PlannerBase.translateToExecNodeGraph to support dynamic parameters for sources:
1. Create a SourceDynamicParameterProvider instance through the factory configured by the option in TableConfig.
2. Create a SourceDynamicParameterProcessor, which implements the ExecNodeGraphProcessor interface. SourceDynamicParameterProcessor gets the dynamic parameters for sources through SourceDynamicParameterProvider and sets them on each CommonExecTableSourceScan.
3. CommonExecTableSourceScan creates the source nodes in its translateToPlanInternal method and passes the parameters to DynamicTableSourceSpec, which creates the ScanTableSource with them.
PlannerBase
abstract class PlannerBase(
    executor: Executor,
    tableConfig: TableConfig,
    val moduleManager: ModuleManager,
    val functionCatalog: FunctionCatalog,
    val catalogManager: CatalogManager,
    isStreamingMode: Boolean,
    classLoader: ClassLoader)
  extends Planner {

  @VisibleForTesting
  private[flink] def translateToExecNodeGraph(
      optimizedRelNodes: Seq[RelNode],
      isCompiled: Boolean): ExecNodeGraph = {
    ...
    // Append the dynamic parameter processor (if configured) after the existing processors.
    // Seq is immutable, so the list is rebuilt rather than mutated in place.
    val processors = getExecNodeGraphProcessors ++ createDynamicParameterProcessor().toSeq
    processors.foldLeft(execGraph)((graph, processor) => processor.process(graph, context))
  }

  private def createDynamicParameterProcessor(): Option[ExecNodeGraphProcessor] = {
    // Create the dynamic parameter processor from the table config;
    // return None if no provider factory is configured.
    ???
  }
}
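The fold over processors chains them: each processor receives the graph returned by the previous one. A minimal Java sketch of that chaining, assuming (as the Scala snippet suggests) that the dynamic parameter processor is optional and runs after the existing processors; the graph type is generic and processors are modeled as UnaryOperator purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.UnaryOperator;

final class ProcessorChain {
    /**
     * Applies the built-in processors in order, then the optional dynamic parameter processor,
     * feeding each processor the graph produced by the previous one (like foldLeft).
     */
    static <G> G run(G graph, List<UnaryOperator<G>> builtIn, Optional<UnaryOperator<G>> dynamic) {
        List<UnaryOperator<G>> processors = new ArrayList<>(builtIn);
        dynamic.ifPresent(processors::add);
        G result = graph;
        for (UnaryOperator<G> processor : processors) {
            result = processor.apply(result);
        }
        return result;
    }
}
```

With a String as the graph and processors that append their names, the result records the order in which they ran.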
SourceDynamicParameterProcessor
/**
 * Get source dynamic parameters from {@link SourceDynamicParameterProvider} and set them on the
 * source nodes in the exec graph.
 */
public class SourceDynamicParameterProcessor implements ExecNodeGraphProcessor {
    private final SourceDynamicParameterProvider provider;

    public SourceDynamicParameterProcessor(SourceDynamicParameterProvider provider) {
        this.provider = provider;
    }

    @Override
    public ExecNodeGraph process(ExecNodeGraph execGraph, ProcessorContext context) {
        // Visit all CommonExecTableSourceScan nodes, get dynamic parameters with the provider
        // and set the options on each CommonExecTableSourceScan.
        // ...
        return execGraph;
    }
}
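The traversal inside process could look like the sketch below. Because an exec graph is a DAG in which one scan node can feed several consumers, the visitor tracks visited nodes by identity so each scan is handled once. Node, ScanNode and DynamicParameterVisitor are hypothetical stand-ins for ExecNode, CommonExecTableSourceScan and the processor's visitor; the rule that per-source values override common values is an assumption, since the proposal does not specify how the two maps are combined.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified stand-ins for ExecNode and CommonExecTableSourceScan.
class Node {
    final List<Node> inputs = new ArrayList<>();
}

final class ScanNode extends Node {
    final String sourceId;
    Map<String, String> dynamicParameters; // stays null if no parameter applies

    ScanNode(String sourceId) {
        this.sourceId = sourceId;
    }
}

/** Sets dynamic parameters on every scan node, visiting shared nodes of the DAG only once. */
final class DynamicParameterVisitor {
    private final Map<String, String> commonParameters;
    private final Map<String, Map<String, String>> parametersBySource;
    private final Set<Node> visited = Collections.newSetFromMap(new IdentityHashMap<>());
    int scansVisited = 0;

    DynamicParameterVisitor(
            Map<String, String> commonParameters,
            Map<String, Map<String, String>> parametersBySource) {
        this.commonParameters = commonParameters;
        this.parametersBySource = parametersBySource;
    }

    void visitAll(List<Node> roots) {
        for (Node root : roots) {
            visit(root);
        }
    }

    private void visit(Node node) {
        if (!visited.add(node)) {
            return; // reached again through another consumer
        }
        if (node instanceof ScanNode) {
            ScanNode scan = (ScanNode) node;
            scansVisited++;
            // Assumption: per-source values override common values with the same key.
            Map<String, String> merged = new HashMap<>(commonParameters);
            merged.putAll(parametersBySource.getOrDefault(scan.sourceId, Collections.emptyMap()));
            if (!merged.isEmpty()) {
                scan.dynamicParameters = merged;
            }
        }
        for (Node input : node.inputs) {
            visit(input);
        }
    }
}
```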
DynamicTableSourceSpec
public class DynamicTableSourceSpec {
    /** Create scan table source with dynamic parameters. */
    public ScanTableSource getScanTableSource(
            FlinkContext context,
            FlinkTypeFactory typeFactory,
            @Nullable Map<String, String> dynamicParameters) {
        // ...
        // Copy the resolved table with the dynamic parameters overlaid on its options.
        // ContextResolvedTable#copy replaces the options, so the existing ones are merged first.
        ContextResolvedTable resolvedTable = contextResolvedTable;
        if (dynamicParameters != null && !dynamicParameters.isEmpty()) {
            Map<String, String> options =
                    new HashMap<>(contextResolvedTable.getResolvedTable().getOptions());
            options.putAll(dynamicParameters);
            resolvedTable = contextResolvedTable.copy(options);
        }
        // Create table source with new resolved table.
        tableSource =
                FactoryUtil.createDynamicTableSource(
                        factory,
                        resolvedTable.getIdentifier(),
                        resolvedTable.getResolvedTable(),
                        loadOptionsFromCatalogTable(resolvedTable, context),
                        context.getTableConfig(),
                        context.getClassLoader(),
                        resolvedTable.isTemporary());
        // ...
    }
}
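A sketch of the option semantics assumed for the copied table: dynamic parameters override catalog options with the same key, all other catalog options are kept, and the original options are not modified. OptionOverlay is a hypothetical helper written against plain java.util maps rather than Flink's ContextResolvedTable.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class OptionOverlay {
    /**
     * Returns the table options with dynamic parameters overlaid. A null or empty parameter map
     * returns the original options unchanged, mirroring the null/empty check above.
     */
    static Map<String, String> apply(
            Map<String, String> tableOptions, Map<String, String> dynamicParameters) {
        if (dynamicParameters == null || dynamicParameters.isEmpty()) {
            return tableOptions;
        }
        // Copy first so the catalog table's own options are never mutated.
        Map<String, String> merged = new HashMap<>(tableOptions);
        merged.putAll(dynamicParameters);
        return Collections.unmodifiableMap(merged);
    }
}
```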
CommonExecTableSourceScan
public abstract class CommonExecTableSourceScan extends ExecNodeBase<RowData>
        implements MultipleTransformationTranslator<RowData> {

    /** Set by SourceDynamicParameterProcessor; null if no dynamic parameters were provided. */
    @Nullable protected Map<String, String> dynamicParameters;

    @Override
    protected Transformation<RowData> translateToPlanInternal(
            PlannerBase planner, ExecNodeConfig config) {
        // ...
        // Create table source with dynamic parameters.
        final ScanTableSource tableSource =
                tableSourceSpec.getScanTableSource(
                        planner.getFlinkContext(),
                        ShortcutUtils.unwrapTypeFactory(planner),
                        dynamicParameters);
        // ...
    }
}
Compatibility, Deprecation, and Migration Plan
This is a newly added feature, so there will be no compatibility issues.
Test Plan
Unit tests (UT) and integration tests (IT).