Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Discussion threadhttps://lists.apache.org/thread/7zyojksml5xj77fdokcpttf550qflyqm
Vote threadTBD
JIRA

TBD

Release<Flink Version>

Table of Contents

Motivation

Currently, the application deployment mode requires jar-based job submission at JobManager, which can't be directly used by pure SQL jobs submitted by SQL Client/Gateway or Flink CLI. A common workaround is to make a simple user jar to wrap the SQLs, such as SQL Runner in Flink K8s Operator in Flink Kuberne,tes Kubernetes operator. This FLIP aims to support this approach officially by introducing SQL Driver.

SQL Driver , as the default main class for Flink SQL, serves mainly two use cases :

  • For SQL Client/Gateway users to run Flink SQLs in application mode.
  • For non-SQL Client/Gateway users to run Flink SQLs without needing to write a Java programpotential external systems to integrate Flink SQL.

Note that although SQL Driver is designed mainly for solving the problems in the application mode, it is general-purposed and could also be also used in all deployment modes.

Basic Idea

In general, SQL Driver behaves like SQL Client in non-interactive mode. It mainly executes SQLs is responsible for executing SQLs or their compile plans (AKA JSON plan) at JobManager in the following steps:

1. build a new TableEnvironment with the given configurations and an optional SQL init file
2. execute SQLs or and their compiled results (Json Plan, ExecNodeGraph in JSON format)
3. exit when all SQLs are executed(either sync or async)

The usage in non-SQL Gateway scenarios is relatively simple as it involves only two components, the client and the cluster manager. The deployment process is the same as now it's.

For both YARN/K8s setups, the deployment process is the same as it's now:

Image RemovedImage Removed


SQL Gateway would support 2 approaches for executing SQLs:

  • interactive execution: execute SQLs interactively in the SQL client CLI and SQL Gateway would deploy an application cluster (with SQL Driver as the entrypoint) for each DML automatically.
  • non-interactive execution: submit an SQL script file using SQL client CLI, the SQL client would deploy an application cluster (with SQL Driver as the entrypoint which starts an embedded SQL Gateway)  to execute the whole SQL script.


In the non-interactive execution, the SQL execution process is simple: ship everything needed to the cluster and start the embedded SQL Gateway to execute SQLs sequentially.

In the interactive execution, However, the deployment would be much more complex in SQL Gateway scenarios, because the SQL execution process is divided into two phases:

...

We have to ensure the two phases get the same environment, e.g. UDF jars, Flink libs, etc.

Image RemovedImage Added

Image RemovedImage Added

The most challenging parts are the :

1) serialization of TableEnvironment in SQL Gateway

2) and the distribution of SQLs and resources in K8s application mode, which is explained in detail in the following sections.

3) execution delegation and result fetching of SQLs in SQL Gateway

Serialization of TableEnvironment

As described above, there are two phases in SQL execution in the SQL Gateway scenario. SQL Gateway has to prepare everything needed for SQL execution at JobManager, which normally includes the TableEnvironment and the SQLs to execute. However, serialization of TableEnvironemts is difficult as it includes catalogs, functions, and resources that are not serializable at the moment.

...

JSON plan eliminates the need to serialize the whole TableEnvironment, because the functions and the catalogs are not accessed during Phase2. The modules and the resources (e.g. UDFs) are still needed though, but they can be easily serialized into the configurations.

Distribution of SQLs and Resources

The distribution is easy on YARN, as YARN allows shipping resources along with the application. In contrast, K8s requires users to prepare all resources needed by customizing the images. That makes trouble for SQL Gateway, since SQL Gateway has no way to put extra files in the user images.

Therefore, a distributed filesystem would be introducedneeded. We could leverage Flink's file system abstraction to store resources, similar to the HA storage directory or the checkpoint directory. In K8s application mode, SQL Gateway would be responsible to put all the job resources These would be mostly covered in FLINK-28915 and FLINK-32315, which introduce a file distribution mechanism for Flink on K8s.

In brief, all that SQL Gateway has to do is to specify resource-related configurations (e.g . Json plans, UDF jars) into the directory, and SQL Driver is responsible to pull it to the local filesystem before initiating a TableEnvironment`pipeline.jars`) with the local resources with scheme `file://`, Flink would take care of shipping files from the client (SQL Gateway) to the cluster (SQL Driver). 

Execution Delegation and Result Fetching

Only statements that could be compiled into JSON plans would be delegated to SQL Driver. These statements should be DMLs (insert/update/delete) and DQLs (select).

However, since the jobs are submitted by SQL Driver, by default SQL Gateway has no access to the returned ResultFetcher, thus doesn't know about the JobID and couldn't fetch results directly.

Hence, we need the following adjustments:

  1. WRT JobID, SQL gateway needs to manually generate a JobID and set it into `PIPELINE_FIXED_JOB_ID` before delegating the job to SQL Driver. We could derive JobID from ClusterID as the ApplicationDispatcherBootstrap does.
  2. WRT result fetching, we need to construct CollectResultFetcher manually with a fixed Operator ID. That's to say, CollectSinkOperatorFactory should allow an externally specified operator ID for CollectSink.

