...

Code Block
languagejava
titleNew Configuration Options
ConfigOptions.key("sql-driver.file")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "File system path (URI) of a file that contains the SQL statements to execute. " +
                            "URIs of Flink-supported filesystems are allowed. " +
                            "Either `sql-driver.file` or `sql-driver.json-plan` should be specified.");
ConfigOptions.key("sql-driver.json-plan")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "File system path (URI) of the JSON plan that the pipeline is built from. " +
                            "URIs of Flink-supported filesystems are allowed. " +
                            "Either `sql-driver.file` or `sql-driver.json-plan` should be specified.");
ConfigOptions.key("sql-driver.init.file")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "Path of the local file that contains the initial SQL statements used to prepare the environment, " +
                            "for example, to create catalogs and functions. URIs of Flink-supported filesystems are allowed.");

With these options, a SQL application could be run through the CLI with the following command:

Code Block
languagebash
titleFlink CLI example
 ./bin/flink run-application -t kubernetes-application \
      -Dsql-driver.init.file=hdfs:///tmp/init_sql \
      -Dsql-driver.file=file:///tmp/sql \
      -Dpipeline.jars=s3://mybucket.s3.example.com/myudf.jar
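
For illustration, the sketch below shows what a driver entry point consuming these options might look like. It is only a sketch under assumptions: the `SqlDriver` class name, the use of `GlobalConfiguration` to obtain the effective configuration, and the naive semicolon-based statement splitting are illustrative and not part of this proposal; only the option keys come from the code block above.

Code Block
languagejava
titleSQL Driver entry point (illustrative sketch)
// Illustrative sketch only. The class name, configuration loading and the naive
// statement splitting are assumptions; only the option keys
// (sql-driver.file, sql-driver.init.file) come from this FLIP.
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public final class SqlDriver {

    private static final ConfigOption<String> SQL_FILE =
            ConfigOptions.key("sql-driver.file").stringType().noDefaultValue();
    private static final ConfigOption<String> INIT_FILE =
            ConfigOptions.key("sql-driver.init.file").stringType().noDefaultValue();

    public static void main(String[] args) throws IOException {
        // In application mode the effective configuration is available on the JobManager.
        Configuration config = GlobalConfiguration.loadConfiguration();
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().build());

        // Run the initialization statements first, e.g. CREATE CATALOG / CREATE FUNCTION.
        String initFile = config.get(INIT_FILE);
        if (initFile != null) {
            executeFile(tableEnv, initFile);
        }
        // Then execute the main SQL file.
        executeFile(tableEnv, config.get(SQL_FILE));
    }

    private static void executeFile(TableEnvironment tableEnv, String uri) throws IOException {
        // Naive splitting on ';' for illustration; a real driver would use a SQL parser.
        for (String stmt : readUtf8(new Path(uri)).split(";")) {
            if (!stmt.trim().isEmpty()) {
                tableEnv.executeSql(stmt);
            }
        }
    }

    private static String readUtf8(Path path) throws IOException {
        try (FSDataInputStream in = path.getFileSystem().open(path)) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            int read;
            while ((read = in.read(buffer)) > 0) {
                out.write(buffer, 0, read);
            }
            return out.toString(StandardCharsets.UTF_8.name());
        }
    }
}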

With respect to SQL Gateway, if `execution.target` is set to application mode, SQL Gateway would use SQL Driver under the hood to build a Flink application. New SQL Gateway options would be needed.

Code Block
languagejava
titleNew Configuration Options
ConfigOptions.key("sql-driver.storage-dir")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "File system path (URI) of the directory that stores resources. URIs of Flink-supported filesystems are allowed. " +
                            "When initializing, SQL Driver would localize the resources into the working directory. " +
                            "This option is not effective for YARN setups, since YARN distributes resource files natively.");

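To make the localization step in the description above concrete, here is a rough sketch of how resources under `sql-driver.storage-dir` could be copied into the working directory with Flink's FileSystem API. The class and method names are hypothetical; this is not code proposed by the FLIP.

Code Block
languagejava
titleResource localization (illustrative sketch)
// Hypothetical helper that copies everything under sql-driver.storage-dir
// into the local working directory. Illustration only.
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.FileUtils;

import java.io.IOException;

public final class ResourceLocalizer {

    /** Copies all files under the remote storage directory into the working directory. */
    public static void localize(String storageDir, String workingDir) throws IOException {
        Path remoteDir = new Path(storageDir);
        FileSystem remoteFs = remoteDir.getFileSystem();

        for (FileStatus file : remoteFs.listStatus(remoteDir)) {
            Path target = new Path(workingDir, file.getPath().getName());
            // FileUtils.copy works across different FileSystem implementations.
            FileUtils.copy(file.getPath(), target, false);
        }
    }
}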

For details on how SQL Gateway builds and deploys the application, please see the OperationExecutor section below.

Proposed Changes

1. OperationExecutor

...

  1. convert ModifyOperation into CompilePlanOperation and then execute it to generate the JSON plan.
  2. put the JSON plan and other resources either into the directory configured by `sql-gateway.applicationdriver.storage-dir` on K8s or register them as application resources on YARN.
  3. generate configurations (e.g. `sql-driver.json-plan` and `$internal.application.main`) for the pipeline and deploy the application via ApplicationClusterDeployer, as sketched below.
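
A rough sketch of step 3 follows. It assumes the existing ApplicationClusterDeployer API; the driver class name and all paths/values are placeholders, not decisions made by this FLIP.

Code Block
languagejava
titleDeploying the SQL application (illustrative sketch)
// Rough sketch of step 3: build the pipeline configuration and deploy it
// through ApplicationClusterDeployer. All values are placeholders.
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer;
import org.apache.flink.configuration.Configuration;

public final class SqlApplicationDeployExample {

    public static void main(String[] args) throws Exception {
        // Hypothetical fully-qualified name of the SQL Driver main class.
        String driverClass = "org.apache.flink.table.driver.SqlDriver";

        Configuration config = new Configuration();
        config.setString("execution.target", "kubernetes-application");
        // Point the driver at the JSON plan uploaded in step 2.
        config.setString("sql-driver.json-plan", "hdfs:///tmp/plan.json");
        // Let the application cluster entrypoint invoke the SQL Driver.
        config.setString("$internal.application.main", driverClass);

        ApplicationConfiguration appConfig =
                new ApplicationConfiguration(new String[0], driverClass);

        new ApplicationClusterDeployer(new DefaultClusterClientServiceLoader())
                .run(config, appConfig);
    }
}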

Limitations:

1. Since the deployer runs at the JobManager, SQL Gateway can't get the ResultFetcher which contains the job ID, so an empty job ID is returned.

2. For the same reason, SELECT queries are not supported, because the ResultFetcher is not accessible from SQL Gateway.

2. SQL Driver

When invoked at the JobManager, SQL Driver would:

...