Status
Discussion thread | -
---|---
Vote thread | -
JIRA |
Release | -
Motivation
Flink creates a DynamicTableSource for each source table independently when it generates the plan for a SQL job, so the sources are not aware of each other. In some situations we need to generate options for each source based on all the source tables in the same job. For example, we use Flink as the OLAP cluster of our HTAP system [1], which synchronizes all data from an OLTP system to column storage at the database level and then performs OLAP queries in the Flink cluster on a unified version to ensure consistent results. Similarly, we also mentioned in FLIP-276 [2] that it is necessary to support consistent queries across multiple tables.
Besides, many connectors have confidential parameters, such as the username and password of the jdbc connector. Currently we can only configure them in the catalog config file or in the table options of the SQL statement, which is not appropriate for confidential values.
We would like to introduce a customized provider to generate dynamic options for all source and sink tables, so that a Flink job can create its sources and sinks based on these options.
Proposal
We submit SQL statements to Sql-Gateway through the jdbc driver; Sql-Gateway parses each statement into an Operation and executes it in TableEnvironment. So Sql-Gateway can load the options provider from its configuration and register it to TableEnvironment before submitting the Operation. Users who run jobs with TableEnvironment directly can create and register the provider themselves. TableEnvironment can then traverse all Operations, get the options for each source and sink from the provider, and add them to the source and sink operations.
[draw.io Diagram]
Public Interfaces
TableDynamicOptionProvider
We add TableDynamicOptionProvider to provide dynamic options for each source and sink table in a job, and TableDynamicOptionProviderFactory to create it.
```java
/** Dynamic option provider for source and sink tables. */
@PublicEvolving
public interface TableDynamicOptionProvider {
    /**
     * Provide dynamic options for source and sink tables.
     *
     * @param context The context for the provider to get table dynamic options.
     * @return The table dynamic options.
     */
    TableDynamicOptions getDynamicOptions(TableOptionsContext context);

    /** Context for the option provider. */
    @PublicEvolving
    static interface TableOptionsContext {
        /**
         * Configuration for the job, which lets the provider customize options, for example by
         * using a different strategy to generate table dynamic options.
         */
        ReadableConfig jobConfiguration();

        /**
         * Get the catalog context for the given catalog, which is used to identify the physical
         * catalog. `CatalogContext` carries information about options, such as the location
         * where the option values for tables are stored.
         */
        CatalogContext catalogContext(String catalog);

        /** All source tables for the job. */
        Set<ObjectIdentifier> sources();

        /**
         * Get the current options of the source table, which are used to determine the dynamic
         * options for the table. For example, the jdbc table names in the options will be used
         * to generate the username and password.
         */
        Map<String, String> sourceOptions(ObjectIdentifier table);

        /** All sink tables for the job. */
        Set<ObjectIdentifier> sinks();

        /**
         * Get the current options of the sink table, which are used to determine the dynamic
         * options for the table. For example, the jdbc table names in the options will be used
         * to generate the username and password.
         */
        Map<String, String> sinkOptions(ObjectIdentifier table);
    }

    /** Table dynamic option result from the provider. */
    @PublicEvolving
    static interface TableDynamicOptions {
        /** Get source options for the given table identifier. */
        Map<String, String> sourceOptions(ObjectIdentifier table);

        /** Get sink options for the given table identifier. */
        Map<String, String> sinkOptions(ObjectIdentifier table);
    }
}

/** Factory to create table dynamic option provider. */
@PublicEvolving
public interface TableDynamicOptionProviderFactory extends Factory {
    TableDynamicOptionProvider createProvider(Context context);

    /** Context for dynamic option provider factory. */
    static interface Context {
        ReadableConfig getConfiguration();

        ClassLoader getClassLoader();
    }
}
```
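To make the contract concrete, here is a minimal sketch of a provider written against simplified stand-ins for the proposed interfaces. The stand-ins are assumptions for illustration only: table identifiers are plain "catalog.db.table" strings instead of ObjectIdentifier, the context omits jobConfiguration() and catalogContext(), and FixedOptionProvider is a hypothetical name. What it shows is that the provider receives every source and sink of the job in one call and returns options per table.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Simplified stand-in for the proposed TableOptionsContext ("catalog.db.table" strings as identifiers).
interface TableOptionsContext {
    Set<String> sources();

    Map<String, String> sourceOptions(String table);

    Set<String> sinks();

    Map<String, String> sinkOptions(String table);
}

// Simplified stand-in for the proposed TableDynamicOptions result.
interface TableDynamicOptions {
    Map<String, String> sourceOptions(String table);

    Map<String, String> sinkOptions(String table);
}

// Simplified stand-in for the proposed TableDynamicOptionProvider.
interface TableDynamicOptionProvider {
    TableDynamicOptions getDynamicOptions(TableOptionsContext context);
}

// Hypothetical provider that gives every source and sink of the job the same fixed options.
class FixedOptionProvider implements TableDynamicOptionProvider {
    private final Map<String, String> fixedOptions;

    FixedOptionProvider(Map<String, String> fixedOptions) {
        this.fixedOptions = Map.copyOf(fixedOptions);
    }

    @Override
    public TableDynamicOptions getDynamicOptions(TableOptionsContext context) {
        // The provider sees all tables of the job at once, so it can compute their options together.
        Map<String, Map<String, String>> sourceResult = new HashMap<>();
        for (String table : context.sources()) {
            sourceResult.put(table, fixedOptions);
        }
        Map<String, Map<String, String>> sinkResult = new HashMap<>();
        for (String table : context.sinks()) {
            sinkResult.put(table, fixedOptions);
        }
        return new TableDynamicOptions() {
            @Override
            public Map<String, String> sourceOptions(String table) {
                return sourceResult.getOrDefault(table, Map.of());
            }

            @Override
            public Map<String, String> sinkOptions(String table) {
                return sinkResult.getOrDefault(table, Map.of());
            }
        };
    }
}
```

A real provider would replace the fixed map with logic that looks at the table's current options or at the catalog context.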
TableEnvironment
Add registerTableOptionProvider to TableEnvironment.
```java
/** Add the register provider method to the table environment. */
public interface TableEnvironment {
    void registerTableOptionProvider(TableDynamicOptionProvider provider);
}
```
Options
Add a new option for the provider factory in SqlGatewayOptions.
```yaml
sql-gateway.table-dynamic-option-provider-factory: {provider factory class identifier}
```
Proposed Changes
Load Provider In Gateway
Sql-Gateway can load the provider factory from its configuration and create the options provider in DefaultContext; each session created by SessionManager can then get the provider from DefaultContext, as follows.
```java
/** Load the provider factory and create the provider for DefaultContext. */
public class DefaultContext {
    private final TableDynamicOptionProvider tableOptionProvider;
    ...;

    public static DefaultContext load(
            Configuration dynamicConfig,
            List<URL> dependencies,
            boolean discoverExecutionConfig,
            boolean discoverPythonJar) {
        ...;
        TableDynamicOptionProviderFactory providerFactory =
                FactoryUtil.discoverFactory(
                        DefaultContext.class.getClassLoader(),
                        TableDynamicOptionProviderFactory.class,
                        dynamicConfig.get(SOURCE_DYNAMIC_OPTIONS_FACTORY));
        TableDynamicOptionProvider optionsProvider =
                providerFactory.createProvider(
                        new TableDynamicOptionProviderFactory.Context() {
                            ...
                        });
        ...;
        return new DefaultContext(..., optionsProvider);
    }
}
```
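FactoryUtil.discoverFactory selects a factory by matching its factory identifier against the configured value. The sketch below imitates only that selection step in plain Java so that the lookup logic is visible; it is not Flink code. ProviderFactory and ProviderDiscovery are hypothetical names, and returning null when the option is unset (so that the gateway runs without a provider) is an assumption, not something this proposal specifies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in: only the identifier matters for discovery.
interface ProviderFactory {
    String factoryIdentifier();
}

final class ProviderDiscovery {
    static final String OPTION_KEY = "sql-gateway.table-dynamic-option-provider-factory";

    private ProviderDiscovery() {}

    /**
     * Picks the factory whose identifier equals the configured value.
     * Returns null when the option is not set (assumption: no provider is used then).
     */
    static ProviderFactory discover(
            Iterable<? extends ProviderFactory> candidates, Map<String, String> config) {
        String identifier = config.get(OPTION_KEY);
        if (identifier == null) {
            return null;
        }
        List<ProviderFactory> matches = new ArrayList<>();
        for (ProviderFactory factory : candidates) {
            if (identifier.equals(factory.factoryIdentifier())) {
                matches.add(factory);
            }
        }
        if (matches.isEmpty()) {
            throw new IllegalArgumentException("No provider factory with identifier '" + identifier + "'");
        }
        if (matches.size() > 1) {
            throw new IllegalStateException("Multiple provider factories with identifier '" + identifier + "'");
        }
        return matches.get(0);
    }
}
```

Because java.util.ServiceLoader is Iterable, the candidates could come from `ServiceLoader.load(ProviderFactory.class, classLoader)`, which is roughly how Flink's factory discovery finds implementations on the classpath.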
Option Provider In TableEnvironment
There will be a registerTableOptionProvider method in TableEnvironment. Sql-Gateway can register the loaded provider to TableEnvironment before submitting an Operation, and users who don't submit jobs through Sql-Gateway can create a TableDynamicOptionProvider and register it to their table environment themselves. TableEnvironment will traverse the received Operation, collect all QueryOperations and ModifyOperations for sources and sinks, and get the options for all the source and sink tables from the provider at once. After that, the options for each table will be added to the corresponding QueryOperation or ModifyOperation.
```java
public class TableEnvironmentImpl implements TableEnvironmentInternal {
    @Nullable private TableDynamicOptionProvider tableDynamicOptionProvider;

    /** Register the table dynamic option provider. */
    @Override
    public void registerTableOptionProvider(TableDynamicOptionProvider tableDynamicOptionProvider) {
        this.tableDynamicOptionProvider = tableDynamicOptionProvider;
    }

    /** Get options for source/sink tables and add them to the specified operations. */
    private void provideTableDynamicOptions(List<Operation> operations) {
        if (tableDynamicOptionProvider == null) {
            // No provider registered: keep the operations unchanged.
            return;
        }
        Map<ObjectIdentifier, List<SourceQueryOperation>> identifierSourceOperations =
                traverseSourceOperations(operations);
        TableDynamicOptions tableDynamicOptions =
                tableDynamicOptionProvider.getDynamicOptions(
                        new TableOptionsContext() {
                            ...
                        });
        // Add options in QueryOperation and ModifyOperation.
        ....;
    }
}
```
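The traversal can be sketched with simplified operation objects. This is a hypothetical model, not the TableEnvironmentImpl code: TableOperation stands in for SourceQueryOperation and SinkModifyOperation, and BatchOptionProvider compresses the provider call into one function from the job's source and sink tables to per-table options. It shows the two properties the proposal relies on: the provider is called once per job with all tables, and provider options are added with putIfAbsent so that existing hint options are kept.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for SourceQueryOperation / SinkModifyOperation.
final class TableOperation {
    final String table; // "catalog.db.table"
    final boolean sink;
    final Map<String, String> dynamicOptions = new HashMap<>(); // hint options live here

    TableOperation(String table, boolean sink) {
        this.table = table;
        this.sink = sink;
    }

    void addDynamicOptions(Map<String, String> options) {
        // Keep existing (hint) options; only fill in missing keys.
        options.forEach(dynamicOptions::putIfAbsent);
    }
}

// Per-table options returned by one provider call.
final class ProvidedOptions {
    final Map<String, Map<String, String>> sources = new HashMap<>();
    final Map<String, Map<String, String>> sinks = new HashMap<>();
}

// Hypothetical compressed form of TableDynamicOptionProvider.getDynamicOptions.
interface BatchOptionProvider {
    ProvidedOptions provide(Set<String> sources, Set<String> sinks);
}

final class DynamicOptionApplier {
    private DynamicOptionApplier() {}

    static void apply(List<TableOperation> operations, BatchOptionProvider provider) {
        // 1. Collect every source and sink table of the job.
        Set<String> sources = new LinkedHashSet<>();
        Set<String> sinks = new LinkedHashSet<>();
        for (TableOperation op : operations) {
            (op.sink ? sinks : sources).add(op.table);
        }
        // 2. Ask the provider once, so it can choose consistent options across all tables.
        ProvidedOptions provided = provider.provide(sources, sinks);
        // 3. Add each table's options to its operation.
        for (TableOperation op : operations) {
            Map<String, Map<String, String>> byTable = op.sink ? provided.sinks : provided.sources;
            op.addDynamicOptions(byTable.getOrDefault(op.table, Map.of()));
        }
    }
}
```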
Dynamic Options In Operation
Currently there is a field named dynamicOptions in SourceQueryOperation and SinkModifyOperation, which is used when creating the source and sink tables from catalogs. We can add the dynamic options generated by the provider to dynamicOptions, and then Flink can create the tables according to these options. For source and sink Operations without a dynamicOptions field, we can add the field and use it when the converter creates the RelNode.
```java
public class SourceQueryOperation implements QueryOperation {
    private final Map<String, String> dynamicOptions;
    ....;

    /** Add options to dynamicOptions if they do not exist yet. */
    public void addDynamicOptions(Map<String, String> options) {
        options.forEach((k, v) -> dynamicOptions.putIfAbsent(k, v));
    }
}

public class SinkModifyOperation implements ModifyOperation {
    private final Map<String, String> dynamicOptions;
    ....;

    /** Add options to dynamicOptions if they do not exist yet. */
    public void addDynamicOptions(Map<String, String> options) {
        options.forEach((k, v) -> dynamicOptions.putIfAbsent(k, v));
    }
}
```
Tables will have three types of options: options in CREATE ... WITH (...), options from the provider, and options in the /*+ OPTIONS(...) */ hint of the SQL statement. Options in WITH have the lowest priority, provider options come next, and options in the OPTIONS hint have the highest priority.
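The priority order can be checked with a few lines of Java. The sketch below (hypothetical class OptionPrecedence, plain maps instead of Flink types) computes the effective options in two ways: directly, by layering WITH, provider and hint options, and along the dynamicOptions path, where hint options already sit in dynamicOptions, provider options are added with putIfAbsent, and dynamicOptions are then applied over the WITH options. Both give the same result, which is why addDynamicOptions uses putIfAbsent.

```java
import java.util.HashMap;
import java.util.Map;

final class OptionPrecedence {
    private OptionPrecedence() {}

    /** Layers options from lowest to highest priority: WITH, provider, hints. */
    static Map<String, String> direct(
            Map<String, String> with, Map<String, String> provider, Map<String, String> hints) {
        Map<String, String> merged = new HashMap<>(with);
        merged.putAll(provider); // provider overrides WITH
        merged.putAll(hints);    // hints override everything
        return merged;
    }

    /** Same result via dynamicOptions: hints first, provider options only for missing keys. */
    static Map<String, String> viaDynamicOptions(
            Map<String, String> with, Map<String, String> provider, Map<String, String> hints) {
        Map<String, String> dynamicOptions = new HashMap<>(hints);
        provider.forEach(dynamicOptions::putIfAbsent);
        Map<String, String> merged = new HashMap<>(with);
        merged.putAll(dynamicOptions); // dynamic options override WITH
        return merged;
    }
}
```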
Use Case
Generate Snapshot IDs For Paimon
Suppose all tables of a database, such as tableA, tableB and tableC, are synchronized to Paimon, and the data in these tables is managed by a unified version that maps to a different snapshot id in each table. We can create a PaimonSnapshotTableOptionProvider that implements TableDynamicOptionProvider to get the snapshot ids of the above tables for the unified version, and use it in Sql-Gateway as follows.
```sql
-- Set the paimon provider for Sql-Gateway
SET 'sql-gateway.table-dynamic-option-provider-factory' = 'paimon-provider';
SELECT * FROM tableA JOIN tableB ON ... JOIN tableC ON ...;
```
In the above SELECT query, Flink uses the snapshot ids that the provider returns for tableA, tableB and tableC to read the corresponding snapshot data from these tables. For example, if snapshot1 of tableA, snapshot3 of tableB and snapshot10 of tableC belong to the same version, the Paimon provider supplies them to the query even though they are not configured in the SQL statement.
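A PaimonSnapshotTableOptionProvider could resolve the unified version into one snapshot id per table and return it as a source option. The sketch below is hypothetical: the version-to-snapshot mapping is an in-memory map instead of whatever store the HTAP system uses, the version is passed in directly (how it is chosen, for example from the job configuration, is left open), and only the source side is covered. It emits Paimon's `scan.snapshot-id` option, which Paimon uses to read a specific snapshot.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical provider logic: one unified version maps to a different snapshot id per table.
final class PaimonSnapshotOptions {
    static final String SNAPSHOT_ID = "scan.snapshot-id"; // Paimon option for reading a given snapshot

    private final Map<Long, Map<String, Long>> snapshotsByVersion;

    PaimonSnapshotOptions(Map<Long, Map<String, Long>> snapshotsByVersion) {
        this.snapshotsByVersion = snapshotsByVersion;
    }

    /** Returns, for every source table, the snapshot id that belongs to the given version. */
    Map<String, Map<String, String>> sourceOptions(Set<String> sources, long version) {
        Map<String, Long> snapshots = snapshotsByVersion.get(version);
        if (snapshots == null) {
            throw new IllegalArgumentException("Unknown version " + version);
        }
        Map<String, Map<String, String>> result = new HashMap<>();
        for (String table : sources) {
            Long snapshotId = snapshots.get(table);
            if (snapshotId == null) {
                // Failing is safer than silently reading a snapshot from another version.
                throw new IllegalStateException(table + " has no snapshot for version " + version);
            }
            result.put(table, Map.of(SNAPSHOT_ID, Long.toString(snapshotId)));
        }
        return result;
    }
}
```

Because one call sees tableA, tableB and tableC together, the provider can reject the whole query when one table lacks a snapshot for the version instead of mixing versions.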
Generate Username And Password For Jdbc
Suppose there is a security service that stores the username and password for RDS, and we develop a JdbcTableSecurityServiceProvider to get them for the jdbc tables in a job. We can write the job in SQL as follows and submit it to Sql-Gateway.
```sql
-- Set the jdbc security provider for Sql-Gateway
SET 'sql-gateway.table-dynamic-option-provider-factory' = 'jdbc-security-provider';
CREATE TABLE my_jdbc_table (
    val1 INT,
    val2 INT
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/test',
    'table-name' = 'mysql_table'
    -- We don't need to provide username and password for the job here
    -- 'username' = '****'
    -- 'password' = '****'
);
INSERT INTO my_jdbc_table SELECT ... FROM ... GROUP BY ...;
```
In the above case, we don't need to add the username and password options to the SQL job (or we can just give placeholder values), and the security provider generates the real username and password for the jdbc tables. After the DBA updates the security information for the tables, we can simply restart the jobs without any changes to them.
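The per-table step of a JdbcTableSecurityServiceProvider could look like the sketch below. It is hypothetical: the security service is modelled as a function from (url, table-name) to credentials, and Credentials and JdbcCredentialOptions are illustrative names. It reads the table's current options (what sourceOptions/sinkOptions of the proposed context return) and produces the jdbc connector's `username` and `password` options. Since provider options rank above WITH options, these values also replace any placeholder given in the DDL.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Credentials as returned by a hypothetical security service.
final class Credentials {
    final String username;
    final String password;

    Credentials(String username, String password) {
        this.username = username;
        this.password = password;
    }
}

// Hypothetical per-table step of a JdbcTableSecurityServiceProvider.
final class JdbcCredentialOptions {
    // (url, table-name) -> credentials, or null if the service knows nothing about the table.
    private final BiFunction<String, String, Credentials> securityService;

    JdbcCredentialOptions(BiFunction<String, String, Credentials> securityService) {
        this.securityService = securityService;
    }

    /** Derives username/password options from the table's current options. */
    Map<String, String> optionsFor(Map<String, String> currentOptions) {
        if (!"jdbc".equals(currentOptions.get("connector"))) {
            return Map.of(); // not a jdbc table: nothing to add
        }
        String url = currentOptions.get("url");
        String tableName = currentOptions.get("table-name");
        Credentials credentials = securityService.apply(url, tableName);
        if (credentials == null) {
            throw new IllegalStateException("No credentials for " + url + " / " + tableName);
        }
        Map<String, String> result = new HashMap<>();
        result.put("username", credentials.username);
        result.put("password", credentials.password);
        return result;
    }
}
```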
Compatibility, Deprecation, and Migration Plan
This is a newly added feature, so there will be no compatibility issues.
Test Plan
UT & IT
[1] https://www.vldb.org/pvldb/vol15/p3411-chen.pdf
...