Status

Discussion thread: -
Vote thread: -
JIRA: FLINK-32042
Release: -

Motivation

When Flink generates the plan for a SQL job, it creates a DynamicTableSource for each source table independently, so sources are not aware of each other. In some situations we need to generate options for each source based on all source tables in the same job, such as for consistency requirements. For example, we use Flink as the OLAP cluster for our HTAP system [1]. The system synchronizes all data from an OLTP system to column storage at the database level and then runs OLAP queries in the Flink cluster on a unified version, so that the results are consistent. FLIP-276 [2] also points out that consistent queries across multiple tables need to be supported.
Besides, many connectors have confidential options, such as the username and password of the JDBC connector. Currently, we can only configure them in the catalog config file or in the table options of the SQL statement, and neither place is appropriate for confidential values.
We would like to introduce a customized provider for Flink SQL jobs that generates dynamic options for all source and sink tables. Flink can then create the sources and sinks based on these options.

Proposal

We submit SQL statements to Sql-Gateway through the JDBC driver. Sql-Gateway parses the SQL into an Operation and executes it in TableEnvironment. This means Sql-Gateway can load the option provider from its configuration and register it to TableEnvironment before submitting the Operation. Users who run jobs with TableEnvironment directly can create and register the provider themselves. TableEnvironment then traverses all Operations, gets the options for each source and sink from the provider, and adds them to the source and sink operations.

[draw.io diagram: flow]

Public Interfaces

TableDynamicOptionProvider

We add TableDynamicOptionProvider to provide dynamic options for each source and sink in a job.

Code Block
/** Dynamic option provider for source and sink tables. */
@PublicEvolving
public interface TableDynamicOptionProvider {
    /**
     * Provide dynamic options for source and sink tables.
     *
     * @param context The context for the job to get table dynamic options.
     * @return The table dynamic options.
     */
    TableDynamicOptions getDynamicOptions(TableOptionsContext context);

    /** Context for the option provider. */
    @PublicEvolving
    static interface TableOptionsContext {
        /**
         * Configuration for the job, which can provide customized options, for example, a
         * different strategy to generate table dynamic options.
         */
        ReadableConfig jobConfiguration();

        /**
         * Get the catalog context for the given catalog, which is used to identify the physical
         * catalog. `CatalogContext` carries information about options, such as the location
         * where the option values for tables are stored.
         */
        @Nullable
        CatalogContext catalogContext(String catalog);

        /** All source tables for the job. */
        Set<ObjectIdentifier> sources();

        /**
         * Get the current options of the source table, which are used to determine the dynamic
         * options for the table. For example, the JDBC table name in the options can be used to
         * generate the username and password.
         */
        Map<String, String> sourceOptions(ObjectIdentifier table);

        /** All sink tables for the job. */
        Set<ObjectIdentifier> sinks();

        /**
         * Get the current options of the sink table, which are used to determine the dynamic
         * options for the table. For example, the JDBC table name in the options can be used to
         * generate the username and password.
         */
        Map<String, String> sinkOptions(ObjectIdentifier table);
    }

    /** Table dynamic option result from the provider. */
    @PublicEvolving
    static interface TableDynamicOptions {
        /** Get source options for the given table identifier. */
        Map<String, String> sourceOptions(ObjectIdentifier table);

        /** Get sink options for the given table identifier. */
        Map<String, String> sinkOptions(ObjectIdentifier table);
    }
}

/** Factory to create a table dynamic option provider. */
@PublicEvolving
public interface TableDynamicOptionProviderFactory extends Factory {
    TableDynamicOptionProvider createProvider(Context context);

    /** Context for the dynamic option provider factory. */
    static interface Context {
        ReadableConfig getConfiguration();

        ClassLoader getClassLoader();
    }
}


TableEnvironment

Add registerTableOptionProvider to TableEnvironment.

Code Block
/** Add register provider method in table environment. */
public interface TableEnvironment {
    void registerTableOptionProvider(TableDynamicOptionProvider provider);
}

Options

Add new option for provider factory in SqlGatewayOptions.

Code Block
sql-gateway.table-dynamic-option-provider-factory: {provider factory class identifier}

Proposed Changes

Load Provider In Gateway

Sql-Gateway can load the provider factory from its configuration and create the option provider in DefaultContext. Each session created by SessionManager can then get the provider, as follows.

Code Block
/** Load the provider factory and create the provider for DefaultContext. */
public class DefaultContext {
    private final TableDynamicOptionProvider tableOptionProvider;

    ...;

    public static DefaultContext load(
            Configuration dynamicConfig,
            List<URL> dependencies,
            boolean discoverExecutionConfig,
            boolean discoverPythonJar) {
        ...;
        // Discover the factory configured by its identifier and create the provider.
        TableDynamicOptionProviderFactory providerFactory =
                FactoryUtil.discoverFactory(
                        DefaultContext.class.getClassLoader(),
                        TableDynamicOptionProviderFactory.class,
                        dynamicConfig.get(SOURCE_DYNAMIC_OPTIONS_FACTORY));
        TableDynamicOptionProvider optionsProvider =
                providerFactory.createProvider(
                        new TableDynamicOptionProviderFactory.Context() { ... });
        ...;
        return new DefaultContext(..., optionsProvider);
    }
}
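In Flink, FactoryUtil.discoverFactory loads candidate factories through Java's ServiceLoader and then selects the one whose factoryIdentifier() matches the configured value. The block below is a rough sketch of that selection step only. It uses a hypothetical ProviderFactory stand-in (not Flink's Factory interface) and an in-memory candidate list instead of ServiceLoader. The names FactoryLookupSketch, discover and named are illustrative assumptions.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical sketch of identifier-based factory selection; not Flink's FactoryUtil. */
final class FactoryLookupSketch {

    /** Stand-in for a provider factory; only the identifier matters for selection. */
    interface ProviderFactory {
        String factoryIdentifier();
    }

    /** Returns the single factory whose identifier matches, failing on zero or several matches. */
    static ProviderFactory discover(List<ProviderFactory> candidates, String identifier) {
        List<ProviderFactory> matches =
                candidates.stream()
                        .filter(f -> f.factoryIdentifier().equals(identifier))
                        .collect(Collectors.toList());
        if (matches.isEmpty()) {
            throw new IllegalArgumentException(
                    "No provider factory found for identifier '" + identifier + "'");
        }
        if (matches.size() > 1) {
            throw new IllegalArgumentException(
                    "Multiple provider factories found for identifier '" + identifier + "'");
        }
        return matches.get(0);
    }

    /** Builds a stand-in factory that only reports the given identifier. */
    static ProviderFactory named(String id) {
        return () -> id;
    }
}
```

In the gateway, the identifier would be the value of sql-gateway.table-dynamic-option-provider-factory, for example 'paimon-provider'.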

Option Provider In TableEnvironment

There will be a registerTableOptionProvider method in TableEnvironment. Sql-Gateway can register the loaded provider to TableEnvironment before submitting the Operation. Users who don't submit jobs through Sql-Gateway can create a TableDynamicOptionProvider and register it to their table environment themselves.
TableEnvironment will traverse the received Operations, collect all QueryOperations and ModifyOperations for sources and sinks, and get the options for all source and sink tables in a single call. After that, the options for each table will be added to the corresponding QueryOperation and ModifyOperation.

Code Block
public class TableEnvironmentImpl implements TableEnvironmentInternal {
    @Nullable private TableDynamicOptionProvider tableDynamicOptionProvider;

    /** Register the table dynamic option provider. */
    @Override
    public void registerTableOptionProvider(TableDynamicOptionProvider provider) {
        this.tableDynamicOptionProvider = provider;
    }

    /** Get options for source/sink tables and add them to the corresponding operations. */
    private void provideTableDynamicOptions(List<Operation> operations) {
        if (tableDynamicOptionProvider == null) {
            // No provider registered: keep the operations unchanged.
            return;
        }
        Map<ObjectIdentifier, List<SourceQueryOperation>> identifierSourceOperations =
                traverseSourceOperations(operations);
        TableDynamicOptions tableDynamicOptions =
                tableDynamicOptionProvider.getDynamicOptions(
                        new TableOptionsContext() { ... });
        // Add options to QueryOperation and ModifyOperation.
        ....;
    }
}
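The core idea of provideTableDynamicOptions is this: group the operations by table, call the provider once with every source table of the job, and then pass each table's options to all operations that read it. The block below is a minimal sketch that uses simplified stand-in types. SourceOp plays the role of SourceQueryOperation, table identifiers are plain strings, and the provider is a Function from the set of source tables to per-table options. ProvideOptionsSketch, SourceOp and provide are hypothetical names, not Flink APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/** Hypothetical sketch of provideTableDynamicOptions using stand-in types, not Flink classes. */
final class ProvideOptionsSketch {

    /** Stand-in for SourceQueryOperation: a table identifier plus its hint options. */
    static final class SourceOp {
        final String table;
        final Map<String, String> dynamicOptions;

        SourceOp(String table, Map<String, String> hintOptions) {
            this.table = table;
            // Copy into a mutable map so provider options can be added later.
            this.dynamicOptions = new HashMap<>(hintOptions);
        }

        /** Mirrors addDynamicOptions: keys that already exist (hints) are kept. */
        void addDynamicOptions(Map<String, String> options) {
            options.forEach(dynamicOptions::putIfAbsent);
        }
    }

    /** Groups operations by table, asks the provider once, and distributes the results. */
    static void provide(
            List<SourceOp> operations,
            Function<Set<String>, Map<String, Map<String, String>>> provider) {
        Map<String, List<SourceOp>> byTable = new LinkedHashMap<>();
        for (SourceOp op : operations) {
            byTable.computeIfAbsent(op.table, t -> new ArrayList<>()).add(op);
        }
        // A single call sees every source table of the job, so it can choose consistent values.
        Map<String, Map<String, String>> options =
                provider.apply(new LinkedHashSet<>(byTable.keySet()));
        byTable.forEach(
                (table, ops) -> {
                    Map<String, String> tableOptions = options.getOrDefault(table, Map.of());
                    ops.forEach(op -> op.addDynamicOptions(tableOptions));
                });
    }
}
```

Calling the provider once with the whole set of tables, rather than once per table, is what allows a provider to return values that are consistent across the job.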

...

Dynamic Options In Operation

SourceQueryOperation and SinkModifyOperation currently have a field named dynamicOptions, which is used to create the source and sink tables from catalogs. We can add the dynamic options generated by the provider to dynamicOptions, and Flink will then create the tables according to these options. For source and sink Operations that don't have a dynamicOptions field, we can add the field and use it to create the RelNode in the converter.

Code Block
public class SourceQueryOperation implements QueryOperation {
    private final Map<String, String> dynamicOptions;
    ....;

    /** Add options to dynamicOptions if they do not exist yet. */
    public void addDynamicOptions(Map<String, String> options) {
        options.forEach((k, v) -> dynamicOptions.putIfAbsent(k, v));
    }
}

public class SinkModifyOperation implements ModifyOperation {
    private final Map<String, String> dynamicOptions;
    ....;

    /** Add options to dynamicOptions if they do not exist yet. */
    public void addDynamicOptions(Map<String, String> options) {
        options.forEach((k, v) -> dynamicOptions.putIfAbsent(k, v));
    }
}

There will be three types of options for tables: options in CREATE ... WITH (options), options from the provider, and options in /*+ OPTIONS(...) */ hints in SQL. WITH options have the lowest priority, provider options come next, and OPTIONS hints have the highest priority.
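The priority rule can be checked with a small merge function. This is a sketch that assumes the three layers are plain maps. Provider options are applied with putIfAbsent, as in addDynamicOptions, and the resulting dynamic options then override the WITH options. OptionPrioritySketch and effectiveOptions are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the option priority: WITH < provider < OPTIONS hint. */
final class OptionPrioritySketch {

    /** Merges the three option layers; higher-priority layers win key by key. */
    static Map<String, String> effectiveOptions(
            Map<String, String> withOptions,
            Map<String, String> providerOptions,
            Map<String, String> hintOptions) {
        // Provider options only fill keys that the hint did not set (putIfAbsent).
        Map<String, String> dynamicOptions = new HashMap<>(hintOptions);
        providerOptions.forEach(dynamicOptions::putIfAbsent);
        // Dynamic options (hints plus provider) then override the table's WITH options.
        Map<String, String> merged = new HashMap<>(withOptions);
        merged.putAll(dynamicOptions);
        return merged;
    }
}
```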

Use Case

Generate Snapshot IDs For Paimon

All tables of a database, such as tableA, tableB and tableC, are synchronized to Paimon. The data in these tables is managed by a unified version, and that version maps to a different snapshot id in each table. In Sql-Gateway, we can create a PaimonSnapshotTableOptionProvider that implements TableDynamicOptionProvider and gets the snapshot ids for these tables according to the unified version, as follows.

Code Block
SET 'sql-gateway.table-dynamic-option-provider-factory'='paimon-provider'; -- Set the paimon provider for Sql-Gateway

SELECT * FROM tableA JOIN tableB ON ... JOIN tableC ON ...;

In the above SELECT query, Flink will take the snapshot ids for tableA, tableB and tableC from the provider and read the specific snapshot data from these tables. For example, suppose snapshot1 of tableA, snapshot3 of tableB and snapshot10 of tableC belong to the same version. The paimon provider will provide them for the query even though they are not configured in the SQL statement.
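A provider like PaimonSnapshotTableOptionProvider translates one unified version into a snapshot id per table. The sketch below makes several assumptions. It uses an in-memory registry (version, then table, then snapshot id) in place of wherever the real version mapping is stored. It identifies tables by plain name instead of ObjectIdentifier. It also assumes that Paimon's scan.snapshot-id option is the one that pins a read to a snapshot. PaimonSnapshotProviderSketch is a hypothetical class.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of a Paimon snapshot provider; the registry is an in-memory stand-in. */
final class PaimonSnapshotProviderSketch {

    // Assumed Paimon scan option that reads a table at a specific snapshot.
    static final String SNAPSHOT_OPTION = "scan.snapshot-id";

    /** unified version -> (table -> snapshot id); a real provider would read this from storage. */
    private final Map<String, Map<String, Long>> versionRegistry;

    PaimonSnapshotProviderSketch(Map<String, Map<String, Long>> versionRegistry) {
        this.versionRegistry = versionRegistry;
    }

    /** Returns per-table options that pin every source table to the same unified version. */
    Map<String, Map<String, String>> sourceOptions(String version, Set<String> sources) {
        Map<String, Long> snapshots = versionRegistry.get(version);
        if (snapshots == null) {
            throw new IllegalArgumentException("Unknown unified version: " + version);
        }
        Map<String, Map<String, String>> result = new HashMap<>();
        for (String table : sources) {
            Long snapshotId = snapshots.get(table);
            if (snapshotId == null) {
                // Failing is safer than silently reading an inconsistent snapshot.
                throw new IllegalStateException(table + " has no snapshot for version " + version);
            }
            result.put(table, Map.of(SNAPSHOT_OPTION, String.valueOf(snapshotId)));
        }
        return result;
    }
}
```

With the version from the example above, the provider would return scan.snapshot-id values of 1, 3 and 10 for tableA, tableB and tableC.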

Generate Username And Password For Jdbc

Suppose there is a Security Service that stores the usernames and passwords for RDS, and we develop a JdbcTableSecurityServiceProvider to get them for the JDBC tables in a job. We can write our job in SQL as follows and submit it to Sql-Gateway.

Code Block
SET 'sql-gateway.table-dynamic-option-provider-factory'='jdbc-security-provider'; -- Set the jdbc security provider for Sql-Gateway

CREATE TABLE my_jdbc_table (
    val1 INT,
    val2 INT
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/test',
    'table-name' = 'mysql_table'
    -- 'username' = '****'   We don't need to provide the username and password for the job here
    -- 'password' = '****'
);

INSERT INTO my_jdbc_table SELECT ... FROM ... GROUP BY ...;

In the above case, we don't need to add the username and password options to our SQL job, or we can just give placeholder values. The security provider generates the real username and password for the JDBC tables. After the DBA updates the security information for the tables, we can simply restart our jobs without changing them.
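A JdbcTableSecurityServiceProvider could derive the credentials from the options already present in the WITH clause, such as url and table-name. The sketch below assumes a hypothetical SecurityService interface that returns a username and password pair. It only handles tables whose connector is 'jdbc'. JdbcCredentialProviderSketch and its methods are illustrative names, not part of Flink or the JDBC connector.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch of a JDBC credential provider backed by a stand-in security service. */
final class JdbcCredentialProviderSketch {

    /** Stand-in for the security service; returns {username, password} for a url and table. */
    interface SecurityService {
        Optional<String[]> lookup(String url, String tableName);
    }

    private final SecurityService service;

    JdbcCredentialProviderSketch(SecurityService service) {
        this.service = service;
    }

    /** Derives username/password options for one table from its current WITH options. */
    Map<String, String> dynamicOptions(Map<String, String> tableOptions) {
        Map<String, String> result = new HashMap<>();
        if (!"jdbc".equals(tableOptions.get("connector"))) {
            return result; // Not a JDBC table: provide nothing.
        }
        service.lookup(tableOptions.get("url"), tableOptions.get("table-name"))
                .ifPresent(
                        credentials -> {
                            result.put("username", credentials[0]);
                            result.put("password", credentials[1]);
                        });
        return result;
    }
}
```

The real credentials are added as provider options. Any placeholder 'username' or 'password' in the WITH clause is therefore overridden, following the option priority described earlier.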

Compatibility, Deprecation, and Migration Plan

This is a newly added feature, so there will be no compatibility issues.

Test Plan

UT & IT


[1] https://www.vldb.org/pvldb/vol15/p3411-chen.pdf

[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-276%3A+Data+Consistency+of+Streaming+and+Batch+ETL+in+Flink+and+Table+Store

...