You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 10 Next »

Discussion threadhttps://lists.apache.org/thread/7zyojksml5xj77fdokcpttf550qflyqm
Vote threadTBD
JIRA

TBD

Release<Flink Version>

Motivation

Currently, the application deployment mode requires jar-based job submission at JobManager, which can't be directly used by pure SQL jobs submitted by SQL Client/Gateway or Flink CLI. A common workaround is to make a simple user jar to wrap the SQLs, such as SQL Runner in Flink K8s Operator in Flink Kuberne,tes operator. This FLIP aims to support this approach officially by introducing SQL Driver.

SQL Driver, as the default main class for Flink SQL, serves mainly two use cases :

  • For SQL Client/Gateway users to run Flink SQLs in application mode.
  • For non-SQL Client/Gateway users to run Flink SQLs without needing to write a Java program.

Note that although SQL Driver is designed mainly for solving the problems in the application mode, it is general-purposed and could be also used in all deployment modes.

Basic Idea

In general, SQL Driver behaves like SQL Client in non-interactive mode. It mainly executes SQLs at JobManager in the following steps:

1. build a new TableEnvironment with the given configurations and an optional SQL init file
2. execute SQLs or their compiled results (ExecNodeGraph in JSON format)
3. exit when all SQLs are executed(either sync or async)

The usage in non-SQL Gateway scenarios is relatively simple as it involves only two components, the client and the cluster manager. The deployment process is the same as now it's.

For both YARN/K8s setups, the deployment process is the same as it's now:

However, the deployment would be much more complex in SQL Gateway scenarios, because the SQL execution process is divided into two phases:

  • Phase1: parsed, optimized, and compiled at SQL Gateway
  • Phase2: executed by SQL Driver at JobManager

We have to ensure the two phases get the same environment, e.g. UDF jars, Flink libs, etc.

The most challenging parts are the serialization of TableEnvironment in SQL Gateway and the distribution of SQLs and resources in K8s application mode, which is explained in detail in the following sections.

Serialization of TableEnvironment

As described above, there are two phases in SQL execution in the SQL Gateway scenario. SQL Gateway has to prepare everything needed for SQL execution at JobManager, which normally includes the TableEnvironment and the SQLs to execute. However, serialization of TableEnvironemts is difficult as it includes catalogs, functions, and resources that are not serializable at the moment.

Fortunately, we could make use of the compiled representation, i.e. JSON plan introduced in FLIP-190: Support Version Upgrades for Table API & SQL Programs, to optimize the process. JSON plan (or its in-memory representation ExecNodeGraph) which could restore a pipeline(DAG), draws a clean line between SQLs and pipelines. 

JSON plan eliminates the need to serialize the whole TableEnvironment, because the functions and the catalogs are not accessed during Phase2. The modules and the resources (e.g. UDFs) are still needed though, but they can be easily serialized into the configurations.

Distribution of SQLs and Resources

The distribution is easy on YARN, as YARN allows shipping resources along with the application. In contrast, K8s requires users to prepare all resources needed by customizing the images. That makes trouble for SQL Gateway, since SQL Gateway has no way to put extra files in the user images.

Therefore, a distributed filesystem would be introduced. We could leverage Flink's file system abstraction to store resources, similar to the HA storage directory or the checkpoint directory. In K8s application mode, SQL Gateway would be responsible to put all the job resources (e.g. Json plans, UDF jars) into the directory, and SQL Driver is responsible to pull it to the local filesystem before initiating a TableEnvironment.

Public Interfaces

New Classes

Add SQL Driver in the flink-sql-client module. It acts as the main entrypoint to initialize the TableEnvironment.

SqlDriver
@PublicEvolving
public final class SqlDriver {
    public static void main(String[] args) throws Throwable {
        // 1. localize resources
        // 2. initialize the TableEnvironment based on configurations and SQL init files
        // 3. execute SQLs or JSON plans
    }
}

New Configuration Options

To support pure SQL jobs, new options would be introduced for Flink configuration.

Specifically, the following configurations:

New Configuration Options
ConfigOptions.key("sql-driver.file")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "File system path (URI) of file that contains SQLs to execute. " +
                            "URIs of Flink-supported filesystem are allowed. " + 
                            "Either `sql.file` or `sql.json-plan` should be specified.");
ConfigOptions.key("sql-driver.json-plan")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "File system path (URI) of json plan that the pipeline is built from. " + 
                            "URIs of Flink-supported filesystem are allowed. " +
                            "Either `sql.file` or `sql.json-plan` should be specified.");  
ConfigOptions.key("sql-driver.init.file")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "Path of the local file that contains initial SQLs to prepare the environment. " + 
                            "For example, create catalogs and functions. URIs of Flink-supported filesystem are allowed.");


With these options, a SQL application could be run with the following command through CLI:

Flink CLI example
 ./bin/flink run-application -t kubenetes-application \
      -Dsql-driver.init.file hdfs:///tmp/init_sql \
      -Dsql-driver.file file:///tmp/sql \
      -Dpipeline.jars=s3://mybucket.s3.example.com/myudf.jar 


WRT SQL Gateway, if `execution.target` is set to application mode, SQL Gateway would use SQL Driver under the hook to build a Flink application. New SQL gateway options would be needed.

New Configuration Options
ConfigOptions.key("sql-driver.application.storage-dir")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "File system path (URI) of the directory that stores resources in application mode. " +
                            "This option is effective only in `kubernetes-application` mode, since YARN provides the directories already.");

Proposed Changes

1. OperationExecutor

Considering only SQLs that create Flink jobs need to be executed at JobManager and the application mode doesn't support selects, we could make OperationExecutor delegate only DMLs (i.e. inserts or statement sets) execution to SQL Driver, and keep other SQLs executed locally.

In the case of DMLs in application mode, OperationExecutor would need to:

  1. convert ModifyOperation into CompilePlanOperation and then execute it to generate the JSON plan.
  2. put the JSON plan and other resources either into the `sql-gateway.application.storage-dir` on K8s or registered as application resources on YARN.
  3. generate configurations (e.g. `sql.json-plan` and `$internal.application.main`) for the pipeline and deploy the application via ApplicationClusterDeployer. 

However, since the deployer runs at JobManager, SQL Gateway can't get the job id, thus an empty job id is returned.

2. SQL Driver

When invoked at JobManager, SQL Driver would:

  1. localize resources if needed (only needed for K8s).
  2. initialize the TableEnvironemt based on configurations and an optional SQL init file.
  3. execute SQLs or compiled JSON plans depending on the configuration.

The whole process is the same no matter whether the pipeline is deployed directly by users or indirectly by SQL Gateway.

Compatibility, Deprecation, and Migration Plan

No compatibility issues since it's a new feature.

Test Plan

  • Add unit tests to verify new configuration options' parsing.
  • Add E2E tests to verify application mode with Flink SQL.
  • Add unit tests to SQL Gateway for the application mode with SQLs.

Rejected Alternatives

1. JobGraph-based submission like the deprecated per-job mode

Pros:

  • Requires little effort based on per-job mode.
  • Avoid rebuilding the same table environment with SQL gateway at JobManager.
  • Friendly for the existing per-job mode users.

Cons:

  • Per-job mode is deprecated in Flink 1.15.
  • Jobgraph is an internal serializable, not suitable for application mode in which users may need to prepare the job serializable (jar/sql/jobgraph etc).

2. Support SQL submission at JobManager

Pros:

  • Simple architecture, similar to databases.

Cons:

  • Put more responsibilities and load on JobManager.
  • There's already an SQL Gateway to interpret SQLs into JobManager-supported serializables.
  • No labels