Discussion thread: https://lists.apache.org/thread/7zyojksml5xj77fdokcpttf550qflyqm
Vote thread: TBD
JIRA: TBD
Release: <Flink Version>

Motivation

Currently, the application deployment mode requires jar-based job submission at JobManager, so it can't be used directly by pure SQL jobs submitted via SQL Client/Gateway or the Flink CLI. A common workaround is to wrap the SQL statements in a simple user jar, such as the SQL Runner in the Flink Kubernetes Operator. This FLIP aims to support this approach officially by introducing SQL Driver.

SQL Driver mainly serves two use cases:

  • For SQL Client/Gateway users to run Flink SQLs in application mode.
  • For potential external systems to integrate Flink SQL.

Note that although SQL Driver is designed mainly to solve problems in the application mode, it could be used in all deployment modes.

Basic Idea

In general, SQL Driver is responsible for executing SQLs or their compiled plans (AKA JSON plans) at JobManager in the following steps:

1. build a new TableEnvironment with the given configurations
2. execute SQLs or their compiled results (JSON plan, i.e. ExecNodeGraph in JSON format)
3. exit when all SQLs are executed (either sync or async)


SQL Gateway would support 2 approaches for executing SQLs:

  • interactive execution: users execute SQLs interactively in the SQL Client CLI, and SQL Gateway automatically deploys an application cluster (with SQL Driver as the entrypoint) for each DML.
  • non-interactive execution: users submit an SQL script file using the SQL Client CLI, and the SQL Client deploys an application cluster (with SQL Driver as the entrypoint, which starts an embedded SQL Gateway) to execute the whole SQL script.


In the non-interactive execution, the SQL execution process is simple: ship everything needed to the cluster and start the embedded SQL Gateway to execute SQLs sequentially.

In the interactive execution, the SQL execution process is divided into two phases:

  • Phase 1: SQLs are parsed, optimized, and compiled at SQL Gateway
  • Phase 2: the compiled plans are executed by SQL Driver at JobManager

We have to ensure the two phases get the same environment, e.g. UDF jars, Flink libs, etc.

The most challenging parts are:

1) serialization of TableEnvironment in SQL Gateway

2) distribution of SQLs and resources in K8s application mode

3) execution delegation and result fetching of SQLs in SQL Gateway

Serialization of TableEnvironment

As described above, there are two phases in SQL execution in the SQL Gateway scenario. SQL Gateway has to prepare everything needed for SQL execution at JobManager, which normally includes the TableEnvironment and the SQLs to execute. However, serializing a TableEnvironment is difficult, as it includes catalogs, functions, and resources that are not serializable at the moment.

Fortunately, we could make use of the compiled representation, i.e. the JSON plan introduced in FLIP-190: Support Version Upgrades for Table API & SQL Programs, to optimize the process. A JSON plan (or its in-memory representation, ExecNodeGraph) can restore a pipeline (DAG), and thus draws a clean line between SQLs and pipelines.

The JSON plan eliminates the need to serialize the whole TableEnvironment, because the functions and the catalogs are not accessed during Phase 2. The modules and the resources (e.g. UDFs) are still needed, but they can be easily serialized into the configurations.

Distribution of SQLs and Resources

The distribution is easy on YARN, as YARN allows shipping resources along with the application. In contrast, K8s requires users to prepare all needed resources by customizing the images. That is a problem for SQL Gateway, since it has no way to put extra files into the user images.

Therefore, a distributed filesystem would be needed. We could leverage Flink's file system abstraction to store resources, similar to the HA storage directory or the checkpoint directory. These would be mostly covered in FLINK-28915 and FLINK-32315, which introduce a file distribution mechanism for Flink on K8s.

In brief, all that SQL Gateway has to do is specify resource-related configurations (e.g. `pipeline.jars`) pointing to local resources with the scheme `file://`. Flink would take care of shipping the files from the client (SQL Gateway) to the cluster (SQL Driver).
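As a rough illustration of the client-side bookkeeping this implies, the sketch below splits a `pipeline.jars`-style list into local entries (scheme `file`, or no scheme at all), which would need shipping, and remote entries, which the cluster could read directly. The class and method names are hypothetical, the paths are made up, and the semicolon separator is an assumption about how the list is encoded. It is not the mechanism implemented in FLINK-28915 or FLINK-32315.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: partitions resource URIs into local ones (to be shipped) and remote ones. */
final class ResourcePartitioner {
    final List<URI> local = new ArrayList<>();
    final List<URI> remote = new ArrayList<>();

    /** Assumes a semicolon-separated list of URIs, e.g. the string form of `pipeline.jars`. */
    static ResourcePartitioner partition(String semicolonSeparatedUris) {
        ResourcePartitioner result = new ResourcePartitioner();
        for (String entry : semicolonSeparatedUris.split(";")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate empty entries and trailing separators
            }
            URI uri = URI.create(trimmed);
            String scheme = uri.getScheme();
            // No scheme or "file" means the resource lives on the client's local disk.
            if (scheme == null || scheme.equalsIgnoreCase("file")) {
                result.local.add(uri);
            } else {
                result.remote.add(uri);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        ResourcePartitioner p =
                partition("file:///opt/udfs/my-udf.jar;hdfs:///libs/connector.jar");
        System.out.println("to ship: " + p.local);
        System.out.println("already remote: " + p.remote);
    }
}
```

Only the entries in `local` would have to be uploaded to the distributed filesystem before the application cluster starts.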

Execution Delegation and Result Fetching

Only statements that could be compiled into JSON plans would be delegated to SQL Driver. These statements should be DMLs (insert/update/delete) and DQLs (select).

However, since the jobs are submitted by SQL Driver, by default SQL Gateway has no access to the returned ResultFetcher, thus doesn't know about the JobID and couldn't fetch results directly.

Hence, we need the following adjustments:

  1. Regarding the JobID, SQL Gateway needs to manually generate a JobID and set it in `PIPELINE_FIXED_JOB_ID` before delegating the job to SQL Driver. We could derive the JobID from the ClusterID, as the ApplicationDispatcherBootstrap does.
  2. Regarding result fetching, we need to construct the CollectResultFetcher manually with a fixed operator ID. That is to say, CollectSinkOperatorFactory should allow an externally specified operator ID for CollectSink.

As of Flink 1.19, JSON plans don't support DQLs, hence we simply return OK if an application cluster is submitted successfully.
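To make the first adjustment more concrete, the sketch below derives a stable 128-bit identifier, rendered as 32 hex characters, from a cluster ID, so that the gateway side and the driver side can agree on it without communicating. The class name is hypothetical, and the MD5-based name UUID is an assumption chosen for illustration; it is not the derivation used by ApplicationDispatcherBootstrap.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

/** Hypothetical helper: derives a deterministic job ID string from a cluster ID. */
final class FixedJobIds {

    /**
     * Returns a 32-character lowercase hex string (128 bits).
     * Assumption: a name-based (MD5) UUID is good enough for illustration;
     * this is not Flink's own derivation.
     */
    static String fromClusterId(String clusterId) {
        UUID uuid = UUID.nameUUIDFromBytes(clusterId.getBytes(StandardCharsets.UTF_8));
        // %016x renders each long as exactly 16 zero-padded, unsigned hex digits.
        return String.format(
                "%016x%016x", uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
    }

    public static void main(String[] args) {
        // The same cluster ID always yields the same job ID string.
        System.out.println(fromClusterId("my-sql-application"));
    }
}
```

The resulting string could then be written into the `$internal.pipeline.job-id` configuration before deployment, and reused later when constructing the result fetcher.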

JsonPlan Limitations 

As described in FLIP-190, JSON plans have some limitations that also apply to the interactive execution of SQL Gateway in application mode. The most significant ones are:

  • DQLs are not supported.
  • Batch queries are not supported, because "Batch ExecNodes cannot be serialized into a JSON plan".

Public Interfaces

New Classes

Add SQL Driver in the `flink-sql-gateway` module. It acts as the main entrypoint to initialize the TableEnvironment.

SqlDriver
@PublicEvolving
public final class SqlDriver {
    public static void main(String[] args) throws Throwable {
        // 1. localize resources
        // 2. initialize the TableEnvironment based on configurations and SQL init files
        // 3. execute SQLs or JSON plans
    }
}

New Configuration Options

To support pure SQL jobs, new options would be introduced for Flink configuration.

Specifically, the following configurations:

New Configuration Options
ConfigOptions.key("$internal.sql-gateway.driver.json-plan")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "File system path (URI) of json plan that the pipeline is built from. " + 
                            "URIs of Flink-supported filesystem are allowed.");  
ConfigOptions.key("$internal.sql-gateway.driver.sql-config")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "Path of the SQL-related configuration that contains configurations to prepare the environment. " + 
                            "For example, function metadata. URIs of Flink-supported filesystem are allowed.");


As for SQL Gateway, if `execution.target` is set to application mode, SQL Gateway would use SQL Driver under the hood to build a Flink application. Please see details in the OperationExecutor section below.

Proposed Changes

1. OperationExecutor

Considering that only SQLs that create Flink jobs need to be executed at JobManager, and that the application mode doesn't support selects, we could make OperationExecutor delegate only the execution of DMLs (i.e. inserts or statement sets) to SQL Driver, and keep executing other SQLs locally.

In the case of DMLs in application mode, OperationExecutor would need to:

  1. convert ModifyOperation into CompilePlanOperation and then execute it to generate the JSON plan.
  2. generate configurations (e.g. `$internal.sql-gateway.driver.json-plan` and `$internal.pipeline.job-id`) for the pipeline
  3. deploy the application via ApplicationClusterDeployer. 
  4. construct a ResultFetcher for the job based on the cluster id and the derived job id, and fetch job results.
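The delegation rule and the configuration generated in step 2 can be sketched in plain Java as follows. This is a simplified model, not the proposed OperationExecutor code: the class, the `StatementKind` enum, and the suffix check on `execution.target` (matching values such as `kubernetes-application` and `yarn-application`) are assumptions made for illustration. Only the two configuration keys are taken from this document.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical model of the delegation decision made by OperationExecutor. */
final class DelegationSketch {

    /** Simplified statement categories; the real planner operations are richer. */
    enum StatementKind { INSERT, STATEMENT_SET, SELECT, DDL, OTHER }

    /** Assumption: application targets are recognized by their "-application" suffix. */
    static boolean isApplicationTarget(String executionTarget) {
        return executionTarget != null && executionTarget.endsWith("-application");
    }

    /** Only DMLs (inserts or statement sets) in application mode go to SQL Driver. */
    static boolean shouldDelegate(StatementKind kind, String executionTarget) {
        return isApplicationTarget(executionTarget)
                && (kind == StatementKind.INSERT || kind == StatementKind.STATEMENT_SET);
    }

    /** Builds the driver-related configuration entries for a delegated pipeline. */
    static Map<String, String> driverConfig(String jsonPlanUri, String jobIdHex) {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("$internal.sql-gateway.driver.json-plan", jsonPlanUri);
        conf.put("$internal.pipeline.job-id", jobIdHex);
        return conf;
    }

    public static void main(String[] args) {
        System.out.println(shouldDelegate(StatementKind.INSERT, "kubernetes-application"));
        System.out.println(shouldDelegate(StatementKind.SELECT, "kubernetes-application"));
        System.out.println(shouldDelegate(StatementKind.INSERT, "remote"));
    }
}
```

Everything for which `shouldDelegate` returns false would keep running locally in SQL Gateway, exactly as it does today.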

2. SQL Driver

When invoked at JobManager, SQL Driver would:

  1. localize resources if needed (this would mostly be handled by the Flink K8s/YARN modules).
  2. initialize the TableEnvironment based on configurations.
  3. execute SQLs or compiled JSON plans depending on the configuration.

The whole process is the same no matter whether the pipeline is deployed directly by users or indirectly by SQL Gateway.

3. CollectSinkOperatorFactory

CollectSinkOperatorFactory would need to support externally specified IDs for CollectSink. With the constant operator ID, SQL Gateway would be able to manually create a CollectResultFetcher for job result fetching.

Compatibility, Deprecation, and Migration Plan

No compatibility issues are expected, since it's a new feature.

Test Plan

  • Add unit tests to verify new configuration options' parsing.
  • Add E2E tests to verify application mode with Flink SQL.
  • Add unit tests to SQL Gateway for the application mode with SQLs.

Rejected Alternatives

1. JobGraph-based submission like the deprecated per-job mode

Pros:

  • Requires little effort based on per-job mode.
  • Avoids rebuilding at JobManager the same table environment that SQL Gateway already has.
  • Friendly for the existing per-job mode users.

Cons:

  • Per-job mode has been deprecated since Flink 1.15.
  • JobGraph is an internal serializable, which is not suitable for application mode, where users may need to prepare the job serializable (jar/sql/jobgraph etc.) themselves.

2. Web submission like the session mode

Pros:

  • Requires relatively little effort.

Cons:

  • Divides the job deployment into two steps, which is not appropriate for application mode.

3. Support direct SQL submission at JobManager

Pros:

  • Simple architecture, similar to databases.

Cons:

  • Puts more responsibilities and load on JobManager.
  • There's already an SQL Gateway to interpret SQLs into JobManager-supported serializables.