Status
Current state: "Under Discussion"
...
Page properties | |||
---|---|---|---|
|
...
...
|
...
|
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
With the efforts in FLIP-24 and FLIP-91, Flink SQL client supports submitting queries SQL jobs but lacks further support for their lifecycles afterward which is crucial for streaming use cases. That means Flink SQL client users have to turn to other clients (e.g. CLI) or APIs (e.g. REST API) to manage the queriesjobs, like triggering savepoints or canceling queries, which makes the user experience of SQL client incomplete.
Therefore, this proposal aims to complete the capability of SQL client by adding query lifecycle statements. With these statements, users could manage queries SQL jobs and savepoints through pure SQL in SQL client.
Public Interfaces
- New Flink SQL Statements
Proposed Changes
Architecture Overview
The overall architecture of Flink SQL client/gateway would be as follow:
...
Most parts are remained unchanged, only SQL Parser and Planner need to be modified to support new statements, and a new component ClusterClientFactory is introduced in Executor to enable direct access to Flink clusters.
...
SQL Job Lifecycle Statements
Query SQL job lifecycle statements mainly interact with deployments (clusters and jobs) and have few connections with Table/SQL concepts, thus it’d be better to keep them SQL-client-only like jar statements.
Note:
- The keyword for Flink SQL jobs
...
- was `QUERY`, and now is updated as `JOB`.
- All the <job_id> and <savepoint_path> should be string literals (wrapped in single quotes), otherwise it's hard to parse them.
SHOW RUNNING FLINK SQL JOBS
SHOW QUERIES
statements list This statement lists the queries in the Flink cluster, which is similar to flink list in CLI.
Code Block | ||||
---|---|---|---|---|
| ||||
SHOW QUERIESJOBS |
The result contains three four columns: queryjob_id (namely Flink job idID), queryjob_name (namely job name), and status. Alternatively, we could add status, start/end time, duration, and a link to the job's web UI address.
Code Block | ||||
---|---|---|---|---|
| ||||
+----------------------------------+-------------+----------+-----------|-----------+----| | query_id | query_name | status | address | +--------------------+--------------+-------------+----------|--------------| | cca7bc1061d61cf15238e92312c2fc20 |+ | job_id query1 | RUNNING |127.0.0.1:8081| | 0f6413c33757fbe0277897dd94485f04 | query2job_name | FAILED |127.0.0.1:8081status | start_time | end_time | duration | web_url | +----------------------------------+-------------+----------|---------------| |
STOP A RUNNING FLINK SQL JOB
This statement stops a non-terminated SQL, which is similar to `flink stop` in CLI. As stop command has a `--drain` option, we should introduce a table config like `sql-client.stop-with-drain` to support the same functionality.
There are mainly two styles of syntax:
Code Block | ||||
---|---|---|---|---|
| ||||
STOP QUERY <query_id> |
The result would the savepoint path.
Code Block | ||||
---|---|---|---|---|
| ||||
+-------|----------------------|--------------|-----------------------------------| | | cca7bc1061d61cf15238e92312c2fc20 | query1 | RUNNING savepoint_path | | +-----------2022-05-01 10:20:33 | 2022-05-01 20:45:35 | 10h 25m 2s | http://127.0.0.1:8081| | 0f6413c33757fbe0277897dd94485f04 | query2 | FAILED | 2022-05-01 14:04:24 | 2022-05-01 19:09:47 | 5h 5m 23s | http://127.0.0.1:8081| +----------------------------------+-------------+-------| | /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab | +----------------------+----------------------+------------| |
...
--+----------------------+ |
STOP A RUNNING FLINK SQL JOB
This statement cancels statement stops a non-terminated querySQL, which is similar to `flink stop` and `flink cancel` in CLI.
Code Block | ||||
---|---|---|---|---|
| ||||
CANCELSTOP QUERYJOB <query'<job_id> |
Since CANCEL QUERY
doesn’t trigger a savepoint, the result would be a simple OK, like the one returned by DDL.
CREATE A SAVEPOINT
' [WITH SAVEPOINT] [WITH DRAIN] |
The result would the savepoint pathThis statement triggers savepoints for the specified query, which is similar to `flink savepoint` in CLI.
Code Block | ||||
---|---|---|---|---|
| ||||
CREATE SAVEPOINT <query_id> |
The result would the savepoint path.
| ||||
+----------- | ||||
Code Block | ||||
---|---|---|---|---|
| ||||
+------------------------------------------------------| | savepoint_path | +-----------------------------------------------------------------| | hdfs:/tmp/mycluster/flink-savepoints/savepoint-cca7bc-bb1e257f0dab bb1e257f0dab | +------------------------------------------------------| |
DROP SAVEPOINT
-----------| |
There're two related options to control the fine-grained behavior:
1. WITH SAVEPOINT
If specified, the stop statement stops a SQL job with a DROP SAVEPOINT
statements delete the specified savepoint, which is similar to `flink savepoint –dispose` stop` in CLI.
Syntax:
DROP SAVEPOINT <savepoint_path>
Code Block | ||||
---|---|---|---|---|
| ||||
DROP SAVEPOINT <savepoint_path> |
The Otherwise, the stop statement stops a SQL job ungracefully, just like `flink cancel` In CLI. Since an ungrateful drop doesn’t trigger a savepoint, the result would be a simple OK, like the one returned by DDL.
SQL Parser & Planner
To support the new statements, we need to introduce new SQL operators for SQL parser and new SQL operations for the planner.
...
SQL operator
...
SQL operation
...
SqlShowQueries
...
ShowQueriesOperation
...
SqlStopQuery
...
StopQueryOperation
...
SqlCancelQuery
...
CancelQueryOperation
...
SqlCreateSavepoint
...
CreateSavepointOperation
...
SqlDropSavepoint
...
DropSavepointOperation
Executor
Executor would need to convert the query lifecycle operations into ClusterClient commands.
...
SQL operation
...
Cluster Client Command
...
ShowQueriesOperation
...
ClusterClient#listJobs
...
StopQueryOperation
...
ClusterClient#stoplWithSavepoint
...
CancelQueryOperation
...
ClusterClient#cancel
...
CreateSavepointOperation
...
ClusterClient#triggerSavepoint
...
DropSavepointOperation
...
ClusterClient#disposeSavepoint
2. WITH DRAIN
If specified, the stop statement stops a SQL job and increases the watermark to MAX_WATERMARK to trigger all the timers, which is similar to `flink stop .. --drain` in CLI.
CREATE A SAVEPOINT
This statement triggers savepoints for the specified SQL job, which is similar to `flink savepoint` in CLI.
Code Block | ||||
---|---|---|---|---|
| ||||
CREATE SAVEPOINT FOR JOB'<job_id>' |
The result would the savepoint path.
Code Block | ||||
---|---|---|---|---|
| ||||
+------------------------------------------------------------------|
| savepoint_path |
+------------------------------------------------------------------|
| hdfs://mycluster/flink-savepoints/savepoint-cca7bc-bb1e257f0dab |
+------------------------------------------------------------------| |
SHOW SAVEPOINTS
This statement shows all savepoints in a best-effort manner (since the savepoints are managed by users and outlive Flink clusters, the job manager may not know about all savepoints).
Code Block | ||||
---|---|---|---|---|
| ||||
SHOW SAVEPOINTS |
The result would be savepoint paths.
Code Block |
---|
+------------------------------------------------------------------|
| savepoint_path |
+------------------------------------------------------------------|
| hdfs://mycluster/flink-savepoints/savepoint-cca7bc-bb1e257f0dab |
+---------------------- |
In addition, to interact with the clusters, Executor should be able to create ClusterClient through ClusterClientFactory, thus a ClusterClientServiceLoader would be added to Executor.
Implementation Plan
The implementation plan would be simple:
- Support the new statements and operations in SQL parser and Planner.
- Extend Executor to support the new operations.
Compatibility, Deprecation, and Migration Plan
This FLIP introduces new SQL keywords, which may cause troubles for the existing SQLs. Users need to escape the new keywords if they use them as SQL identifiers.
The new keywords are:
- QUERY (new)
- QUERIES (new)
- STOP (new)
- CANCEL (new)
- SAVEPOINT (already reserved)
Rejected Alternatives
An alternative approach to query monitoring is that the SQL client or gateway book keeps every query and is responsible for updating the query status through polling or callbacks. In that way, the query status is better maintained, and we wouldn’t lose track of the queries in cases that they’re cleaned up by the cluster or the cluster is unavailable.
However, there’re 2 major concerns:
- Table/SQL API should provide the same capabilities as its peer DataStream API, thus show queries statement implement should be aligned with flink list in CLI as well.
- Maintaining query status at the client/gateway side requires additional work but brings little extra user value, since the client/gateway doesn’t persist metadata at the moment.
Status
Current state: "Under Discussion"
Discussion thread: https://lists.apache.org/thread/qkvh9p5w9b12s7ykh3l7lv7m9dbgnf1g
...
Jira | ||||||
---|---|---|---|---|---|---|
|
Released: <Flink Version>
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
With the efforts in FLIP-24 and FLIP-91, Flink SQL client supports submitting queries but lacks further support for their lifecycles afterward which is crucial for streaming use cases. That means Flink SQL client users have to turn to other clients (e.g. CLI) or APIs (e.g. REST API) to manage the queries, like triggering savepoints or canceling queries, which makes the user experience of SQL client incomplete.
Therefore, this proposal aims to complete the capability of SQL client by adding query lifecycle statements. With these statements, users could manage queries and savepoints through pure SQL in SQL client.
Public Interfaces
- New Flink SQL Statements
Proposed Changes
Architecture Overview
The overall architecture of Flink SQL client would be as follow:
Most parts are remained unchanged, only SQL Parser and Planner need to be modified to support new statements, and a new component ClusterClientFactory is introduced in Executor to enable direct access to Flink clusters.
Query Lifecycle Statements
Query lifecycle statements mainly interact with deployments (clusters and jobs) and have few connections with Table/SQL concepts, thus it’d be better to keep them SQL-client-only like jar statements.
SHOW QUERIES
SHOW QUERIES
statements list the queries in the Flink cluster, which is similar to flink list in CLI.
Code Block | ||||
---|---|---|---|---|
| ||||
SHOW QUERIES |
The result contains three columns: query_id (namely Flink job id), query_name (namely job name), and status.
Code Block | ||||
---|---|---|---|---|
| ||||
+----------------------------------+----------| | hdfs://mycluster/flink-savepoints/savepoint-ca62ea-ce73f92adba2 | +---------------| | query_id | query_name | status | +------------------------------------+-------------+----------| | cca7bc1061d61cf15238e92312c2fc20 | query1 | RUNNING | | 0f6413c33757fbe0277897dd94485f04 | query2 | FAILED | +------| |
DROP A SAVEPOINT
This statement deletes the specified savepoint, which is similar to `flink savepoint –dispose` in CLI.
Code Block | ||||
---|---|---|---|---|
| ||||
DROP SAVEPOINT '<savepoint_path>' |
The result would be a simple OK.
COMPLETE USAGE EXAMPLE
Code Block | ||
---|---|---|
| ||
Flink SQL> INSERT INTO tbl_a SELECT * FROM tbl_b; [INFO] Submitting SQL update statement to the cluster... [INFO] SQL update statement has been successfully submitted to the cluster: Job ID: 6b1af540c0c0bb3fcfcad50ac037c862 Flink SQL> SHOW JOBS; +----------------------------------+--------------------+----------| |
STOP QUERY
STOP QUERY
statements stop a non-terminated query, which is similar to `flink stop` in CLI. As stop command has a `--drain` option, we should introduce a table config like `sql-client.stop-with-drain` to support the same functionality.
Code Block | ||||
---|---|---|---|---|
| ||||
STOP QUERY <query_id> |
The result would the savepoint path.
Code Block | ||||
---|---|---|---|---|
| ||||
++---------------------+---------------------+-------------+------------------------|+ | savepoint job_path id | job_name | + | status | start_time | end_time | duration | web_url | +----------------------------------+--------------------+--| | /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab | +-------|---------------------|---------------------|-------------|----------------------| |
CANCEL QUERY
This statement cancels a non-terminated query, which is similar to `flink cancel` in CLI.
Code Block | ||||
---|---|---|---|---|
| ||||
CANCEL QUERY <query_id> |
Since CANCEL QUERY
doesn’t trigger a savepoint, the result would be a simple OK, like the one returned by DDL.
CREATE SAVEPOINT
This statement triggers a savepoint for the specified query, which is similar to `flink savepoint` in CLI.
Code Block | ||||
---|---|---|---|---|
| ||||
CREATE SAVEPOINT <query_id> |
The result would the savepoint path.
Code Block | ||||
---|---|---|---|---|
| ||||
+| | 6b1af540c0c0bb3fcfcad50ac037c862 | INSERT INTO tbl_a..| RUNNING | 2022-05-01 10:20:33 | 2022-05-01 10:20:53 | 0h 0m 20s | http://127.0.0.1:8081| +----------------------------------+--------------------+---------+---------------------+---------------------+-------------+----------------------+ Flink SQL > CREATE SAVEPOINT FOR JOB '6b1af540c0c0bb3fcfcad50ac037c862'; +------------------------------------------------------------------| | | | savepoint_path | +------------------------------------------------------------------| | hdfs://tmpmycluster/flink-savepoints/savepoint-cca7bc-bb1e257f0dab | +-cca7bc-bb1e257f0dab | +------------------------------------------------------------------| Flink SQL > STOP JOB '6b1af540c0c0bb3fcfcad50ac037c862'; [INFO] The specified job is stopped. Flink SQL > DROP SAVEPOINT 'hdfs://mycluster/flink-savepoints/savepoint--------------| |
DROP SAVEPOINT
This statement deletes the specified savepoint, which is similar to `flink savepoint –dispose` in CLI.
Syntax:
DROP SAVEPOINT <savepoint_path>
Code Block | ||||
---|---|---|---|---|
| ||||
DROP SAVEPOINT <savepoint_path> |
The result would be a simple OK.
SQL Parser & Planner
To support the new statements, we need to introduce new SQL operators for SQL parser and new SQL operations for the planner.
...
SQL operator
...
SQL operation
...
SqlShowQueries
...
ShowQueriesOperation
...
SqlStopQuery
...
StopQueryOperation
...
SqlCancelQuery
...
CancelQueryOperation
...
SqlCreateSavepoint
...
CreateSavepointOperation
...
SqlDropSavepoint
...
DropSavepointOperation
Executor
Executor would need to convert the query lifecycle operations into ClusterClient commands.
...
SQL operation
...
Cluster Client Command
...
ShowQueriesOperation
...
ClusterClient#listJobs
...
StopQueryOperation
...
ClusterClient#stoplWithSavepoint
...
CancelQueryOperation
...
ClusterClient#cancel
...
CreateSavepointOperation
...
ClusterClient#triggerSavepoint
...
DropSavepointOperation
...
ClusterClient#disposeSavepoint
In addition, to interact with the clusters, Executor should be able to create ClusterClient through ClusterClientFactory, thus a ClusterClientServiceLoader would be added to Executor.
Implementation Plan
The implementation plan would be simple:
- Support the new statements and operations in SQL parser and Planner.
- Extend Executor to support the new operations.
Compatibility, Deprecation, and Migration Plan
This FLIP introduces new SQL keywords, which may cause troubles for the existing SQLs. Users need to escape the new keywords if they use them as SQL identifiers.
The new keywords are:
- QUERY (new)
- QUERIES (new)
- STOP (new)
- CANCEL (new)
- SAVEPOINT (already reserved)
Rejected Alternatives
An alternative approach to query monitoring is that the SQL client or gateway book keeps every query and is responsible for updating the query status through polling or callbacks. In that way, the query status is better maintained, and we wouldn’t lose track of the queries in cases that they’re cleaned up by the cluster or the cluster is unavailable.
However, there’re 2 major concerns:
...
cca7bc-bb1e257f0dab';
[INFO] The specified savepoint is dropped. |
SQL Parser & Planner
To support the new statements, we need to introduce new SQL operators for SQL parser and new SQL operations for the planner.
SQL operator | SQL operation |
SqlShowJobs | ShowJobsOperation |
SqlStopQuery | StopJobOperation |
SqlShowSavepoints | ShowSavepointsOperation |
SqlCreateSavepoint | CreateSavepointOperation |
SqlDropSavepoint | DropSavepointOperation |
Executor
Executor would need to convert the query lifecycle operations into ClusterClient commands.
SQL operation | Cluster Client Command |
ShowJobsOperation | ClusterClient#listJobs |
StopJobOperation | ClusterClient#stopWithSavepoint | ClusterClient#cancel |
ShowSavepointOperation | ClusterClient |
CreateSavepointOperation | ClusterClient#triggerSavepoint |
DropSavepointOperation | ClusterClient#disposeSavepoint |
In addition, to interact with the clusters, Executor should be able to create ClusterClient through ClusterClientFactory, thus a ClusterClientServiceLoader would be added to Executor.
Implementation Plan
The implementation plan would be simple:
- Support the new statements and operations in SQL parser and Planner.
- Extend Executor to support the new operations.
Compatibility, Deprecation, and Migration Plan
This FLIP introduces new SQL keywords, which may cause troubles for the existing SQLs. Users need to escape the new keywords if they use them as SQL identifiers.
The new keywords are:
- JOB (new)
- JOBS (new)
- STOP (new)
- DRAIN (new)
- SAVEPOINT (already reserved)
- SAVEPOINTS (already reserved)
Rejected Alternatives
Book Keep Query Status in SQL Gateway
An alternative approach to query monitoring is that the SQL client or gateway book keeps every query and is responsible for updating the query status through polling or callbacks. In that way, the query status is better maintained, and we wouldn’t lose track of the queries in cases that they’re cleaned up by the cluster or the cluster is unavailable.
However, there’re 2 major concerns:
- Table/SQL API should provide the same capabilities as its peer DataStream API, thus show queries statement implement should be aligned with flink list in CLI as well.
- Maintaining query status at the client/gateway side requires additional work but brings little extra user value, since the client/gateway doesn’t persist metadata at the moment.
Savepoint Syntax: SAVEPOINT / RELEASE SAVEPOINT
An alternative syntax of savepoints is like:
Code Block | ||
---|---|---|
| ||
SAVEPOINT '<job_id>'
RELEASE SAVEPOINT '<savepoint_path>' |
But there are mainly two concerns:
- Generally speaking, SAVEPOINT is more appropriate to be followed by a savepoint identifier instead of a job identifier.
- The statements are often used within database transaction blocks, so it would be kind of unnatural to be used alone.