...

Discussion thread: https://lists.apache.org/thread/7zyojksml5xj77fdokcpttf550qflyqm

Vote thread: TBD

JIRA: TBD

Release: <Flink Version>

Table of Contents

Motivation

Currently, the application deployment mode requires jar-based job submission at JobManager, which can't be directly used by pure SQL jobs submitted via SQL Client/Gateway or the Flink CLI. A common workaround is to build a simple user jar that wraps the SQLs, such as the SQL Runner in the Flink Kubernetes Operator. This FLIP aims to support this approach officially by introducing SQL Driver.

...

Note that although SQL Driver is designed mainly to solve problems in the application mode, it could also be used in all deployment modes.

Basic Idea

In general, SQL Driver is responsible for executing SQLs or their compiled plans (AKA JSON plans) at JobManager in the following steps:

1. build a new TableEnvironment with the given configurations
2. execute SQLs or their compiled results (JSON plans, i.e. ExecNodeGraph in JSON format)
3. exit when all SQLs are executed (either synchronously or asynchronously), as sketched below
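
As an illustration of these steps, below is a minimal sketch using the public Table API, assuming plain SQL statements as input; the statement and configuration are placeholders, not the actual driver implementation.

Code Block
languagejava
titleSqlDriverStepsSketch
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

public class SqlDriverStepsSketch {
    public static void main(String[] args) throws Exception {
        // 1. build a new TableEnvironment with the given configurations
        Configuration conf = new Configuration();
        TableEnvironment tEnv =
                TableEnvironment.create(
                        EnvironmentSettings.newInstance().withConfiguration(conf).build());

        // 2. execute a SQL statement; executeSql submits DMLs asynchronously
        TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM source");

        // 3. block until the job finishes for sync mode; skip await() for async
        result.await();
    }
}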


SQL Gateway would support two approaches for executing SQLs:

  • interactive execution: SQLs are executed interactively via the SQL Client CLI, and SQL Gateway would automatically deploy an application cluster (with SQL Driver as the entrypoint) for each DML.
  • non-interactive execution: an SQL script file is submitted via the SQL Client CLI, and the SQL Client would deploy an application cluster (with SQL Driver as the entrypoint, which starts an embedded SQL Gateway) to execute the whole SQL script.


In the non-interactive execution, the SQL execution process is simple: ship everything needed to the cluster and start the embedded SQL Gateway to execute SQLs sequentially.

In the interactive execution scenario, the SQL execution process is divided into two phases:

...

We have to ensure the two phases get the same environment, e.g. UDF jars, Flink libs, etc.


The most challenging parts are:

...

3) execution delegation and result fetching of SQLs in SQL Gateway

Serialization of TableEnvironment

As described above, there are two phases of SQL execution in the SQL Gateway scenario. SQL Gateway has to prepare everything needed for SQL execution at JobManager, which normally includes the TableEnvironment and the SQLs to execute. However, serialization of TableEnvironments is difficult, as they include catalogs, functions, and resources that are not serializable at the moment.

...

A JSON plan eliminates the need to serialize the whole TableEnvironment, because the functions and the catalogs are not accessed during Phase 2. The modules and the resources (e.g. UDFs) are still needed, but they can be easily serialized into the configurations.

Distribution of SQLs and Resources

The distribution is easy on YARN, as YARN allows shipping resources along with the application. In contrast, K8s requires users to prepare all needed resources by customizing the images. That is problematic for SQL Gateway, since SQL Gateway has no way to put extra files into the user images.

...

In brief, all that SQL Gateway has to do is specify resource-related configurations (e.g. `pipeline.jars`) with local resources using the `file://` scheme; Flink would take care of shipping the files from the client (SQL Gateway) to the cluster (SQL Driver).
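
For illustration, a minimal sketch of such a resource-related configuration, assuming a hypothetical local UDF jar path; `PipelineOptions.JARS` is the programmatic counterpart of `pipeline.jars`.

Code Block
languagejava
titleLocalResourceConfigSketch
import java.util.Collections;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;

public class LocalResourceConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // point pipeline.jars at a local resource; the path is a placeholder
        conf.set(PipelineOptions.JARS, Collections.singletonList("file:///path/to/udf.jar"));
        // Flink would ship file:// resources from the client to the cluster
    }
}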

Execution Delegation and Result Fetching

Only statements that can be compiled into JSON plans would be delegated to SQL Driver. These statements are DMLs (insert/update/delete) and DQLs (select).

However, since the jobs are submitted by SQL Driver, by default SQL Gateway has no access to the returned ResultFetcher; thus it doesn't know the JobID and can't fetch results directly.

Hence, we need the following adjustments:

  1. WRT the JobID, SQL Gateway needs to manually generate a JobID and set it into `PIPELINE_FIXED_JOB_ID` before delegating the job to SQL Driver. We could derive the JobID from the ClusterID, as ApplicationDispatcherBootstrap does (see the sketch below).
  2. WRT result fetching, we need to construct a CollectResultFetcher manually with a fixed operator ID. That is to say, CollectSinkOperatorFactory should allow an externally specified operator ID for CollectSink.
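
Below is a minimal sketch of the first adjustment; deriving the JobID from the cluster ID's hash is an illustrative choice in the spirit of ApplicationDispatcherBootstrap, not the final derivation scheme.

Code Block
languagejava
titleFixedJobIdSketch
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptionsInternal;

public class FixedJobIdSketch {
    // generate a deterministic JobID and fix it in the pipeline configuration
    static JobID fixJobId(String clusterId, Configuration conf) {
        JobID jobId = new JobID(clusterId.hashCode(), 0L); // illustrative derivation
        conf.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, jobId.toHexString());
        return jobId;
    }
}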

As of Flink 1.19, JSON plans don't support DQLs; hence we simply return OK if an application cluster is submitted successfully.

JSON Plan Limitations

As described in FLIP-190, JSON plans have some limitations that also apply to the interactive execution of the SQL Gateway application mode; below are the most significant ones:

  • DQLs are not supported.
  • Batch queries are not supported, because "Batch ExecNodes cannot be serialized into a JSON plan".

Public Interfaces

New Classes

Add SQL Driver in the `flink-sql-gateway` module. It acts as the main entrypoint to initialize the TableEnvironment.

Code Block
languagejava
titleSqlDriver
@PublicEvolving
public final class SqlDriver {
    public static void main(String[] args) throws Throwable {
        // 1. localize resources
        // 2. initialize the TableEnvironment based on configurations and SQL init files
        // 3. execute SQLs or JSON plans
    }
}
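
For illustration, hypothetical invocations of the entrypoint; the argument names below are assumptions made for this sketch, not finalized options of this FLIP.

Code Block
languagejava
titleSqlDriverUsageSketch
public class SqlDriverUsageSketch {
    public static void main(String[] args) throws Throwable {
        // non-interactive mode: execute a whole SQL script (hypothetical flag)
        SqlDriver.main(new String[] {"--sql-file", "/opt/flink/usrlib/job.sql"});

        // interactive mode: execute a JSON plan compiled by SQL Gateway (hypothetical flag)
        SqlDriver.main(new String[] {"--json-plan", "/opt/flink/usrlib/plan.json"});
    }
}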

New Configuration Options

To support pure SQL jobs, new options would be introduced for Flink configuration.

...

WRT SQL Gateway, if `execution.target` is set to application mode, SQL Gateway would use SQL Driver under the hood to build a Flink application. Please see details in the OperationExecutor section below.
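
As a sketch of the trigger condition, assuming the check simply inspects the configured execution target (the exact matching logic is an implementation detail):

Code Block
languagejava
titleExecutionTargetCheckSketch
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;

public class ExecutionTargetCheckSketch {
    // returns true for application-mode targets such as
    // "kubernetes-application" or "yarn-application"
    static boolean isApplicationMode(Configuration conf) {
        String target = conf.get(DeploymentOptions.TARGET);
        return target != null && target.endsWith("application");
    }
}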

Proposed Changes

1. OperationExecutor

Considering that only SQLs that create Flink jobs need to be executed at JobManager, and that the application mode doesn't support selects, we could make OperationExecutor delegate only DML (i.e. inserts or statement sets) execution to SQL Driver, and keep other SQLs executed locally.

...

  1. convert the ModifyOperation into a CompilePlanOperation and execute it to generate the JSON plan.
  2. generate configurations (e.g. `$internal.sql-gateway.driver.json-plan` and `$internal.pipeline.job-id`) for the pipeline.
  3. deploy the application via ApplicationClusterDeployer (see the sketch below).
  4. construct a ResultFetcher for the job based on the cluster ID and the derived job ID, and fetch job results.
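
A condensed sketch of steps 1-3, using the public `compilePlanSql` as a stand-in for the internal CompilePlanOperation path; the option key is the proposed internal option, and the actual deployment call is elided.

Code Block
languagejava
titleDelegationSketch
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.CompiledPlan;
import org.apache.flink.table.api.TableEnvironment;

public class DelegationSketch {
    static void delegate(TableEnvironment tEnv, Configuration conf, String insertSql) {
        // 1. compile the DML into a JSON plan
        CompiledPlan plan = tEnv.compilePlanSql(insertSql);

        // 2. pass the plan (and the fixed job id, see above) via configuration
        conf.setString("$internal.sql-gateway.driver.json-plan", plan.asJsonString());

        // 3. deploy the application cluster via ApplicationClusterDeployer (elided)
    }
}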

2. SQL Driver

When invoked at JobManager, SQL Driver would:

...

The whole process is the same no matter whether the pipeline is deployed directly by users or indirectly by SQL Gateway.

3. CollectSinkOperatorFactory

CollectSinkOperatorFactory would need to support externally specified IDs for CollectSink. With a constant operator ID, SQL Gateway would be able to manually create a CollectResultFetcher for job result fetching.
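
One possible shape of this change, as a hypothetical signature sketch only; the actual constructor and field names are up to the implementation.

Code Block
languagejava
titleCollectSinkOperatorFactorySketch
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.jobgraph.OperatorID;

// hypothetical: an extra constructor parameter fixes the operator ID so that
// SQL Gateway can later build a CollectResultFetcher addressing the same operator
public class CollectSinkOperatorFactorySketch<IN> {
    private final OperatorID operatorId;

    public CollectSinkOperatorFactorySketch(
            TypeSerializer<IN> serializer, String accumulatorName, OperatorID operatorId) {
        this.operatorId = operatorId; // used instead of a randomly generated ID
    }
}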

Compatibility, Deprecation, and Migration Plan

No compatibility issues since it's a new feature.

Test Plan

  • Add unit tests to verify new configuration options' parsing.
  • Add E2E tests to verify application mode with Flink SQL.
  • Add unit tests to SQL Gateway for the application mode with SQLs.

Rejected Alternatives

1. JobGraph-based submission like the deprecated per-job mode

Pros:

  • Requires little effort, building on the per-job mode.
  • Avoids rebuilding at JobManager the same table environment already built in SQL Gateway.
  • Friendly to existing per-job mode users.

...

  • Per-job mode was deprecated in Flink 1.15.
  • JobGraph is an internal serializable format, not suitable for the application mode, in which users may need to prepare the job artifacts (jar/SQL/JobGraph, etc.) themselves.

2. Web submission like the session mode

Pros:

  • Requires relatively little effort.

...

  • Divides the job deployment into two steps, which is not appropriate for application mode.

3. Support direct SQL submission at JobManager

Pros:

  • Simple architecture, similar to databases.

...