...

Currently, the application deployment mode requires jar-based job submission at JobManager, which can't be directly used by pure SQL jobs submitted by SQL Client/Gateway or Flink CLI. A common workaround is to wrap the SQLs in a simple user jar, such as SQL Runner in the Flink Kubernetes operator. This FLIP aims to support this approach officially by introducing SQL Driver.

SQL Driver, as the default main class for Flink SQL, mainly serves two use cases:

  • For SQL Client/Gateway users to run Flink SQLs in application mode.
  • For potential external systems to integrate Flink SQL.

Note that although SQL Driver is designed mainly to solve the problems in application mode, it is general-purpose and could also be used in all deployment modes.

Basic Idea

In general, SQL Driver behaves like SQL Client in non-interactive mode. It is mainly responsible for executing compiled SQL plans (AKA JSON plans) at JobManager in the following steps (see the sketch after the list):

1. build a new TableEnvironment with the given configurations and an optional SQL init file
2. execute the compiled SQL plans (ExecNodeGraph in JSON format)
3. exit when all SQL plans are executed (either sync or async)
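
Below is a minimal sketch of these steps using the public Table API. It is illustrative only: the class name, configuration handling, and plan path are assumptions, not the actual SQL Driver implementation.

Code Block
languagejava
titleSQL Driver sketch (illustrative)
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.CompiledPlan;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.PlanReference;
import org.apache.flink.table.api.TableEnvironment;

public final class SqlDriverSketch {
    public static void main(String[] args) throws Exception {
        // Step 1: build a TableEnvironment with the given configurations.
        Configuration conf = new Configuration(); // in reality, populated from the deployment
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().withConfiguration(conf).build());

        // Step 2: load the compiled SQL plan (ExecNodeGraph in JSON format).
        // The local path is an example; resources are localized before this point.
        CompiledPlan plan = tableEnv.loadPlan(PlanReference.fromFile("/opt/flink/plan.json"));

        // Step 3: exit when the execution finishes (sync mode shown here).
        plan.execute().await();
    }
}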


In non-SQL-Gateway scenarios, the usage is relatively simple as it involves only two components: the client and the cluster manager. For both YARN and K8s setups, the deployment process is the same as it is now.


However, the deployment would be much more complex in SQL Gateway scenarios, because the SQL execution process is divided into two phases:

...

The most challenging parts are:

1) serialization of TableEnvironment in SQL Gateway

2) distribution of SQLs and resources in K8s application mode

3) execution delegation and result fetching of SQLs in SQL Gateway

These are explained in detail in the following sections.

Serialization of TableEnvironment

...

Therefore, a distributed filesystem would be needed. We could leverage Flink's file system abstraction to store resources, similar to the HA storage directory or the checkpoint directory. This would be mostly covered by FLINK-28915 and FLINK-32315, which introduce a file distribution mechanism for Flink on K8s.

In brief, all that SQL Gateway has to do is to specify resource-related configurations (e.g. `pipeline.jars`) with the local resources using the `file://` scheme, and Flink would take care of shipping the files from the client (SQL Gateway) to the cluster (SQL Driver).
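
For example, a minimal sketch of what SQL Gateway would set (the jar path is illustrative):

Code Block
languagejava
titleSpecifying local resources (illustrative)
import java.util.Collections;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;

// Point pipeline.jars at a local resource with the file:// scheme; Flink ships
// the file from the client (SQL Gateway) to the cluster (SQL Driver) on deployment.
Configuration conf = new Configuration();
conf.set(PipelineOptions.JARS, Collections.singletonList("file:///tmp/udfs/myudf.jar"));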

Execution Delegation and Result Fetching

Only statements that could be compiled into JSON plans would be delegated to SQL Driver. These statements are DMLs (insert/update/delete) and DQLs (select). However, since the jobs are submitted by SQL Driver, by default SQL Gateway has no access to the returned ResultFetcher, and thus doesn't know the JobID and can't fetch results directly.

Hence, we need the following adjustments:

  1. WRT the JobID, SQL Gateway needs to manually generate a JobID and set it into `PIPELINE_FIXED_JOB_ID` before delegating the job to SQL Driver. We could derive the JobID from the ClusterID, as ApplicationDispatcherBootstrap does (see the sketch after this list).
  2. WRT result fetching, we need to make CollectSinkFunction support remote socket addresses. That is to say, start a socket server thread at SQL Gateway and pass its address to SQL Driver. When CollectSinkFunction is invoked, it connects to the socket server at SQL Gateway, instead of the one at SQL Driver (JobManager).
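
A sketch of the JobID part follows; the hashing scheme is an assumption, meant only to mirror the spirit of ApplicationDispatcherBootstrap:

Code Block
languagejava
titleFixing the JobID (illustrative)
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptionsInternal;

// Derive a deterministic JobID from the ClusterID and fix it for the pipeline,
// so that SQL Gateway knows the JobID without access to the ResultFetcher.
String clusterId = "my-flink-cluster"; // hypothetical ClusterID
JobID jobId = new JobID(clusterId.hashCode(), 0L);
Configuration conf = new Configuration();
conf.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, jobId.toHexString());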

Public Interfaces

New Classes

Add SQL Driver in the `flink-sql-gateway` module. It acts as the main entrypoint to initialize the TableEnvironment.

...

Code Block
languagejava
titleNew Configuration Options
ConfigOptions.key("sql-gateway.applications.storage-dir")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "The base directory for SQL Gateway generated resources, which must be accessible from the cluster. " +
                            "URIs of Flink-supported filesystems are allowed. " +
                            "When initializing, the resources would be localized into the working directory of SQL Driver. " +
                            "This option is not effective for YARN setups, since YARN distributes resource files natively.");
ConfigOptions.key("$internal.sql-gateway.driver.json-plan")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "File system path (URI) of the json plan that the pipeline is built from. " +
                            "URIs of Flink-supported filesystems are allowed.");
ConfigOptions.key("$internal.sql-gateway.driver.sql-config")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "Path of the SQL-related configuration that contains configurations to prepare the environment, " +
                            "for example, function metadata. " +
                            "URIs of Flink-supported filesystems are allowed.");


WRT SQL Gateway, if `execution.target` is set to application mode, SQL Gateway would use SQL Driver under the hood to build a Flink application. Please see details in the OperationExecutor section below.
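
For illustration, the user-facing option could be set when starting SQL Gateway; the URI below is an example, and `execution.target` would be set to an application target (e.g. `kubernetes-application`) in the session configuration:

Code Block
languagebash
titleSQL Gateway example (illustrative)
# The storage directory must be accessible from the cluster.
./bin/sql-gateway.sh start \
    -Dsql-gateway.applications.storage-dir=hdfs:///flink/sql-gateway/resources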

...

  1. convert ModifyOperation into CompilePlanOperation and then execute it to generate the JSON plan.
  2. generate JobID based on ClusterID and use it to construct a ResultFetcher for the job.
  3. put the JSON plan and other resources either into the `sql-gateway.applications.storage-dir` on K8s or register them as application resources on YARN.
  4. generate configurations (e.g. `$internal.sql-gateway.driver.json-plan` and `$internal.pipeline.job-id`) for the pipeline and deploy the application via ApplicationClusterDeployer (see the sketch after this list).
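
A sketch of the deployment step; the driver class name is hypothetical, and the configuration is assumed to already carry the generated `$internal.*` options:

Code Block
languagejava
titleDeploying via ApplicationClusterDeployer (illustrative)
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;

public final class DeploySketch {
    public static void deploy(Configuration flinkConfig) throws Exception {
        // e.g. "kubernetes-application"; the json-plan, sql-config and fixed
        // JobID options are assumed to be present in flinkConfig already.
        flinkConfig.set(DeploymentOptions.TARGET, "kubernetes-application");

        // Run SQL Driver as the application's main class (class name is hypothetical).
        ApplicationConfiguration appConfig = new ApplicationConfiguration(
                new String[0], "org.apache.flink.table.gateway.SqlDriver");
        new ApplicationClusterDeployer(new DefaultClusterClientServiceLoader())
                .run(flinkConfig, appConfig);
    }
}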

...

1. As the deployer runs at JobManager, SQL Gateway can't get the ResultFetcher which contains the JobID, thus an empty JobID is returned.

2.

...

SQL

...

2. SQL Driver

When invoked at JobManager, SQL Driver would:

  1. localize resources if needed (this would mostly be done by the Flink K8s/YARN module code).
  2. initialize the TableEnvironment based on configurations and an optional SQL init file.
  3. execute SQLs or compiled JSON plans depending on the configuration.

...