
Status

Discussion thread: -
Vote thread: -
JIRA


Release: -

Motivation

Flink creates a DynamicTableSource for each source table independently when it generates the plan for a SQL job, so sources are not aware of each other. In some situations, however, we need to generate unified parameters for all sources in the same job. For example, we use Flink as the OLAP cluster of our HTAP system [1], which synchronizes data from an OLTP system to column storage at the database level and then runs OLAP queries in the Flink cluster on a unified version to ensure consistent results. Similarly, as we mentioned in FLIP-276 [2], consistent queries across multiple tables need to be supported. We would like to introduce a customizable provider in the planner that generates dynamic parameters for all source tables of a Flink SQL job; the job can then create its sources based on these parameters.

Public Interfaces

SourceDynamicParameterProvider

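We add SourceDynamicParameterProvider, which provides unified dynamic parameters for all sources in a job, together with SourceDynamicParameterProviderFactory to create it and SourceDynamicParameter to hold the parameters.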

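import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.catalog.ObjectIdentifier;

import javax.annotation.Nullable;

import java.util.Map;
import java.util.Set;

/** Dynamic parameter provider. */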
@PublicEvolving
public interface SourceDynamicParameterProvider {
    /**
     * Provide dynamic parameters for sources in {@link SourceDynamicParameter}.
     *
     * @param context The context for provider.
     * @return The source dynamic parameters.
     */
    SourceDynamicParameter getDynamicParameter(Context context);

    /** Context for parameter provider. */
    interface Context {
        /**
         * Sources for the given job.
         *
         * @return Source identifiers.
         */
        Set<ObjectIdentifier> sources();

        /**
         * Catalog options help the provider get dynamic parameters; for example, they may contain the location where the parameters are stored in the catalog.
         *
         * @param catalog The name of catalog.
         * @return The catalog options.
         */
        @Nullable
        Map<String, String> catalogOptions(String catalog);
    }
}

/** Factory to create dynamic parameter provider. */
@PublicEvolving
public interface SourceDynamicParameterProviderFactory {
    SourceDynamicParameterProvider createProvider(Context context);

    /** Context for dynamic parameter provider factory. */
    interface Context {
        Map<String, String> config();

        ClassLoader classLoader();
    }
}

/** Dynamic parameters for sources. */
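@PublicEvolving
public class SourceDynamicParameter {
    private final Map<ObjectIdentifier, Map<String, String>> dynamicParameters;
    private final Map<String, String> commonParameters;

    public SourceDynamicParameter(
            Map<ObjectIdentifier, Map<String, String>> dynamicParameters,
            Map<String, String> commonParameters) {
        this.dynamicParameters = dynamicParameters;
        this.commonParameters = commonParameters;
    }

    /**
     * Get dynamic parameters for each source; different sources may have different parameters.
     *
     * @return dynamic parameters for each source.
     */
    public Map<ObjectIdentifier, Map<String, String>> dynamicParameters() {
        return dynamicParameters;
    }

    /**
     * Get common parameters for all sources.
     *
     * @return the common parameters.
     */
    public Map<String, String> commonParameters() {
        return commonParameters;
    }
}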
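To illustrate how the three types fit together, here is a minimal, self-contained sketch of a provider for the HTAP use case from the motivation. It is not part of the proposal: `SimpleProviderContext`, `SimpleDynamicParameter`, `VersionPinningProvider` and `VersionPinningDemo` are hypothetical, simplified stand-ins (source identifiers are plain `"catalog.database.table"` strings instead of `ObjectIdentifier`), and the option keys `scan.version` and `scan.snapshot-id` are made up for the example. The provider puts the single version every source must read into the common parameters, and each table's own snapshot id for that version into the per-source parameters.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Simplified stand-in for SourceDynamicParameterProvider.Context; ids are "catalog.db.table". */
interface SimpleProviderContext {
    Set<String> sources();

    /** May return null, mirroring the @Nullable contract of catalogOptions. */
    Map<String, String> catalogOptions(String catalog);
}

/** Simplified stand-in for SourceDynamicParameter. */
final class SimpleDynamicParameter {
    final Map<String, Map<String, String>> dynamicParameters;
    final Map<String, String> commonParameters;

    SimpleDynamicParameter(
            Map<String, Map<String, String>> dynamicParameters,
            Map<String, String> commonParameters) {
        this.dynamicParameters = dynamicParameters;
        this.commonParameters = commonParameters;
    }
}

/** Hypothetical provider: one global version for all sources, one snapshot id per table. */
final class VersionPinningProvider {
    private final long globalVersion;
    // Hypothetical metadata: table id -> that table's snapshot id at globalVersion.
    private final Map<String, Long> snapshotAtVersion;

    VersionPinningProvider(long globalVersion, Map<String, Long> snapshotAtVersion) {
        this.globalVersion = globalVersion;
        this.snapshotAtVersion = snapshotAtVersion;
    }

    SimpleDynamicParameter getDynamicParameter(SimpleProviderContext context) {
        // Common parameter: the version every source must read.
        Map<String, String> common = new HashMap<>();
        common.put("scan.version", Long.toString(globalVersion));

        // Per-source parameter: the table-specific snapshot that matches that version.
        Map<String, Map<String, String>> perSource = new HashMap<>();
        for (String source : context.sources()) {
            // Assumes every id contains a '.' separating the catalog name.
            String catalog = source.substring(0, source.indexOf('.'));
            Long snapshot = snapshotAtVersion.get(source);
            // Skip sources whose catalog has no options or whose snapshot is unknown.
            if (context.catalogOptions(catalog) == null || snapshot == null) {
                continue;
            }
            perSource.put(
                    source, Collections.singletonMap("scan.snapshot-id", Long.toString(snapshot)));
        }
        return new SimpleDynamicParameter(perSource, common);
    }
}

final class VersionPinningDemo {
    /** Context whose known catalogs return empty options; other catalogs return null. */
    static SimpleProviderContext contextOf(Set<String> sources, Set<String> knownCatalogs) {
        return new SimpleProviderContext() {
            @Override
            public Set<String> sources() {
                return sources;
            }

            @Override
            public Map<String, String> catalogOptions(String catalog) {
                return knownCatalogs.contains(catalog) ? new HashMap<String, String>() : null;
            }
        };
    }

    public static void main(String[] args) {
        Map<String, Long> snapshots = new HashMap<>();
        snapshots.put("hive.db.orders", 7L);
        snapshots.put("hive.db.users", 3L);
        SimpleProviderContext context =
                contextOf(
                        new LinkedHashSet<>(Arrays.asList("hive.db.orders", "hive.db.users", "other.db.t")),
                        Collections.singleton("hive"));
        SimpleDynamicParameter p =
                new VersionPinningProvider(42L, snapshots).getDynamicParameter(context);
        System.out.println(p.commonParameters);
        System.out.println(p.dynamicParameters);
    }
}
```

Splitting the result into common and per-source parameters lets the provider express "same version everywhere" once, while still handing each table the physical snapshot it needs.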

Options

Add a new option to TableConfigOptions for the provider factory:

table.plan.source-dynamic-parameter-provider-factory: {provider factory class}
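One way the planner could turn this option into a provider is to load the configured class reflectively and call its factory method. The sketch below assumes that approach; the proposal does not say whether reflection or a service loader is used. `ParameterProvider`, `ParameterProviderFactory`, `ProviderLoader` and `FixedVersionProviderFactory` are hypothetical, simplified stand-ins (the factory context's `config()` and `classLoader()` are passed as plain arguments), and the key `hypothetical.fixed-version` is made up. Only the option key itself comes from the proposal.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Simplified stand-in for SourceDynamicParameterProvider. */
interface ParameterProvider {
    Map<String, String> commonParameters();
}

/** Simplified stand-in for SourceDynamicParameterProviderFactory and its Context. */
interface ParameterProviderFactory {
    ParameterProvider createProvider(Map<String, String> config, ClassLoader classLoader);
}

final class ProviderLoader {
    static final String OPTION_KEY = "table.plan.source-dynamic-parameter-provider-factory";

    /** Returns null when the option is not set, i.e. no dynamic parameters are applied. */
    static ParameterProvider load(Map<String, String> config, ClassLoader classLoader) {
        String factoryClass = config.get(OPTION_KEY);
        if (factoryClass == null || factoryClass.trim().isEmpty()) {
            return null;
        }
        try {
            Class<?> clazz = Class.forName(factoryClass.trim(), true, classLoader);
            if (!ParameterProviderFactory.class.isAssignableFrom(clazz)) {
                throw new IllegalArgumentException(
                        factoryClass + " does not implement " + ParameterProviderFactory.class.getName());
            }
            // Requires an accessible no-argument constructor.
            ParameterProviderFactory factory =
                    (ParameterProviderFactory) clazz.getDeclaredConstructor().newInstance();
            return factory.createProvider(config, classLoader);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate " + factoryClass, e);
        }
    }
}

/** Example factory; "hypothetical.fixed-version" is a made-up key for this sketch. */
class FixedVersionProviderFactory implements ParameterProviderFactory {
    public FixedVersionProviderFactory() {}

    @Override
    public ParameterProvider createProvider(Map<String, String> config, ClassLoader classLoader) {
        String version = config.getOrDefault("hypothetical.fixed-version", "0");
        return () -> Collections.singletonMap("scan.version", version);
    }
}
```

Returning null when the option is absent keeps the feature opt-in: a planner built this way only adds the extra processor when a factory class is configured.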

Proposed Changes

We will add the following steps to PlannerBase.translateToExecNodeGraph to support dynamic parameters for sources:

1. Create a SourceDynamicParameterProvider instance from the provider factory configured in TableConfig.

2. Create a SourceDynamicParameterProcessor, which implements the ExecNodeGraphProcessor interface. SourceDynamicParameterProcessor gets dynamic parameters for the sources from the SourceDynamicParameterProvider and sets them in each CommonExecTableSourceScan.

3. CommonExecTableSourceScan creates the ScanTableSource with these parameters in its translateToPlanInternal method.

PlannerBase

abstract class PlannerBase(
    executor: Executor,
    tableConfig: TableConfig,
    val moduleManager: ModuleManager,
    val functionCatalog: FunctionCatalog,
    val catalogManager: CatalogManager,
    isStreamingMode: Boolean,
    classLoader: ClassLoader)
  extends Planner {
    @VisibleForTesting
    private[flink] def translateToExecNodeGraph(
        optimizedRelNodes: Seq[RelNode],
        isCompiled: Boolean): ExecNodeGraph = {
        ...;
        val processors = getExecNodeGraphProcessors
        // Add the dynamic parameter processor; Seq is immutable, so a new Seq is returned
        val allProcessors = addDynamicParameterProcessor(processors)
        allProcessors.foldLeft(execGraph)((graph, processor) => processor.process(graph, context))
    }

    private def addDynamicParameterProcessor(
        processors: Seq[ExecNodeGraphProcessor]): Seq[ExecNodeGraphProcessor] = {
        // Create the dynamic parameter processor from the table config and append it to processors
    }
}
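Because the processors are applied with a left fold, adding the dynamic parameter processor means building a new processor list rather than modifying the existing one in place (Scala's default `Seq` is immutable). The Java sketch below models this with hypothetical names: `ProcessorChain` is not a Flink class, a processor is reduced to a `UnaryOperator` over a generic graph type instead of `ExecNodeGraphProcessor`, and appending the new processor at the end of the chain is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.UnaryOperator;

/**
 * Hypothetical sketch of the processor chain in translateToExecNodeGraph. A processor is modeled
 * as a UnaryOperator over a generic graph type G instead of Flink's ExecNodeGraphProcessor.
 */
final class ProcessorChain {
    /** Returns a new list with the extra processor appended; the input list is not modified. */
    static <G> List<UnaryOperator<G>> withDynamicParameterProcessor(
            List<UnaryOperator<G>> processors, UnaryOperator<G> dynamicParameterProcessor) {
        if (dynamicParameterProcessor == null) {
            return processors; // option not set: keep the existing chain
        }
        List<UnaryOperator<G>> result = new ArrayList<>(processors);
        result.add(dynamicParameterProcessor);
        return Collections.unmodifiableList(result);
    }

    /** Same shape as processors.foldLeft(graph)((g, p) => p.process(g, context)) in Scala. */
    static <G> G applyAll(G graph, List<UnaryOperator<G>> processors) {
        G current = graph;
        for (UnaryOperator<G> processor : processors) {
            current = processor.apply(current);
        }
        return current;
    }
}
```

Appending last means the dynamic parameters are set on the scan nodes of the graph produced by all other processors; if another processor could later replace scan nodes, the order would have to be reconsidered.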

SourceDynamicParameterProcessor

/**
 * Get source dynamic parameters from {@link SourceDynamicParameterProvider} and set them to the
 * sources in exec graph.
 */
public class SourceDynamicParameterProcessor implements ExecNodeGraphProcessor {
    private final SourceDynamicParameterProvider provider;

    public SourceDynamicParameterProcessor(SourceDynamicParameterProvider provider) {
        this.provider = provider;
    }

    @Override
    public ExecNodeGraph process(ExecNodeGraph execGraph, ProcessorContext context) {
        // Visit all CommonExecTableSourceScan nodes, get dynamic parameters from the provider,
        // and set the parameters on each CommonExecTableSourceScan.
        return execGraph;
    }
}
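The visit described in `process` can be sketched without Flink types. In this hypothetical model, `PlanNode` stands in for an exec node with inputs, `ScanNode` for `CommonExecTableSourceScan`, and `SourceParameterProcessorSketch` for the processor. Since an exec graph is a DAG, a scan shared by several parents is collected only once (using identity, not equality). The collected identifiers are also what the provider's `Context.sources()` would return. Merging the common parameters first and the per-source parameters second, so that source-specific values win on key clashes, is an assumption; the proposal does not define the precedence.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Simplified exec node: only its inputs matter for the traversal. */
class PlanNode {
    final List<PlanNode> inputs = new ArrayList<>();

    PlanNode(PlanNode... inputs) {
        Collections.addAll(this.inputs, inputs);
    }
}

/** Simplified stand-in for CommonExecTableSourceScan. */
class ScanNode extends PlanNode {
    final String identifier;
    Map<String, String> dynamicParameters; // stays null when no parameters apply

    ScanNode(String identifier) {
        this.identifier = identifier;
    }
}

final class SourceParameterProcessorSketch {
    /** Collects every scan node exactly once, even when several parents share it. */
    static List<ScanNode> collectScans(List<PlanNode> roots) {
        Set<PlanNode> visited = Collections.newSetFromMap(new IdentityHashMap<>());
        Deque<PlanNode> stack = new ArrayDeque<>(roots);
        List<ScanNode> scans = new ArrayList<>();
        while (!stack.isEmpty()) {
            PlanNode node = stack.pop();
            if (!visited.add(node)) {
                continue; // already reached through another parent
            }
            if (node instanceof ScanNode) {
                scans.add((ScanNode) node);
            }
            node.inputs.forEach(stack::push);
        }
        return scans;
    }

    /** Common parameters first, then source-specific ones, which therefore win on key clashes. */
    static void process(
            List<PlanNode> roots,
            Map<String, String> common,
            Map<String, Map<String, String>> perSource) {
        for (ScanNode scan : collectScans(roots)) {
            Map<String, String> merged = new HashMap<>(common);
            merged.putAll(perSource.getOrDefault(scan.identifier, Collections.emptyMap()));
            scan.dynamicParameters = merged.isEmpty() ? null : merged;
        }
    }
}
```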

DynamicTableSourceSpec

public class DynamicTableSourceSpec {
    /**
     * Create scan table source with dynamic parameters.
     */
    public ScanTableSource getScanTableSource(
        FlinkContext context,
        FlinkTypeFactory typeFactory,
        Map<String, String> dynamicParameters) {
        ....
        // Copy the resolved table, overlaying the dynamic parameters on the table's own options
        // so that the original table options are kept.
        ContextResolvedTable resolvedTable = contextResolvedTable;
        if (dynamicParameters != null && !dynamicParameters.isEmpty()) {
            Map<String, String> mergedOptions =
                    new HashMap<>(contextResolvedTable.getResolvedTable().getOptions());
            mergedOptions.putAll(dynamicParameters);
            resolvedTable = contextResolvedTable.copy(mergedOptions);
        }
        // Create the table source with the new resolved table.
        tableSource =
                FactoryUtil.createDynamicTableSource(
                        factory,
                        resolvedTable.getIdentifier(),
                        resolvedTable.getResolvedTable(),
                        loadOptionsFromCatalogTable(resolvedTable, context),
                        context.getTableConfig(),
                        context.getClassLoader(),
                        resolvedTable.isTemporary());
        ....
    }
}
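The options used to create the source come from overlaying the dynamic parameters on the table's own options. The sketch below models only that step with a hypothetical helper, `TableOptionOverlay` (not a Flink class). It assumes that dynamic parameters take precedence over table options with the same key, which matches how Flink's `OPTIONS` hints override table options. Null or empty parameters return the original options unchanged, and the original map is never modified.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper mirroring the "copy with dynamic parameters" step of getScanTableSource. */
final class TableOptionOverlay {
    /**
     * Returns tableOptions itself when there is nothing to apply; otherwise a new, unmodifiable
     * map in which dynamic parameters override table options with the same key.
     */
    static Map<String, String> overlay(
            Map<String, String> tableOptions, Map<String, String> dynamicParameters) {
        if (dynamicParameters == null || dynamicParameters.isEmpty()) {
            return tableOptions; // no copy needed
        }
        Map<String, String> merged = new HashMap<>(tableOptions); // never mutate the original
        merged.putAll(dynamicParameters);
        return Collections.unmodifiableMap(merged);
    }
}
```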

CommonExecTableSourceScan

public abstract class CommonExecTableSourceScan extends ExecNodeBase<RowData>
        implements MultipleTransformationTranslator<RowData> {
    @Nullable protected Map<String, String> dynamicParameters;
    
    @Override
    protected Transformation<RowData> translateToPlanInternal(
            PlannerBase planner, ExecNodeConfig config) {
        ....;
        // Create the table source with the dynamic parameters.
        final ScanTableSource tableSource =
                tableSourceSpec.getScanTableSource(
                        planner.getFlinkContext(),
                        ShortcutUtils.unwrapTypeFactory(planner),
                        dynamicParameters);
        ....;
    }
}

Compatibility, Deprecation, and Migration Plan

This is a newly added feature, so there will be no compatibility issues.

Test Plan

Unit tests (UT) and integration tests (IT).


[1] https://www.vldb.org/pvldb/vol15/p3411-chen.pdf

[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-276%3A+Data+Consistency+of+Streaming+and+Batch+ETL+in+Flink+and+Table+Store


