Status

Discussion thread: -
Vote thread: -
JIRA: FLINK-32042
Release: -

Motivation

Flink creates a DynamicTableSource for each source table independently when it generates the plan for a SQL job, so sources are not aware of each other. In some situations we need to generate options for each source based on all source tables in the same job. For example, we use Flink as the OLAP cluster for our HTAP system [1], which synchronizes all data from an OLTP system to column storage at the database level and then performs OLAP queries in the Flink cluster on a unified version to ensure consistent results. Similarly, we mentioned in FLIP-276 [2] that it is necessary to support consistent queries across multiple tables.
Besides, many connectors have confidential parameters, such as the username and password of the jdbc connector. Currently, we can only configure them in the catalog config file or in the table options of the SQL statement, which is not appropriate.
We would like to introduce a customized provider in the planner for Flink SQL jobs to generate dynamic options for all source and sink tables, so that the Flink job can create sources and sinks based on these options.

Proposal

We submit SQL statements to Sql-Gateway through the jdbc driver; Sql-Gateway parses the SQL into an Operation and executes it in TableEnvironment. Sql-Gateway can therefore load the options provider from configuration and register it with TableEnvironment before submitting the Operation. Users who use TableEnvironment to run jobs directly can create and register the provider themselves. TableEnvironment can then traverse all Operations, get the options for each source and sink from the provider, and add them to the source and sink operations.

[draw.io diagram: flow]

Public Interfaces

TableDynamicOptionProvider

We add TableDynamicOptionProvider to provide dynamic options for each source/sink in jobs.

Code Block
/** Dynamic option provider for source and sink tables. */
@PublicEvolving
public interface TableDynamicOptionProvider {
    /**
     * Provide dynamic options for source and sink tables.
     *
     * @param context The context for the provider to get table dynamic options.
     * @return The dynamic options.
     */
    TableDynamicOptions getDynamicOptions(TableOptionsContext context);

    /** Context for the provider. */
    @PublicEvolving
    interface TableOptionsContext {
        /**
         * Configuration for the job, which provides customized options, for example,
         * different strategies to generate table dynamic options.
         */
        ReadableConfig jobConfiguration();

        /**
         * Get the catalog context for the given catalog, which is used to identify the
         * physical catalog. There will be some information about options in
         * `CatalogContext`, such as the location where the option values for tables are stored.
         */
        @Nullable
        CatalogContext catalogContext(String catalog);

        /** All source tables for the job. */
        Set<ObjectIdentifier> sources();

        /**
         * Get the current options for the source table, which are used to determine the dynamic
         * options for the table. For example, jdbc table names in the options will be used to
         * generate username and password.
         */
        Map<String, String> sourceOptions(ObjectIdentifier table);

        /** All sink tables for the job. */
        Set<ObjectIdentifier> sinks();

        /**
         * Get the current options for the sink table, which are used to determine the dynamic
         * options for the table. For example, jdbc table names in the options will be used to
         * generate username and password.
         */
        Map<String, String> sinkOptions(ObjectIdentifier table);
    }

    /** Table dynamic option result from the provider. */
    @PublicEvolving
    interface TableDynamicOptions {
        /** Get source options for the given table identifier. */
        Map<String, String> sourceOptions(ObjectIdentifier table);

        /** Get sink options for the given table identifier. */
        Map<String, String> sinkOptions(ObjectIdentifier table);
    }
}

/** Factory to create table dynamic option provider. */
@PublicEvolving
public interface TableDynamicOptionProviderFactory extends Factory {
    TableDynamicOptionProvider createProvider(Context context);

    /** Context for dynamic option provider factory. */
    interface Context {
        ReadableConfig getConfiguration();

        ClassLoader getClassLoader();
    }
}

...

TableEnvironment

Add registerTableOptionProvider to TableEnvironment.

Code Block
/** Add register provider method in table environment. */
public interface TableEnvironment {
    void registerTableOptionProvider(TableDynamicOptionProvider provider);
}

Options

Add new option for provider factory in SqlGatewayOptions.

Code Block
sql-gateway.table-dynamic-option-provider-factory: {provider factory class identifier}

Proposed Changes

Load Provider In Gateway

Sql-Gateway can load the provider factory from configuration and create the options provider for DefaultContext. A session created by SessionManager can then get the provider as follows.

Code Block
/** Load the provider factory and create provider for DefaultContext. */
public class DefaultContext {
    private final TableDynamicOptionProvider tableOptionProvider;

    ...;

    public static DefaultContext load(
            Configuration dynamicConfig,
            List<URL> dependencies,
            boolean discoverExecutionConfig,
            boolean discoverPythonJar) {
        ...;
        // Discover the factory by the identifier configured for Sql-Gateway.
        TableDynamicOptionProviderFactory providerFactory = FactoryUtil.discoverFactory(
            DefaultContext.class.getClassLoader(),
            TableDynamicOptionProviderFactory.class,
            dynamicConfig.get(SOURCE_DYNAMIC_OPTIONS_FACTORY));
        TableDynamicOptionProvider optionsProvider =
            providerFactory.createProvider(new Context() {
                ...;
            });

        ...;

        return new DefaultContext(..., optionsProvider);
    }
}

Option Provider In TableEnvironment

There will be a registerTableOptionProvider method in TableEnvironment. Sql-Gateway can register the loaded provider with TableEnvironment before submitting the Operation, and users who don't submit jobs through Sql-Gateway can create and register a TableDynamicOptionProvider for the table environment themselves.
TableEnvironment will traverse the received Operations, collect all QueryOperation and ModifyOperation instances for sources and sinks, and get the options for all source and sink tables at once. After that, the options for each table will be added to the corresponding QueryOperation and ModifyOperation.

Code Block
public class TableEnvironmentImpl implements TableEnvironmentInternal {
    @Nullable private TableDynamicOptionProvider tableDynamicOptionProvider;

    /** Register table dynamic option provider. */
    @Override
    public void registerTableOptionProvider(
            TableDynamicOptionProvider tableDynamicOptionProvider) {
        this.tableDynamicOptionProvider = tableDynamicOptionProvider;
    }

    /** Get options for source/sink tables and add them to the specified operations. */
    private void provideTableDynamicOptions(List<Operation> operations) {
        Map<ObjectIdentifier, List<SourceQueryOperation>> identifierSourceOperations =
            traverseSourceOperations(operations);
        TableDynamicOptions tableDynamicOptions =
            tableDynamicOptionProvider.getDynamicOptions(
                new TableOptionsContext() { ... });
        // Add options in QueryOperation and ModifyOperation.
        ....;
    }
}

Dynamic Options In Operation

Currently there is a field named dynamicOptions in SourceQueryOperation and SinkModifyOperation, which is used to create the source and sink tables from catalogs. We can add the dynamic options generated by the provider to dynamicOptions, and Flink can then create tables according to these dynamic options. For source and sink Operations without a dynamicOptions field, we can add the field and create the RelNode with it in the converter.

Code Block
public class SourceQueryOperation implements QueryOperation {
    private final Map<String, String> dynamicOptions;
    ....;

    /** Add options to dynamicOptions unless the key already exists. */
    public void addDynamicOptions(Map<String, String> options) {
        options.forEach((k, v) -> dynamicOptions.putIfAbsent(k, v));
    }
}

public class SinkModifyOperation implements ModifyOperation {
    private final Map<String, String> dynamicOptions;

    /** Add options to dynamicOptions unless the key already exists. */
    public void addDynamicOptions(Map<String, String> options) {
        options.forEach((k, v) -> dynamicOptions.putIfAbsent(k, v));
    }
}

There will be three types of options for tables: options in CREATE ... WITH(options), options from the provider, and options in /*+ OPTIONS(...) */ hints in SQL. WITH options have the lowest priority, followed by provider options, and OPTIONS hints have the highest priority.
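The priority rule can be illustrated with a small, self-contained sketch. `TableOptionPriority` and its `merge` method are hypothetical names introduced only for this illustration; they are not part of Flink or of this proposal, and the actual merging in the planner may be structured differently.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper (not a Flink class): merges the three option layers by priority. */
final class TableOptionPriority {
    private TableOptionPriority() {}

    /**
     * Later layers overwrite earlier ones, so the effective priority is
     * WITH (lowest) < provider < OPTIONS hint (highest).
     */
    static Map<String, String> merge(
            Map<String, String> withOptions,
            Map<String, String> providerOptions,
            Map<String, String> hintOptions) {
        Map<String, String> merged = new LinkedHashMap<>(withOptions);
        merged.putAll(providerOptions); // provider overrides WITH
        merged.putAll(hintOptions);     // OPTIONS hint overrides both
        return merged;
    }
}
```

This matches the putIfAbsent behaviour of addDynamicOptions: dynamicOptions already holds the hint values, so provider values only fill keys that the hint did not set.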

Use Case

Generate Snapshot IDs For Paimon

Suppose all tables of a database, such as tableA, tableB and tableC, are synchronized to Paimon, and the data in these tables is managed by a unified version that maps to a different snapshot id in each table. We can create a PaimonSnapshotTableOptionProvider that implements TableDynamicOptionProvider to get the snapshot ids for the above tables according to the unified version in Sql-Gateway, as follows.

Code Block
SET 'sql-gateway.table-dynamic-option-provider-factory'='paimon-provider'; -- Set the paimon provider for Sql-Gateway

SELECT * FROM tableA JOIN tableB ON ... JOIN tableC ON ...;

In the above SELECT query, Flink will use the snapshot ids for tableA, tableB and tableC from the provider to read specific snapshot data from these tables. For example, if snapshot1 for tableA, snapshot3 for tableB and snapshot10 for tableC belong to the same version, the paimon provider will supply them for the query even though they are not configured in the SQL statement.
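As a rough sketch of what such a provider might compute, the example below maps one unified version to per-table snapshot ids. It is deliberately simplified and makes several assumptions: `SnapshotOptionProvider` and `UnifiedVersionSnapshotProvider` are hypothetical stand-ins for the proposed interfaces, table identifiers are plain strings instead of ObjectIdentifier, the version mapping is held in memory rather than fetched from a metadata service, and `scan.snapshot-id` is assumed to be the Paimon option key used to pin a snapshot.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Simplified stand-in for the proposed provider interface (assumption, not the real API). */
interface SnapshotOptionProvider {
    /** Returns dynamic options per source table, computed for all tables at once. */
    Map<String, Map<String, String>> sourceOptions(Set<String> sourceTables);
}

/** Hypothetical provider that resolves one unified version to per-table snapshot ids. */
final class UnifiedVersionSnapshotProvider implements SnapshotOptionProvider {
    // unified version -> (table -> snapshot id); a real provider would query a service.
    private final Map<Long, Map<String, Long>> versionToSnapshots;
    private final long version;

    UnifiedVersionSnapshotProvider(Map<Long, Map<String, Long>> versionToSnapshots, long version) {
        this.versionToSnapshots = new HashMap<>(versionToSnapshots);
        this.version = version;
    }

    @Override
    public Map<String, Map<String, String>> sourceOptions(Set<String> sourceTables) {
        Map<String, Long> snapshots =
                versionToSnapshots.getOrDefault(version, Collections.emptyMap());
        Map<String, Map<String, String>> result = new HashMap<>();
        for (String table : sourceTables) {
            Long snapshotId = snapshots.get(table);
            if (snapshotId != null) {
                // Assumed option key; tables without a known snapshot get no options.
                result.put(table,
                        Collections.singletonMap("scan.snapshot-id", String.valueOf(snapshotId)));
            }
        }
        return result;
    }
}
```

Because all source tables are passed in one call, the provider can pick snapshot ids that belong to the same version, which a per-table lookup could not guarantee.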

Generate Username And Password For Jdbc

Suppose there is a Security Service that stores the username and password for RDS, and we develop a JdbcTableSecurityServiceProvider to get them for the jdbc tables in a job. We can write our job in SQL as follows and submit it to Sql-Gateway.

Code Block
SET 'sql-gateway.table-dynamic-option-provider-factory'='jdbc-security-provider'; -- Set the jdbc security provider for Sql-Gateway

CREATE TABLE my_jdbc_table (
    val1 INT,
    val2 INT)
  WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/test',
    'table-name' = 'mysql_table'
    --'username' = '****'    We don't need to provide username and password for the job here
    --'password' = '****'
  );

INSERT INTO my_jdbc_table SELECT ... FROM ... GROUP BY ...;

In the above case, we either omit the username and password options from our SQL job or give only placeholder values, and the security provider generates the real username and password for the jdbc tables. After a DBA updates the security information for the tables, we can simply restart our jobs without changing them.
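The lookup such a provider performs could look roughly like the sketch below. `JdbcCredentialLookup` and its nested `Credential` class are hypothetical names used only for illustration, the in-memory map stands in for the Security Service, and the sketch assumes credentials are keyed by the value of the `table-name` option from the table's current options.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical lookup (not a Flink class): derives credentials from a table's current options. */
final class JdbcCredentialLookup {
    /** Hypothetical credential record returned by the security service. */
    static final class Credential {
        final String username;
        final String password;

        Credential(String username, String password) {
            this.username = username;
            this.password = password;
        }
    }

    // table-name -> credential; stands in for a call to the Security Service.
    private final Map<String, Credential> store;

    JdbcCredentialLookup(Map<String, Credential> store) {
        this.store = new HashMap<>(store);
    }

    /** Returns username/password options for jdbc tables, or an empty map otherwise. */
    Map<String, String> dynamicOptions(Map<String, String> currentOptions) {
        if (!"jdbc".equals(currentOptions.get("connector"))) {
            return Collections.emptyMap();
        }
        String tableName = currentOptions.get("table-name");
        Credential credential = tableName == null ? null : store.get(tableName);
        if (credential == null) {
            return Collections.emptyMap();
        }
        Map<String, String> options = new HashMap<>();
        options.put("username", credential.username);
        options.put("password", credential.password);
        return options;
    }
}
```

In the proposed design, the returned map would be what the provider reports from sourceOptions or sinkOptions for the table, so the credentials never appear in the SQL text.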

Compatibility, Deprecation, and Migration Plan

This is a newly added feature, so there will be no compatibility issues.

Test Plan

UT & IT


[1] https://www.vldb.org/pvldb/vol15/p3411-chen.pdf

...