As of Flink 1.19, json plans don't support DQLs, hence we simply return OK if an application cluster is submitted successfully.

JsonPlan Limitations 

As described in FLIP-190, json plans have some limitations that also apply to the interactive execution of SQL Gateway application mode, below are the most significant ones:

  • DQLs are not supported.
  • Batch queries are not supported, because "Batch ExecNodes cannot be serialized into a JSON plan".

Public Interfaces

New Classes

Add SQL Driver in the flink`flink-sql-client gateway` module. It acts as the main entrypoint to initialize the TableEnvironment.

Code Block
languagejava
titleSqlDriver
@PublicEvolving
public final class SqlDriver {
    public static void main(String[] args) throws Throwable {
        // 1. localize resources
        // 2. initialize the TableEnvironment based on configurations and SQL init files
        // 3. execute SQLs or JSON plans
    }
}

New Configuration Options

To support pure SQL jobs, new options would be introduced for Flink configuration.

...

Code Block
languagejava
titleNew Configuration Options
ConfigOptions.key("sql-driver.file")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "File system path (URI) of file that contains SQLs to execute. " +
                            "URIs of Flink-supported filesystem are allowed. " + 
                            "Either `sql.file` or `sql.json-plan` should be specified.");
ConfigOptions.key("sql-$internal.sql-gateway.driver.json-plan")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "File system path (URI) of json plan that the pipeline is built from. " + 
                            "URIs of Flink-supported filesystem are allowed. " +
                            "Either `sql.file` or `sql.json-plan` should be specified.");  
ConfigOptions.key("$internal.sql-gateway.driver.init.filesql-config")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "Path of the localsql related fileconfiguration that contains initialconfiguations SQLs to prepare the environment. " + 
                            "For example, create catalogs and functionsfunction metadata. URIs of Flink-supported filesystem are allowed.");

With these options, a SQL application could be run with the following command through CLI:

Code Block
languagebash
titleFlink CLI example
 ./bin/flink run-application -t kubenetes-application \
      -Dsql-driver.init.file hdfs:///tmp/init_sql \
      -Dsql-driver.file file:///tmp/sql \
      -Dpipeline.jars=s3://mybucket.s3.example.com/myudf.jar 


WRT SQL Gateway, if `execution.target` is set to application mode, SQL Gateway would use SQL Driver under the hook to build a Flink application. New SQL gateway options would be needed.

...

languagejava
titleNew Configuration Options

...

Please see details in the OperationExecutor section below.

Proposed Changes

1. OperationExecutor

Considering only SQLs that create Flink jobs need to be executed at JobManager and the application mode doesn't support selects, we could make OperationExecutor delegate only DMLs (i.e. inserts or statement sets) execution to SQL Driver, and keep other SQLs executed locally.

...

  1. convert ModifyOperation into CompilePlanOperation and then execute it to generate the JSON plan.
  2. put the JSON plan and other resources either into the `sql-gateway.application.storage-dir` on K8s or registered as application resources on YARN.
  3. generate configurations (e.g. `sql. `$internal.sql-gateway.driver.json-plan` and `$internal.applicationpipeline.main`job-id`) for the pipeline and
  4. deploy the application via ApplicationClusterDeployer. 

...

  1. construct a ResultFetcher for the job based on the cluster id and the derived job id, and fetch job results.

2. SQL Driver

When invoked at JobManager, SQL Driver would:

  1. localize resources if needed (only needed for K8smostly would be done by Flink K8s/YARN module codes).
  2. initialize the TableEnvironemt based on configurations and an optional SQL init file.
  3. execute SQLs or compiled JSON plans depending on the configuration.

The whole process is the same no matter whether the pipeline is deployed directly by users or indirectly by SQL Gateway.

3. CollectSinkOperatorFactory

CollectSinkOperatorFactory would need to support externally specified IDs for CollectSink. With the constant operator ID, SQL Gateway would be able to manually create a CollectResultFetcher for job result fetching.

Compatibility, Deprecation, and Migration Plan

No compatibility issues since it's a new feature.

Test Plan

  • Add unit tests to verify new configuration options' parsing.
  • Add E2E tests to verify application mode with Flink SQL.
  • Add unit tests to SQL Gateway for the application mode with SQLs.

Rejected Alternatives

1. JobGraph-based submission like the deprecated per-job mode

Pros:

  • Requires little effort based on per-job mode.
  • Avoid rebuilding the same table environment with SQL gateway at JobManager.
  • Friendly for the existing per-job mode users.

...

  • Per-job mode is deprecated in Flink 1.15.
  • Jobgraph is an internal serializable, not suitable for application mode in which users may need to prepare the job serializable (jar/sql/jobgraph etc).

2. Web submission like the session mode

Pros:

  • Requires relatively little effort.

Cons:

  • Divides the job deployment into two steps, which is not appropriate for application mode.

3. Support direct SQL submission at JobManager

Pros:

  • Simple architecture, similar to databases.

...