
Status

Current state: "Under Discussion"

Discussion thread: https://lists.apache.org/thread/qkvh9p5w9b12s7ykh3l7lv7m9dbgnf1g

...

Jira: FLINK-27344

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

With the efforts in FLIP-24 and FLIP-91, the Flink SQL client supports submitting queries but lacks further support for managing their lifecycles afterward, which is crucial for streaming use cases. That means Flink SQL client users have to turn to other clients (e.g. CLI) or APIs (e.g. REST API) to manage queries, such as triggering savepoints or canceling queries, which makes the SQL client user experience incomplete.

Therefore, this proposal aims to complete the capability of SQL client by adding query lifecycle statements. With these statements, users could manage queries and savepoints through pure SQL in SQL client.

Public Interfaces

  • New Flink SQL Statements

Proposed Changes

Architecture Overview

The overall architecture of the Flink SQL client would be as follows:

[Figure: overall architecture of the Flink SQL client]

Most parts remain unchanged: only the SQL parser and planner need to be modified to support the new statements, and a new component, ClusterClientFactory, is introduced in the Executor to enable direct access to Flink clusters.

Query Lifecycle Statements

Query lifecycle statements mainly interact with deployments (clusters and jobs) and have few connections with Table/SQL concepts, so it'd be better to keep them SQL-client-only, like the jar statements.

SHOW QUERIES

SHOW QUERIES statements list the queries in the Flink cluster, which is similar to `flink list` in the CLI.

Syntax:
SHOW QUERIES

The result contains three columns: query_id (namely Flink job id), query_name (namely job name), and status.

Result:

+----------------------------------+-------------+----------+
|            query_id              | query_name  |  status  |
+----------------------------------+-------------+----------+
| cca7bc1061d61cf15238e92312c2fc20 |    query1   |  RUNNING |
| 0f6413c33757fbe0277897dd94485f04 |    query2   |  FAILED  |
+----------------------------------+-------------+----------+

STOP QUERY

STOP QUERY statements stop a non-terminated query, which is similar to `flink stop` in the CLI. As the stop command has a `--drain` option, we should introduce a table config like `sql-client.stop-with-drain` to support the same functionality.
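For example, assuming the `sql-client.stop-with-drain` option proposed above (it is part of this proposal, not an existing Flink option), a drained stop could look like:

```sql
-- 'sql-client.stop-with-drain' is the option proposed in this FLIP;
-- it does not exist in Flink yet.
SET 'sql-client.stop-with-drain' = 'true';

-- Stops the query with a savepoint, draining remaining records first.
STOP QUERY cca7bc1061d61cf15238e92312c2fc20;
```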

Syntax:
STOP QUERY <query_id>

The result would be the savepoint path.

Result:

+--------------------------------------------------------+
|            savepoint_path                              |
+--------------------------------------------------------+
| /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab    |
+--------------------------------------------------------+

CANCEL QUERY

This statement cancels a non-terminated query, which is similar to `flink cancel` in the CLI.

Syntax:
CANCEL QUERY <query_id>

Since CANCEL QUERY doesn’t trigger a savepoint, the result would be a simple OK, like the one returned by DDL.

CREATE SAVEPOINT

This statement triggers a savepoint for the specified query, which is similar to `flink savepoint` in the CLI.

Syntax:
CREATE SAVEPOINT <query_id>

The result would be the savepoint path.

Result:

+------------------------------------------------------+
|            savepoint_path                            |
+------------------------------------------------------+
| /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab  |
+------------------------------------------------------+

DROP SAVEPOINT

This statement deletes the specified savepoint, which is similar to `flink savepoint --dispose` in the CLI.

Syntax:

DROP SAVEPOINT <savepoint_path>

The result would be a simple OK.

SQL Parser & Planner

To support the new statements, we need to introduce new SQL operators for SQL parser and new SQL operations for the planner.

+--------------------+--------------------------+
|    SQL operator    |      SQL operation       |
+--------------------+--------------------------+
| SqlShowQueries     | ShowQueriesOperation     |
| SqlStopQuery       | StopQueryOperation       |
| SqlCancelQuery     | CancelQueryOperation     |
| SqlCreateSavepoint | CreateSavepointOperation |
| SqlDropSavepoint   | DropSavepointOperation   |
+--------------------+--------------------------+
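As a rough illustration, each new statement could be parsed into a small immutable operation object that the planner hands to the executor. The class shapes below are assumptions for this sketch, not Flink's actual `flink-table-api-java` interfaces:

```java
// Sketch only: simplified stand-ins for the proposed operation classes.
// The constructors and getters below are illustrative assumptions.

interface Operation {
    // Human-readable summary, mirroring Operation#asSummaryString in Flink.
    String asSummaryString();
}

/** Produced for SHOW QUERIES; carries no arguments. */
final class ShowQueriesOperation implements Operation {
    @Override
    public String asSummaryString() {
        return "SHOW QUERIES";
    }
}

/** Produced for STOP QUERY <query_id>; carries the target job id. */
final class StopQueryOperation implements Operation {
    private final String queryId;

    StopQueryOperation(String queryId) {
        this.queryId = queryId;
    }

    String getQueryId() {
        return queryId;
    }

    @Override
    public String asSummaryString() {
        return "STOP QUERY " + queryId;
    }
}

/** Produced for DROP SAVEPOINT <savepoint_path>; carries the path. */
final class DropSavepointOperation implements Operation {
    private final String savepointPath;

    DropSavepointOperation(String savepointPath) {
        this.savepointPath = savepointPath;
    }

    String getSavepointPath() {
        return savepointPath;
    }

    @Override
    public String asSummaryString() {
        return "DROP SAVEPOINT " + savepointPath;
    }
}
```

Keeping the operations as plain descriptors means the parser and planner stay free of any cluster interaction; everything deployment-related is deferred to the Executor.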

Executor

Executor would need to convert the query lifecycle operations into ClusterClient commands.

+--------------------------+---------------------------------+
|      SQL operation       |      Cluster Client Command     |
+--------------------------+---------------------------------+
| ShowQueriesOperation     | ClusterClient#listJobs          |
| StopQueryOperation       | ClusterClient#stopWithSavepoint |
| CancelQueryOperation     | ClusterClient#cancel            |
| CreateSavepointOperation | ClusterClient#triggerSavepoint  |
| DropSavepointOperation   | ClusterClient#disposeSavepoint  |
+--------------------------+---------------------------------+

In addition, to interact with the clusters, the Executor should be able to create a ClusterClient through ClusterClientFactory; thus a ClusterClientServiceLoader would be added to the Executor.
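To make the mapping concrete, here is a minimal dispatch sketch. `QueryClusterClient` and `LifecycleExecutor` are local stand-ins invented for this sketch; Flink's real `ClusterClient` takes `JobID` objects and returns futures, so all signatures below are simplifications:

```java
import java.util.List;

// Stand-in for Flink's ClusterClient, reduced to the five calls this FLIP needs.
interface QueryClusterClient {
    List<String> listJobs();
    String stopWithSavepoint(String jobId, boolean drain);
    void cancel(String jobId);
    String triggerSavepoint(String jobId);
    void disposeSavepoint(String savepointPath);
}

// Sketch of how the Executor could translate lifecycle operations into
// cluster client calls. Dispatch is keyed on the statement name to keep
// the example self-contained.
final class LifecycleExecutor {
    private final QueryClusterClient client;

    LifecycleExecutor(QueryClusterClient client) {
        this.client = client;
    }

    /** Executes one lifecycle statement and returns the textual result. */
    String execute(String statement, String argument, boolean drain) {
        switch (statement) {
            case "SHOW QUERIES":
                return String.join("\n", client.listJobs());
            case "STOP QUERY":
                // Returns the savepoint path produced by stop-with-savepoint.
                return client.stopWithSavepoint(argument, drain);
            case "CANCEL QUERY":
                client.cancel(argument); // no savepoint is triggered
                return "OK";
            case "CREATE SAVEPOINT":
                return client.triggerSavepoint(argument);
            case "DROP SAVEPOINT":
                client.disposeSavepoint(argument);
                return "OK";
            default:
                throw new IllegalArgumentException("Unknown statement: " + statement);
        }
    }
}
```

In the real implementation the client instance would come from the ClusterClientFactory discovered via the service loader, rather than being passed in directly.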

Implementation Plan

The implementation plan would be simple:

  1. Support the new statements and operations in SQL parser and Planner.
  2. Extend Executor to support the new operations.

Compatibility, Deprecation, and Migration Plan

This FLIP introduces new SQL keywords, which may break existing SQL statements. Users need to escape the new keywords if they use them as identifiers.

The new keywords are:

    • QUERY (new)
    • QUERIES (new)
    • STOP (new)
    • CANCEL (new)
    • SAVEPOINT (already reserved)
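For example, a column named query would then need to be quoted as an identifier (the table and column names below are made up for illustration):

```sql
-- After this FLIP, QUERY is a keyword, so it must be escaped with
-- backticks when used as an identifier.
SELECT `query`, cnt FROM query_stats;
```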

Rejected Alternatives

An alternative approach to query monitoring is that the SQL client or gateway bookkeeps every query and is responsible for updating the query status through polling or callbacks. In that way, the query status is better maintained, and we wouldn't lose track of queries in cases where they're cleaned up by the cluster or the cluster is unavailable.

However, there are two major concerns:

  1. Table/SQL API should provide the same capabilities as its peer DataStream API; thus the implementation of the SHOW QUERIES statement should be aligned with `flink list` in the CLI as well.
  2. Maintaining query status at the client/gateway side requires additional work but brings little extra user value, since the client/gateway doesn’t persist metadata at the moment.