Status

Discussion thread-
Vote thread-
JIRA


Release-

Motivation

Flink creates a DynamicTableSource for each source table independently when it generates the plan for a SQL job, so sources are not aware of each other. In some situations, however, we need to generate options for each source based on all source tables in the same job. For example, we use Flink as the OLAP cluster for our HTAP system [1], which synchronizes all data from an OLTP system to column storage at the database level and then runs OLAP queries in the Flink cluster against a unified version to ensure consistent results. Similarly, FLIP-276 [2] points out that consistent queries across multiple tables need to be supported.
Besides, many connectors have confidential parameters, such as the username and password of the JDBC connector. Currently, they can only be configured in the catalog configuration file or in the table options of the SQL statement, which exposes the secrets to anyone who can read the file or the statement.
We would like to introduce a pluggable provider that generates dynamic options for all source and sink tables, so that a Flink job can create its sources and sinks based on these options.

Proposal

We submit SQL statements to Sql-Gateway through the JDBC driver; Sql-Gateway parses the SQL into an Operation and executes it in TableEnvironment. Sql-Gateway can therefore load the option provider from its configuration and register it with TableEnvironment before submitting the Operation. Users who run jobs with TableEnvironment directly can create and register the provider themselves. TableEnvironment then traverses all Operations, gets the options for each source and sink from the provider, and adds them to the corresponding source and sink operations.

Public Interfaces

TableDynamicOptionProvider

We add TableDynamicOptionProvider, which provides dynamic options for every source and sink in a job, together with TableDynamicOptionProviderFactory, which creates it.

import java.util.Map;
import java.util.Set;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.factories.Factory;

/** Dynamic option provider for source and sink tables. */
@PublicEvolving
public interface TableDynamicOptionProvider {
    /**
     * Provides dynamic options for the source and sink tables of a job.
     *
     * @param context The context from which the provider reads the job's tables and options.
     * @return The dynamic options for the job's source and sink tables.
     */
    TableDynamicOptions getDynamicOptions(TableOptionsContext context);

    /** Context passed to the provider for one job. */
    @PublicEvolving
    interface TableOptionsContext {
        /**
         * Configuration of the job, which carries job-specific options, for example, the
         * strategy used to generate table dynamic options.
         */
        ReadableConfig jobConfiguration();

        /**
         * Gets the catalog context for the given catalog, which identifies the physical catalog.
         * The {@code CatalogContext} may contain information about the options, such as the
         * location where the option values for tables are stored.
         */
        CatalogContext catalogContext(String catalog);

        /** All source tables of the job. */
        Set<ObjectIdentifier> sources();

        /**
         * Gets the current options of the source table, which are used to determine its dynamic
         * options; for example, the JDBC table name in the options can be used to look up the
         * username and password.
         */
        Map<String, String> sourceOptions(ObjectIdentifier table);

        /** All sink tables of the job. */
        Set<ObjectIdentifier> sinks();

        /**
         * Gets the current options of the sink table, which are used to determine its dynamic
         * options; for example, the JDBC table name in the options can be used to look up the
         * username and password.
         */
        Map<String, String> sinkOptions(ObjectIdentifier table);
    }

    /** Table dynamic options returned by the provider. */
    @PublicEvolving
    interface TableDynamicOptions {
        /** Gets the source options for the given table identifier. */
        Map<String, String> sourceOptions(ObjectIdentifier table);

        /** Gets the sink options for the given table identifier. */
        Map<String, String> sinkOptions(ObjectIdentifier table);
    }
}

/** Factory to create a table dynamic option provider. */
@PublicEvolving
public interface TableDynamicOptionProviderFactory extends Factory {
    TableDynamicOptionProvider createProvider(Context context);

    /** Context for the dynamic option provider factory. */
    interface Context {
        ReadableConfig getConfiguration();

        ClassLoader getClassLoader();
    }
}
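A minimal sketch of a provider that serves options from a fixed in-memory map may help make the contract concrete. It assumes simplified, hypothetical stand-ins for the proposed interfaces (table identifiers as plain strings, no ReadableConfig or CatalogContext) so that it runs without Flink; StaticOptionProvider and the stand-in interface names are illustrative only.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified stand-in for TableOptionsContext: identifiers are plain strings. */
interface SimpleOptionsContext {
    Set<String> sources();

    Set<String> sinks();
}

/** Hypothetical, simplified stand-in for the TableDynamicOptions result. */
interface SimpleDynamicOptions {
    Map<String, String> sourceOptions(String table);

    Map<String, String> sinkOptions(String table);
}

/** Hypothetical provider that serves options from a fixed map keyed by table identifier. */
class StaticOptionProvider {
    private final Map<String, Map<String, String>> optionsByTable;

    StaticOptionProvider(Map<String, Map<String, String>> optionsByTable) {
        this.optionsByTable = optionsByTable;
    }

    /** Computes options for every source and sink of the job in a single call. */
    SimpleDynamicOptions getDynamicOptions(SimpleOptionsContext context) {
        Map<String, Map<String, String>> sourceResult = new HashMap<>();
        Map<String, Map<String, String>> sinkResult = new HashMap<>();
        for (String table : context.sources()) {
            sourceResult.put(table, optionsByTable.getOrDefault(table, Map.of()));
        }
        for (String table : context.sinks()) {
            sinkResult.put(table, optionsByTable.getOrDefault(table, Map.of()));
        }
        return new SimpleDynamicOptions() {
            @Override
            public Map<String, String> sourceOptions(String table) {
                // Tables that are not sources of this job get no options.
                return sourceResult.getOrDefault(table, Map.of());
            }

            @Override
            public Map<String, String> sinkOptions(String table) {
                return sinkResult.getOrDefault(table, Map.of());
            }
        };
    }
}
```

What the sketch is meant to show is that the provider sees all sources and sinks of a job in one call, which is what makes cross-table decisions such as consistent snapshots possible.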

TableEnvironment

Add registerTableOptionProvider to TableEnvironment.

/** New method in TableEnvironment. */
public interface TableEnvironment {
    /** Registers the provider that generates dynamic options for source and sink tables. */
    void registerTableOptionProvider(TableDynamicOptionProvider provider);
}

Options

Add a new option for the provider factory to SqlGatewayOptions.

sql-gateway.table-dynamic-option-provider-factory: {provider factory class identifier}

Proposed Changes

Load Provider In Gateway

Sql-Gateway can load the provider factory from its configuration and create the option provider in DefaultContext; each session created by SessionManager can then obtain the provider from it, as follows.

/** Loads the provider factory and creates the provider for DefaultContext. */
public class DefaultContext {
    // Null when no provider factory is configured.
    @Nullable private final TableDynamicOptionProvider tableOptionProvider;

    ...;

    public static DefaultContext load(
            Configuration dynamicConfig,
            List<URL> dependencies,
            boolean discoverExecutionConfig,
            boolean discoverPythonJar) {
        ...;

        // SOURCE_DYNAMIC_OPTIONS_FACTORY is the
        // 'sql-gateway.table-dynamic-option-provider-factory' option.
        TableDynamicOptionProvider optionsProvider = null;
        if (dynamicConfig.contains(SOURCE_DYNAMIC_OPTIONS_FACTORY)) {
            TableDynamicOptionProviderFactory providerFactory =
                    FactoryUtil.discoverFactory(
                            DefaultContext.class.getClassLoader(),
                            TableDynamicOptionProviderFactory.class,
                            dynamicConfig.get(SOURCE_DYNAMIC_OPTIONS_FACTORY));
            optionsProvider =
                    providerFactory.createProvider(
                            new TableDynamicOptionProviderFactory.Context() {
                                ...;
                            });
        }

        ...;

        return new DefaultContext(..., optionsProvider);
    }
}
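The discovery step can be sketched without Flink as follows. This is a simplified, hypothetical stand-in for FactoryUtil.discoverFactory: the real utility finds factories through Java SPI and matches them by Factory#factoryIdentifier, whereas this sketch receives the candidate factories as a list. The stand-in type names and the rule that an unset option means no provider are assumptions.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

/** Hypothetical stand-in for TableDynamicOptionProvider. */
interface DiscoveredProvider {
    String name();
}

/** Hypothetical, simplified stand-in for TableDynamicOptionProviderFactory. */
interface SimpleProviderFactory {
    String factoryIdentifier();

    DiscoveredProvider createProvider(Map<String, String> config);
}

final class ProviderDiscovery {
    /** Key of the proposed gateway option. */
    static final String FACTORY_OPTION = "sql-gateway.table-dynamic-option-provider-factory";

    private ProviderDiscovery() {}

    /**
     * Returns the provider created by the single factory whose identifier matches the
     * configured value, or empty when the option is not set.
     */
    static Optional<DiscoveredProvider> loadProvider(
            Map<String, String> config, List<SimpleProviderFactory> candidates) {
        String identifier = config.get(FACTORY_OPTION);
        if (identifier == null) {
            return Optional.empty(); // Feature disabled: no provider is registered.
        }
        List<SimpleProviderFactory> matches =
                candidates.stream()
                        .filter(f -> f.factoryIdentifier().equals(identifier))
                        .collect(Collectors.toList());
        // Missing and ambiguous identifiers are both configuration errors.
        if (matches.size() != 1) {
            throw new IllegalStateException(
                    "Expected exactly one provider factory for '" + identifier
                            + "' but found " + matches.size());
        }
        return Optional.of(matches.get(0).createProvider(config));
    }
}
```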

Option Provider In TableEnvironment

TableEnvironment gets a new registerTableOptionProvider method. Sql-Gateway registers the loaded provider with TableEnvironment before submitting the Operation, and users who do not submit jobs through Sql-Gateway can create and register a TableDynamicOptionProvider for their table environment themselves.
TableEnvironment traverses the received Operations, collects all QueryOperations and ModifyOperations for sources and sinks, and gets the options for all source and sink tables from the provider in a single call. The options for each table are then added to the corresponding QueryOperation or ModifyOperation.

public class TableEnvironmentImpl implements TableEnvironmentInternal {
    @Nullable private TableDynamicOptionProvider tableDynamicOptionProvider;

    /** Registers the table dynamic option provider. */
    @Override
    public void registerTableOptionProvider(TableDynamicOptionProvider provider) {
        this.tableDynamicOptionProvider = provider;
    }

    /** Gets options for source/sink tables and adds them to the corresponding operations. */
    private void provideTableDynamicOptions(List<Operation> operations) {
        if (tableDynamicOptionProvider == null) {
            return; // No provider registered: operations are left unchanged.
        }
        Map<ObjectIdentifier, List<SourceQueryOperation>> identifierSourceOperations =
                traverseSourceOperations(operations);
        // Sink operations are collected in the same way.
        TableDynamicOptions tableDynamicOptions =
                tableDynamicOptionProvider.getDynamicOptions(
                        new TableOptionsContext() { ... });
        // Add the options to the QueryOperations and ModifyOperations.
        ...;
    }
}
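The traversal can be sketched end to end with simplified, hypothetical stand-ins: TableOperation plays the role of SourceQueryOperation/SinkModifyOperation, and BatchOptionProvider that of TableDynamicOptionProvider. Under these assumptions, the sketch collects every source and sink table, calls the provider once for the whole job, and attaches the results with putIfAbsent so that options from an OPTIONS hint are kept.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for SourceQueryOperation / SinkModifyOperation. */
final class TableOperation {
    final String table;
    final boolean isSink;
    /** Options from the OPTIONS hint; copied into a mutable map so options can be added. */
    final Map<String, String> dynamicOptions;

    TableOperation(String table, boolean isSink, Map<String, String> hintOptions) {
        this.table = table;
        this.isSink = isSink;
        this.dynamicOptions = new HashMap<>(hintOptions);
    }

    /** Adds provider options without overriding options already set by a hint. */
    void addDynamicOptions(Map<String, String> options) {
        options.forEach(dynamicOptions::putIfAbsent);
    }
}

/** Hypothetical provider result: table identifier -> options. */
final class ProvidedOptions {
    final Map<String, Map<String, String>> sources = new HashMap<>();
    final Map<String, Map<String, String>> sinks = new HashMap<>();
}

/** Hypothetical stand-in for TableDynamicOptionProvider. */
interface BatchOptionProvider {
    ProvidedOptions getDynamicOptions(Set<String> sources, Set<String> sinks);
}

final class DynamicOptionApplier {
    private DynamicOptionApplier() {}

    static void provideTableDynamicOptions(
            List<TableOperation> operations, BatchOptionProvider provider) {
        if (provider == null) {
            return; // No provider registered: operations are left unchanged.
        }
        Set<String> sources = new LinkedHashSet<>();
        Set<String> sinks = new LinkedHashSet<>();
        for (TableOperation op : operations) {
            if (op.isSink) {
                sinks.add(op.table);
            } else {
                sources.add(op.table);
            }
        }
        // A single call lets the provider see all tables of the job at once.
        ProvidedOptions provided = provider.getDynamicOptions(sources, sinks);
        for (TableOperation op : operations) {
            Map<String, Map<String, String>> byTable = op.isSink ? provided.sinks : provided.sources;
            op.addDynamicOptions(byTable.getOrDefault(op.table, Map.of()));
        }
    }
}
```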

Dynamic Options In Operation

Currently, SourceQueryOperation and SinkModifyOperation have a field named dynamicOptions, which is used when creating the source and sink tables from catalogs. We can add the dynamic options generated by the provider to dynamicOptions, and Flink then creates the tables according to them. For source and sink Operations that do not have a dynamicOptions field yet, we can add the field and use it when the converter creates the RelNode.

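public class SourceQueryOperation implements QueryOperation {
    // Must be a mutable map, since provider options are added after construction.
    private final Map<String, String> dynamicOptions;
    ...;

    /**
     * Adds options to dynamicOptions for keys that are not present yet, so options from an
     * OPTIONS hint take precedence over options from the provider.
     */
    public void addDynamicOptions(Map<String, String> options) {
        options.forEach(dynamicOptions::putIfAbsent);
    }
}

public class SinkModifyOperation implements ModifyOperation {
    // Must be a mutable map, since provider options are added after construction.
    private final Map<String, String> dynamicOptions;
    ...;

    /** Same semantics as SourceQueryOperation#addDynamicOptions. */
    public void addDynamicOptions(Map<String, String> options) {
        options.forEach(dynamicOptions::putIfAbsent);
    }
}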

Tables can therefore receive options from three places: the WITH clause of CREATE ... WITH (options), the provider, and the /*+ OPTIONS(...) */ hint in the SQL statement. From lowest to highest priority, they are: WITH options, provider options, and OPTIONS hint options.
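The resulting priority can be expressed as a single merge. This is a sketch of the intended semantics rather than Flink code: OptionPriority is a hypothetical helper, and in the proposal the same result comes from two steps (addDynamicOptions keeps hint values over provider values, and the dynamic options are then applied on top of the table's WITH options).

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper illustrating option priority for one table. */
final class OptionPriority {
    private OptionPriority() {}

    /**
     * Merges the three option sources for one table. Later putAll calls overwrite earlier
     * ones, so the call order encodes the priority: WITH < provider < OPTIONS hint.
     */
    static Map<String, String> effectiveOptions(
            Map<String, String> withOptions,
            Map<String, String> providerOptions,
            Map<String, String> hintOptions) {
        Map<String, String> merged = new HashMap<>(withOptions);
        merged.putAll(providerOptions);
        merged.putAll(hintOptions);
        return merged;
    }
}
```

For example, a placeholder password in WITH is replaced by the provider's value, while a value given in an OPTIONS hint still wins over the provider.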

Use Case

Generate Snapshot IDs For Paimon

Suppose all tables of a database, such as tableA, tableB and tableC, are synchronized to Paimon, and the data in these tables is managed by a unified version that maps to a different snapshot id in each table. We can create a PaimonSnapshotTableOptionProvider that implements TableDynamicOptionProvider to get the snapshot ids for these tables according to the unified version, and use it in Sql-Gateway as follows.

SET 'sql-gateway.table-dynamic-option-provider-factory'='paimon-provider'; -- Set the Paimon provider for Sql-Gateway

SELECT * FROM tableA JOIN tableB ON ... JOIN tableC ON ...;

In the above SELECT query, Flink uses the snapshot ids that the provider returns for tableA, tableB and tableC to read consistent snapshot data from these tables. For example, if snapshot1 of tableA, snapshot3 of tableB and snapshot10 of tableC belong to the same version, the Paimon provider supplies them for the query even though they are not configured in the SQL statement.
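A PaimonSnapshotTableOptionProvider could resolve snapshot ids roughly as in the sketch below. It is hypothetical throughout: the version-to-snapshot mapping is held in an in-memory map (a real provider would read it from wherever the HTAP system stores it), choosing the latest version that covers every queried table is our assumed policy, and it assumes Paimon's scan.snapshot-id table option is the option used to pin a snapshot.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical provider core mapping a unified version to per-table Paimon snapshot ids. */
final class PaimonSnapshotSketch {
    /** version -> (table -> snapshot id); stands in for the real version store. */
    private final NavigableMap<Long, Map<String, Long>> versions = new TreeMap<>();

    void recordVersion(long version, Map<String, Long> snapshotIds) {
        versions.put(version, new HashMap<>(snapshotIds));
    }

    /**
     * Picks the latest version that covers every queried table and returns the
     * 'scan.snapshot-id' option for each table at that version.
     */
    Map<String, Map<String, String>> sourceOptions(Set<String> sources) {
        for (Map<String, Long> snapshotIds : versions.descendingMap().values()) {
            if (snapshotIds.keySet().containsAll(sources)) {
                Map<String, Map<String, String>> result = new HashMap<>();
                for (String table : sources) {
                    result.put(
                            table,
                            Map.of("scan.snapshot-id", String.valueOf(snapshotIds.get(table))));
                }
                return result;
            }
        }
        throw new IllegalStateException("No unified version covers tables " + sources);
    }
}
```

Using the example above, a version that maps tableA, tableB and tableC to snapshots 1, 3 and 10 yields 'scan.snapshot-id' = '1', '3' and '10' for the three tables.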

Generate Username And Password For Jdbc

Suppose a security service stores the usernames and passwords for RDS, and we develop a JdbcTableSecurityServiceProvider that gets them for the JDBC tables in a job. We can then write the job in SQL as follows and submit it to Sql-Gateway:

SET 'sql-gateway.table-dynamic-option-provider-factory'='jdbc-security-provider'; -- Set the JDBC security provider for Sql-Gateway

CREATE TABLE my_jdbc_table (
    val1 INT,
    val2 INT)
  WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/test',
    'table-name' = 'mysql_table'
    -- 'username' = '****'  No username or password is needed in the job
    -- 'password' = '****'
  );

INSERT INTO my_jdbc_table SELECT ... FROM ... GROUP BY ...;

In the above case, we do not need to add the username and password options to the SQL job (or we can give placeholder values), because the security provider generates the real username and password for the JDBC tables. After the DBA updates the credentials for the tables, we only need to restart the jobs, without changing them.
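The core lookup of a JdbcTableSecurityServiceProvider might look like the sketch below. SecretStore and Credentials are hypothetical stand-ins for the security service client; the sketch uses the JDBC connector's 'url' and 'table-name' options (which the provider reads from each table's current options through the context) as the lookup key, and returns the connector's 'username' and 'password' options.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical credentials returned by the security service. */
final class Credentials {
    final String username;
    final String password;

    Credentials(String username, String password) {
        this.username = username;
        this.password = password;
    }
}

/** Hypothetical client for the security service; not a real API. */
interface SecretStore {
    /** Returns the credentials for a JDBC table, or null if the service does not know it. */
    Credentials lookup(String url, String tableName);
}

/** Hypothetical core of a provider that fills in JDBC credentials. */
final class JdbcCredentialSketch {
    private final SecretStore store;

    JdbcCredentialSketch(SecretStore store) {
        this.store = store;
    }

    /** Maps each JDBC table (by identifier) to its 'username' and 'password' options. */
    Map<String, Map<String, String>> credentialOptions(
            Map<String, Map<String, String>> currentOptionsByTable) {
        Map<String, Map<String, String>> result = new HashMap<>();
        currentOptionsByTable.forEach(
                (table, options) -> {
                    if (!"jdbc".equals(options.get("connector"))) {
                        return; // Not a JDBC table: nothing to add.
                    }
                    Credentials credentials =
                            store.lookup(options.get("url"), options.get("table-name"));
                    if (credentials != null) {
                        result.put(
                                table,
                                Map.of(
                                        "username", credentials.username,
                                        "password", credentials.password));
                    }
                });
        return result;
    }
}
```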

Compatibility, Deprecation, and Migration Plan

This is a newly added feature, so there are no compatibility issues.

Test Plan

Unit tests (UT) and integration tests (IT).


[1] https://www.vldb.org/pvldb/vol15/p3411-chen.pdf

[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-276%3A+Data+Consistency+of+Streaming+and+Batch+ETL+in+Flink+and+Table+Store


