Motivation
Currently, the application deployment mode requires jar-based job submission at the JobManager, which can't be directly used by pure SQL jobs submitted via the SQL Client and SQL Gateway. A common workaround is to build a simple user jar that wraps the SQLs, such as the SQL Runner in the Flink Kubernetes operator. This FLIP aims to support this approach officially by extending the application mode with a SQL driver.
For reference, there are three Flink job serializables that could potentially be used for job deployment at the JobManager:
1. Jars. Used by application mode and web submissions.
2. JobGraphs. Used by session mode and the deprecated per-job mode.
3. SQLs. Not supported at the moment.
Different from the other two, JobGraphs are for internal use and thus not suitable for application mode, which requires users to prepare the job serializables themselves. As for the remaining two, SQLs would require supporting SQL submission at the JobManager, which duplicates effort with the SQL Gateway and adds complexity to the JobManager. Therefore, jars should be the serializable for SQL applications, and that is where a SQL driver comes in as the main-class entrypoint.
Basic Idea
In general, the SQL Driver behaves like a SQL client in non-interactive mode. It mainly executes SQLs at the JobManager in the following steps (see the sketch after the list):
1. build a new TableEnvironment with the given configurations and an optional SQL init file
2. execute the SQLs in the SQL file in order
3. exit when all SQLs are executed (either synchronously or asynchronously)
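For illustration, here is a minimal sketch of these three steps, assuming the statements have already been read from the init file and the SQL file; the helper class and its parameters are hypothetical, not the actual driver code:

import java.util.List;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Hypothetical helper that illustrates the three steps above.
final class SqlExecutionSketch {
    static void run(Configuration conf, List<String> initStatements, List<String> statements) {
        // step 1: build a TableEnvironment with the given configurations
        TableEnvironment tEnv =
                TableEnvironment.create(
                        EnvironmentSettings.newInstance().withConfiguration(conf).build());
        // the init SQLs prepare the environment, e.g. CREATE CATALOG / CREATE FUNCTION
        initStatements.forEach(tEnv::executeSql);
        // step 2: execute the job SQLs in order; executeSql submits asynchronously,
        // call .await() on the returned TableResult for synchronous execution
        statements.forEach(tEnv::executeSql);
        // step 3: the driver exits once all statements are executed
    }
}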
The SQL driver could be used not only in application mode, but also in session mode and the deprecated per-job mode. It could be yet another way to run Flink SQL without explicitly invoking the Flink SQL client.
To sum up, the SQL driver serves mainly two use cases:
- For SQL Client/Gateway users to run Flink SQLs in application mode.
- For non-SQL Client/Gateway users to run Flink SQLs without needing to write a Java program.
Public Interfaces
To support pure SQL jobs, new options would be introduced into the Flink configuration.
Specifically, the following options:
ConfigOptions.key("sql.file") .stringType() .noDefaultValue() .withDescription( "Specify the path of the local file that contains SQLs to execute."); ConfigOptions.key("sql.init.file") .stringType() .noDefaultValue() .withDescription( "Specify the path of the local file that contains initial SQLs to prepare the environment. For example, create catalogs and functions.");
With these options, a SQL application could be run with the following command:
./bin/flink run-application -t yarn-application \
    -Dsql.init.file=file:///tmp/init_sql \
    -Dsql.file=file:///tmp/sql \
    -Dpipeline.jars=/opt/flink/usrlib/myudf.jar
As for the SQL Gateway, no public interfaces are changed. If `execution.target` is set to application mode, the SQL Gateway would use the SQL driver under the hood to build a Flink application. Details are described below.
Proposed Changes
1. Add SQL Driver
Add a SQL driver in the SQL client module. It acts as the main entrypoint that initializes the TableEnvironment and then executes the SQLs before exiting.
public final class SqlDriver {
    public static void main(String[] args) throws Throwable {
        // initialize the TableEnvironment based on configurations and SQL init files
        // execute SQLs
    }
}
That enables the SQL driver's usage via the common CLI; the remaining work is for the SQL Gateway's usage.
2. Make SessionState Serializable
Currently, at the SQL Gateway, the TableEnvironment's state (e.g. functions, resources, configurations) is kept in the SessionContext, stored as configurations and SessionState. In order to rebuild the same environment via the SQL Driver at the JobManager, we have to make SessionState serializable into configurations and pass them to the JobManager.
We may add two serde methods to SessionState as follows:
public class SessionState {
    // ...

    public void writeToConfiguration(Configuration configuration) {
        // ...
    }

    public static SessionState readFromConfiguration(Configuration configuration) {
        // ...
    }
}
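A minimal usage sketch of the intended round trip (the surrounding glue is assumed):

import org.apache.flink.configuration.Configuration;

// Illustrative only: shows the intended serde flow between the gateway and the JobManager.
final class SessionStateRoundTrip {
    static SessionState roundTrip(SessionState sessionState) {
        Configuration flinkConf = new Configuration();
        // at the SQL Gateway: flatten the session state into the configuration,
        // which ships to the JobManager as part of the application deployment
        sessionState.writeToConfiguration(flinkConf);
        // at the JobManager: the SQL driver rebuilds the same session state
        return SessionState.readFromConfiguration(flinkConf);
    }
}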
3. Support Application Mode in SQL Gateway
The last part is to add a code path that allows the SQL Gateway to build a SQL-driver-based application when application mode is specified. It is straightforward; a hedged sketch of what it could look like follows.
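For illustration only, a sketch of such a code path, assuming the gateway reuses Flink's standard application deployment classes (ApplicationConfiguration and ApplicationClusterDeployer); the surrounding class and method are hypothetical:

import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer;
import org.apache.flink.configuration.Configuration;

// Hypothetical glue code; only the deployment classes are existing Flink classes.
final class SqlGatewayApplicationLauncher {
    static void deploy(Configuration flinkConf, String sqlFile, String initFile) throws Exception {
        // hand the SQL files to the driver via the proposed options
        flinkConf.setString("sql.file", sqlFile);
        flinkConf.setString("sql.init.file", initFile);
        // the SQL driver is the application's main class; no program arguments needed
        ApplicationConfiguration appConf =
                new ApplicationConfiguration(new String[0], SqlDriver.class.getName());
        // deploy an application cluster, e.g. on YARN or Kubernetes per execution.target
        new ApplicationClusterDeployer(new DefaultClusterClientServiceLoader())
                .run(flinkConf, appConf);
    }
}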
Compatibility, Deprecation, and Migration Plan
No compatibility issues since it's a new feature.
Test Plan
- Add unit tests to verify new configuration options' parsing.
- Add E2E tests to verify application mode with Flink SQL.
- Add unit tests to SQL Gateway for the application mode with SQLs.
Rejected Alternatives
1. JobGraph-based submission like the deprecated per-job mode
Pros:
- Requires little effort, building on the per-job mode.
- Avoids rebuilding at the JobManager the same table environment already built by the SQL Gateway.
- Friendly to existing per-job mode users.
Cons:
- Per-job mode is deprecated in Flink 1.15.
- JobGraph is an internal serializable and thus not suitable for application mode, in which users may need to prepare the job serializable themselves (jar/SQL/JobGraph etc.).
2. Support SQL submission at JobManager
Pros:
- Simple architecture, similar to databases.
Cons:
- Puts more responsibility and load on the JobManager.
- There is already a SQL Gateway to interpret SQLs into JobManager-supported serializables.