
...

  1. WRT JobID, SQL Gateway needs to generate a JobID itself and set it into `PIPELINE_FIXED_JOB_ID` before delegating the job to SQL Driver. We could derive the JobID from the ClusterID, as ApplicationDispatcherBootstrap does.
  2. WRT result fetching, we need to construct CollectResultFetcher manually with a fixed operator ID. CollectSinkOperatorFactory should allow an externally specified operator ID for CollectSink.
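Point 1 works because the JobID is a pure function of the ClusterID, so SQL Gateway and SQL Driver can compute it independently. The sketch below assumes the derivation matches our reading of ApplicationDispatcherBootstrap, `new JobID(clusterId.hashCode(), 0)`, and that the hex form encodes the lower part and then the upper part as big-endian longs. It is plain Java with no Flink dependency, and the class name and cluster ID are hypothetical.

```java
import java.nio.ByteBuffer;

/**
 * Sketch: derive a deterministic job ID from a cluster ID, modelled on our reading of
 * ApplicationDispatcherBootstrap (new JobID(clusterId.hashCode(), 0)).
 * Plain Java, no Flink dependency; the hex layout is an assumption about Flink's AbstractID.
 */
public class JobIdFromClusterId {

    /** Hex-encodes the lower part, then the upper part, each as 8 big-endian bytes. */
    static String toHex(long lowerPart, long upperPart) {
        byte[] bytes = ByteBuffer.allocate(16).putLong(lowerPart).putLong(upperPart).array();
        StringBuilder sb = new StringBuilder(32);
        for (byte b : bytes) {
            sb.append(String.format("%02x", b & 0xff));
        }
        return sb.toString();
    }

    /** Same cluster ID in, same job ID out, on SQL Gateway and in SQL Driver alike. */
    static String deriveJobIdHex(String clusterId) {
        if (clusterId == null) {
            throw new IllegalArgumentException("cluster ID must be set");
        }
        // The int hashCode is sign-extended to long, as when passed to a (long, long) constructor.
        return toHex(clusterId.hashCode(), 0L);
    }

    public static void main(String[] args) {
        String clusterId = "my-sql-app"; // hypothetical cluster ID
        System.out.println("$internal.pipeline.job-id=" + deriveJobIdHex(clusterId));
    }
}
```

Because the derivation needs no coordination, SQL Gateway can set the fixed job ID and build its ResultFetcher before the application cluster has even started.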

Public Interfaces

New Classes

...

New Configuration Options (Java)
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// Field names are illustrative; the keys and descriptions are what the proposal defines.
public static final ConfigOption<String> APPLICATION_STORAGE_DIR =
        ConfigOptions.key("sql-gateway.applications.storage-dir")
                .stringType()
                .noDefaultValue()
                .withDescription(
                        "File system path (URI) of the base directory for SQL Gateway generated resources, which must be accessible from the cluster. "
                                + "URIs of Flink-supported file systems are allowed. "
                                + "During initialization, the resources are localized into the working directory of SQL Driver. "
                                + "This option is not effective for YARN setups, since YARN distributes resource files natively.");

public static final ConfigOption<String> DRIVER_JSON_PLAN =
        ConfigOptions.key("$internal.sql-gateway.driver.json-plan")
                .stringType()
                .noDefaultValue()
                .withDescription(
                        "File system path (URI) of the JSON plan that the pipeline is built from. "
                                + "URIs of Flink-supported file systems are allowed.");

public static final ConfigOption<String> DRIVER_SQL_CONFIG =
        ConfigOptions.key("$internal.sql-gateway.driver.sql-config")
                .stringType()
                .noDefaultValue()
                .withDescription(
                        "Path of the SQL-related configuration used to prepare the environment, "
                                + "for example, function metadata. URIs of Flink-supported file systems are allowed.");
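The `sql-gateway.applications.storage-dir` description says resources are localized into SQL Driver's working directory. A minimal sketch of that step, assuming the storage directory is reachable as a local path; a real implementation would presumably go through Flink's FileSystem abstraction to handle remote schemes such as hdfs:// or s3://. The class and method names are hypothetical.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper: copies every file under storageDir into workingDir, keeping relative paths. */
public class LocalizeResources {

    static int localize(Path storageDir, Path workingDir) throws IOException {
        List<Path> sources;
        try (Stream<Path> walk = Files.walk(storageDir)) {
            sources = walk.filter(Files::isRegularFile).collect(Collectors.toList());
        }
        for (Path src : sources) {
            // Preserve the layout under the storage dir, e.g. udf/my-udf.jar.
            Path target = workingDir.resolve(storageDir.relativize(src));
            Files.createDirectories(target.getParent());
            Files.copy(src, target, StandardCopyOption.REPLACE_EXISTING);
        }
        return sources.size();
    }

    public static void main(String[] args) throws IOException {
        // Temporary directories stand in for the storage dir and SQL Driver's working dir.
        Path storage = Files.createTempDirectory("sql-gateway-storage");
        Path work = Files.createTempDirectory("sql-driver-work");
        Files.createDirectories(storage.resolve("udf"));
        Files.write(storage.resolve("plan.json"), "{}".getBytes(StandardCharsets.UTF_8));
        Files.write(storage.resolve("udf/my-udf.jar"), new byte[] {1, 2, 3});
        System.out.println("localized " + localize(storage, work) + " files"); // localized 2 files
    }
}
```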

...

  1. convert ModifyOperation into CompilePlanOperation and then execute it to generate the JSON plan.
  2. generate JobID based on ClusterID and use it to construct a ResultFetcher for the job.
  3. put the JSON plan and other resources into `sql-gateway.applications.storage-dir` on K8s, or register them as application resources on YARN.
  4. generate configurations (e.g. `$internal.sql-gateway.driver.json-plan` and `$internal.pipeline.job-id`) for the pipeline.
  5. deploy the application via ApplicationClusterDeployer. 
  6. construct a ResultFetcher for the job based on the cluster id and the derived job id, and fetch job results.
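Steps 3 and 4 amount to producing a small set of configuration entries for SQL Driver. The sketch below covers the K8s case only and assumes a hypothetical layout of `<storage-dir>/<cluster-id>/plan.json` for the uploaded plan; the layout, class name, and method name are illustrative, while the option keys are the ones defined above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of the configuration SQL Gateway hands to SQL Driver (K8s case). */
public class DriverConfigSketch {

    static final String STORAGE_DIR = "sql-gateway.applications.storage-dir";
    static final String JSON_PLAN = "$internal.sql-gateway.driver.json-plan";
    static final String FIXED_JOB_ID = "$internal.pipeline.job-id";

    static Map<String, String> buildDriverConfig(String storageDir, String clusterId, String jobIdHex) {
        // Hypothetical layout: one sub-directory per application cluster.
        String base = storageDir.endsWith("/") ? storageDir : storageDir + "/";
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put(STORAGE_DIR, storageDir);
        conf.put(JSON_PLAN, base + clusterId + "/plan.json");
        // The job ID derived from the cluster ID, so SQL Gateway knows it before deployment.
        conf.put(FIXED_JOB_ID, jobIdHex);
        return conf;
    }

    public static void main(String[] args) {
        Map<String, String> conf = buildDriverConfig(
                "hdfs://namenode:8020/flink/sql-apps", "my-sql-app", "<derived job id>");
        conf.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

On YARN, the storage-dir entry would be unnecessary, since the resources are registered as application resources instead.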

2. SQL Driver

When invoked at JobManager, SQL Driver would:

...

The whole process is the same no matter whether the pipeline is deployed directly by users or indirectly by SQL Gateway.

3. CollectSinkOperatorFactory

CollectSinkOperatorFactory would need to support externally specified IDs for CollectSink. With the constant operator ID, SQL Gateway would be able to manually create a CollectResultFetcher for job result fetching.
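The idea is that both sides agree on the CollectSink operator ID up front instead of exchanging it at runtime. The sketch below uses hypothetical stand-ins rather than Flink classes: `SinkFactory` mimics a factory that honours an externally specified ID and otherwise generates one, and `COLLECT_SINK_OPERATOR_ID` is an illustrative 32-hex-character constant shaped like an operator ID.

```java
import java.util.Optional;
import java.util.UUID;

public class FixedCollectSinkId {

    /** Hypothetical constant known to both SQL Gateway and SQL Driver. */
    static final String COLLECT_SINK_OPERATOR_ID = "0123456789abcdef0123456789abcdef";

    /** Stand-in for CollectSinkOperatorFactory: uses the external ID if given, else generates one. */
    static final class SinkFactory {
        private final String operatorId;

        SinkFactory(Optional<String> externalOperatorId) {
            // Resolved once so every caller of this factory sees the same ID.
            this.operatorId = externalOperatorId.orElseGet(
                    () -> UUID.randomUUID().toString().replace("-", ""));
        }

        String operatorId() {
            return operatorId;
        }
    }

    public static void main(String[] args) {
        // SQL Driver side (inside the JobManager): build the sink with the agreed ID.
        SinkFactory driverSide = new SinkFactory(Optional.of(COLLECT_SINK_OPERATOR_ID));
        // SQL Gateway side: never sees the driver's objects, only the shared constant.
        String gatewaySide = COLLECT_SINK_OPERATOR_ID;
        System.out.println(driverSide.operatorId().equals(gatewaySide)); // true
    }
}
```

Without an external ID, each factory would pick its own random ID, which SQL Gateway has no way to learn; that is the gap the externally specified ID closes.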

Compatibility, Deprecation, and Migration Plan

...

  • Per-job mode has been deprecated since Flink 1.15.
  • JobGraph is an internal serializable structure and is not suitable for application mode, in which users may need to prepare the job's serializable form themselves (JAR, SQL, JobGraph, etc.).

2. Web submission like the session mode

Pros:

  • Requires relatively little effort.

Cons:

  • Divides the job deployment into two steps, which is not appropriate for application mode.

3. Support SQL submission at JobManager

Pros:

  • Simple architecture, similar to databases.

...