Status
...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Introduction
The overall conception and architecture of the SQL Client are proposed in FLIP-24, which mainly focuses on embedded mode. The goal of this FLIP is to extend FLIP-24 to support gateway mode and to expose the Gateway with pluggable endpoints. We introduce the gateway with pluggable endpoints because many users have their own preferences. For example, HiveServer2 users prefer a gateway with a HiveServer2-style API, for which numerous tools exist, while some Flink-native users may prefer the REST API. Therefore, we hope to learn from Kyuubi's design and expose multiple endpoints with different APIs for users to choose from. Section 2 introduces the main concepts in the Flink SQL Gateway. Section 3 proposes the architecture of the SQL Gateway. Section 4 introduces the component API and the REST endpoint. Section 5 introduces how the Flink SQL Client connects to the Flink SQL Gateway.
The goals of the FLIP:
- Introduce the Gateway with a REST endpoint.
- Design the pluggable endpoint API.
- Allow the SQL Client to submit statements to the SQL Gateway.
Core Concepts
Like many big data platforms, Flink SQL Gateway also has the following concepts.
Session
A Session represents a user who visits the Gateway during a period. Flink SQL Gateway uses a SessionHandle as the index to identify a Session. In addition to uniquely identifying the visiting user, it also acts as a unit of resource isolation, covering jar resources, configuration and meta information.
Operation
Every user request is transformed into an Operation.
...
- Get the OperationStatus that describes whether the Operation is running or has met an error;
- Cancel a running Operation;
- Close a finished Operation.
SessionManager
The SessionManager is responsible for managing Sessions, including:
- Every user should register a Session before sending requests to the Gateway;
- Every user should close its corresponding Session when it exits. If a Session is inactive for a period of time, the SessionManager cleans up the Session automatically;
- The SessionManager should limit the number of active Sessions to avoid OOM.
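To make the responsibilities above concrete, here is a minimal, illustrative sketch of idle-session clean-up and the active-session limit. All names in it (SessionManagerSketch, touch, cleanUpIdleSessions) are hypothetical and not part of the proposed API; timestamps are passed in explicitly so the behavior is deterministic.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of the SessionManager duties described above. */
public class SessionManagerSketch {
    private final Map<String, Long> lastAccessed = new ConcurrentHashMap<>();
    private final long idleTimeoutMs;
    private final int maxSessions;

    public SessionManagerSketch(long idleTimeoutMs, int maxSessions) {
        this.idleTimeoutMs = idleTimeoutMs;
        this.maxSessions = maxSessions;
    }

    /** Register a new session; rejects when the active-session limit is hit. */
    public void openSession(String sessionHandle, long nowMs) {
        if (lastAccessed.size() >= maxSessions) {
            throw new IllegalStateException("Too many active sessions");
        }
        lastAccessed.put(sessionHandle, nowMs);
    }

    /** Called on every request (or heartbeat) to keep the session alive. */
    public void touch(String sessionHandle, long nowMs) {
        lastAccessed.computeIfPresent(sessionHandle, (h, t) -> nowMs);
    }

    /** Periodically invoked: closes sessions idle longer than the timeout. */
    public int cleanUpIdleSessions(long nowMs) {
        int before = lastAccessed.size();
        lastAccessed.values().removeIf(t -> nowMs - t > idleTimeoutMs);
        return before - lastAccessed.size();
    }

    public int activeSessions() {
        return lastAccessed.size();
    }
}
```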
OperationManager
The OperationManager is responsible for managing Operations. An Operation's initialization, execution and clean-up are controlled by the OperationManager. When a Session closes, the OperationManager also needs to clean up all alive Operations.
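The asynchronous lifecycle described above can be sketched as follows; the class and method names are illustrative stand-ins, not the proposed interfaces. The Operation runs on a worker pool, its handle is returned immediately, and session close cancels all alive Operations.

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical sketch of the OperationManager lifecycle. */
public class OperationManagerSketch {
    private final ExecutorService workers = Executors.newFixedThreadPool(2);
    private final Map<String, Future<String>> operations = new ConcurrentHashMap<>();

    /** Submit the operation and return its handle without waiting for the result. */
    public String submitOperation(String handle, Callable<String> executor) {
        operations.put(handle, workers.submit(executor));
        return handle;
    }

    /** Block until the operation's result is available. */
    public String awaitResult(String handle) {
        try {
            return operations.get(handle).get();
        } catch (Exception e) {
            return "ERROR";
        }
    }

    /** When the Session closes, all alive Operations must be cleaned up. */
    public void closeAll() {
        operations.values().forEach(f -> f.cancel(true));
        workers.shutdownNow();
    }
}
```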
...
We organize all the concepts in the following diagram.
Architecture
The architecture of the Gateway is shown in the following diagram.
...
The Flink SQL Gateway is composed of Endpoints, the GatewayService and the MetricSystem.
...
- GatewayService exposes APIs for managing Sessions and Operations, plus utilities. The Endpoints can combine these methods to satisfy their requirements;
- GatewayService is responsible for exposing the registry of Sessions;
- GatewayService executes Operations asynchronously: it submits the Operation to the OperationManager and returns the OperationHandle without waiting for the Operation's execution to finish;
- GatewayService is shared between all the loaded endpoints.
- Endpoint: It's the main entry point for the users to visit.
...
- Considering every Operation has a status, we propose a state machine like HiveServer2's:
- INITIALIZED: the Operation is created;
- PENDING: transitional status during a status switch;
- RUNNING: the Operation starts running;
- FINISHED: the Operation finishes;
- CANCELED: the user cancels the Operation;
- CLOSED: the user closes the Operation;
- ERROR: the Operation's execution meets errors;
- TIMEOUT: the execution times out.
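The status list above can be sketched as a plain Java enum. The isTerminal helper is our own illustrative addition, not part of the proposal; it simply groups the states a finished Operation can no longer leave.

```java
/** Sketch of the proposed OperationStatus state machine. */
public enum OperationStatusSketch {
    INITIALIZED, PENDING, RUNNING,
    // Terminal states (TIMEOUT is also spelled TIMEDOUT in some drafts):
    FINISHED, CANCELED, CLOSED, ERROR, TIMEOUT;

    /** A terminal status can no longer transition to another status. */
    public boolean isTerminal() {
        return this == FINISHED || this == CANCELED || this == CLOSED
                || this == ERROR || this == TIMEOUT;
    }
}
```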
...
- For DQL/DML, we need to convert the JobStatus to the corresponding OperationStatus in synchronous mode (in asynchronous mode, the Operation finishes when the job is submitted). We have the following mapping from Flink's state machine:
- CREATED -> INITIALIZED
- FAILED -> ERROR
- CANCELED -> CANCELED
- FINISHED -> FINISHED
- RUNNING -> RUNNING
- INITIALIZING/FAILING/CANCELLING/RESTARTING/SUSPENDED/RECONCILING -> PENDING
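The mapping above can be written as a simple switch. The enums below are simplified stand-ins for Flink's JobStatus and the proposed OperationStatus so the sketch is self-contained.

```java
/** Sketch of the JobStatus -> OperationStatus mapping described above. */
public class StatusMappingSketch {
    enum JobStatus {
        CREATED, FAILED, CANCELED, FINISHED, RUNNING,
        INITIALIZING, FAILING, CANCELLING, RESTARTING, SUSPENDED, RECONCILING
    }

    enum OperationStatus { INITIALIZED, PENDING, RUNNING, FINISHED, CANCELED, ERROR }

    static OperationStatus fromJobStatus(JobStatus status) {
        switch (status) {
            case CREATED:  return OperationStatus.INITIALIZED;
            case FAILED:   return OperationStatus.ERROR;
            case CANCELED: return OperationStatus.CANCELED;
            case FINISHED: return OperationStatus.FINISHED;
            case RUNNING:  return OperationStatus.RUNNING;
            default:       return OperationStatus.PENDING; // transient cluster states
        }
    }
}
```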
- The state machine is as follows:
- MetricSystem: it is responsible for reporting metrics to the specified destination.
- The Gateway is the main entry for users to submit SQL jobs. In many cases, metrics are needed to measure the state of the entire system and to locate problems, or to help peripheral systems manage the gateway, e.g. for load balancing.
The trigger events for the state machine.
Statement Type | INITIALIZED -> PENDING | PENDING -> RUNNING | RUNNING -> FINISHED | INITIALIZED/PENDING/RUNNING -> ERROR | INITIALIZED/PENDING/RUNNING -> CANCELED/CLOSED | INITIALIZED/PENDING/RUNNING -> TIMEOUT
---|---|---|---|---|---|---
DDL | Start to prepare the Operation resources, e.g. logs. | Get the resources for execution (the worker thread starts running). | TableEnvironment#executeSql finishes. | Any exception occurs. | The user requests to cancel/close the Operation. | The Operation's execution times out.
DML | (as DDL) | (as DDL) | (as DDL) | (as DDL) | (as DDL) | (as DDL)
Utility statements | (as DDL) | (as DDL) | (as DDL) | (as DDL) | (as DDL) | (as DDL)
QUERY | (as DDL) | (as DDL) | No more results can be fetched from the CollectSink. | (as DDL) | (as DDL) | (as DDL)
SET/RESET | (as DDL) | Set/reset the session config. | Setting/resetting the session config finishes. | (as DDL) | (as DDL) | (as DDL)
We can summarize the state machine as follows
Public Interfaces
REST Endpoint
In this section, we propose to introduce the REST Endpoint, which is different from the REST endpoint we proposed before.
Session-related API
Using the APIs in this section, users can register a Session in the Gateway, which maintains the user-level configuration and resources.
OpenSession
/v1/sessions | |
Verb: POST | Response code: 200 OK |
Create a new session with the specified configuration. In release 1.16 we only support loading jars from the local file system, but we can extend this feature to load jars from a remote filesystem, e.g. HDFS. | |
Request body | { "session_name": "", # optional "libs": [], # optional. "jars": [], # optional "properties": { # optional, properties for current session "key": "value" } } |
Response body | { "session_handle": "", # if session is created successfully } |
CloseSession
/v1/sessions/:session_handle | |
Verb: DELETE | Response code: 200 OK |
Close a session, release related resources including operations and properties | |
Request body | {} |
Response body | { "status": "CLOSED" # if cancel successfully } |
Get Session Config
/v1/sessions/:session_handle | |
Verb: GET | Response code: 200 OK |
Get the session config with the specified session handle. | |
Request body | {} |
Response body | { "properties": { "key": "value" } } |
Configure Session
/v1/sessions/:session_handle/configure_session | |
Verb: POST | Response code: 200 OK |
Configures the session with the statement, which could be SET/RESET/CREATE/DROP/LOAD/UNLOAD/USE/ALTER/ADD JAR/REMOVE JAR. It can be used to initialize the Session:
Note: The statement must be a single command, otherwise the server will throw an exception. | |
Request body | { "statement": "", # required "execution_timeout": "" # execution time limit in milliseconds, optional } |
Response body | {} |
Trigger Session Heartbeat
/v1/sessions/:session_handle/heartbeat | |
Verb: POST | Response code: 200 OK |
Trigger a heartbeat to tell the server that the client is active, and to keep the session alive for as long as the configured timeout value. If a session does not receive a heartbeat or any other operation, it is destroyed when the timeout is reached. | |
Request body | {} |
Response body | {} |
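As an illustration of the session-related API above, the following sketch builds (but does not send) the OpenSession and heartbeat requests with Java's built-in HttpClient. The gateway address and the request body contents are placeholders; the paths follow this proposal.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Builds the REST calls for the session lifecycle described above. */
public class SessionRestCallsSketch {
    // Placeholder address; 8083 is the proposed default REST port.
    static final String GATEWAY = "http://localhost:8083";

    /** POST /v1/sessions with an optional session name and properties. */
    static HttpRequest openSession() {
        String body = "{\"session_name\": \"demo\", \"properties\": {}}";
        return HttpRequest.newBuilder(URI.create(GATEWAY + "/v1/sessions"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    /** POST /v1/sessions/:session_handle/heartbeat to keep the session alive. */
    static HttpRequest heartbeat(String sessionHandle) {
        return HttpRequest.newBuilder(
                        URI.create(GATEWAY + "/v1/sessions/" + sessionHandle + "/heartbeat"))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }
}
```

Sending the requests would use `HttpClient.newHttpClient().send(request, ...)`; it is omitted here so the sketch needs no running gateway.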
Operation-related
Because the Gateway allows the async execution mode, the APIs in this section can manipulate the running jobs.
Get Operation Status
/v1/sessions/:session_handle/operations/:operation_handle/status | |
Verb: GET | Response code: 200 OK |
Get the status of a running job. If the session is expired, the server will throw a "session not found" exception. If the job is finished, the server will throw a "job not found" exception. | |
Request body | {} |
Response body | { "status": "" # refer to OperationStatus } |
Cancel Operation
/v1/sessions/:session_handle/operations/:operation_handle/cancel | |
Verb: PUT | Response code: 200 OK |
Cancel the running operation and update the operation status. | |
Request body | {} |
Response body | { "status": "CANCELED" # if cancel successfully } |
Close Operation
/v1/sessions/:session_handle/operations/:operation_handle | |
Verb: DELETE | Response code: 200 OK |
Remove the specified Operation. If the user invokes closeOperation twice, the later invocation will get an exception. | |
Request body | {} |
Response body | { "status": "CLOSED" # if close successfully } |
Statement-related
The API in the section is used to submit the statement to the Gateway and get the results.
Execute a statement
/v1/sessions/:session_handle/statements | |
Verb: POST | Response code: 200 OK |
Execute a statement, which can be any SQL statement that Flink supports. The SET xx=yy statement will override/update the TableConfig held by the current session, and the RESET statement will reset all properties set by SET xx=yy statements. The USE MODULE/CATALOG/DATABASE xx statement will update the default module/catalog/database in the TableEnvironment held by the current session. The statement must be a single command, otherwise the server will throw an exception. | |
Request body | { "statement": "", # required "execution_timeout": "" # execution time limit in milliseconds, optional } |
Response body | { "operation_handle": "", "operation_type": "EXECUTE_STATEMENT", "has_result": true/false # determines whether results need to be fetched later } |
Fetch results
/v1/sessions/:session_handle/operations/:operation_handle/result/:token | |
Verb: GET | Response code: 200 OK |
Fetch a part of the result for a Flink job execution. If the result data is too large or the result is streaming, we can use this API to get a part of the result at a time. The initial value of the token is 0. The token in the next request must be the same as the token in the current request or equal to token (in the current request) + 1, otherwise the client will get an exception from the server. If multiple requests are executed with the same token, the result is the same. This design makes sure the client can get the result even if some errors occur on the client side. The client can get the next part of the result using /v1/sessions/:session_id/jobs/:job_id/result/:{token+1} (which is the value of next_result_uri in the response data). If next_result_uri is empty, no more data remains on the server. The server can drop the old data before the current token once the client has successfully obtained it. We will introduce fetch_size or max_wait_time (to reach the fetch_size) for optimization in the future. The returned result has the same schema as TableResult#getResolvedSchema. Please refer to the Appendix about the transformation between the ResultSet and JSON. | |
Request body | {} |
Response body | { "result_type": "PAYLOAD", "results": [ # currently, there is only one result now. If multiple queries is executed in a single job, there are many results. { "columns": [ # if the execution is successful { "name": "", "type": {"type": "BOOLEAN", "nullable": true}# string value of LogicalType }, ], "data": [ ["value", ], # a row data ] }, ], "next_result_uri": /v1/sessions/:session_id/jobs/:job_id/result/:{token+1} # if not empty, uses this uri to fetch next part of result, else there is no more result. "exception": { "root_cause": "....", "exception_stack": "..." } } |
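The token contract above (re-requesting the current token is an idempotent retry; token + 1 acknowledges the previous part and advances) can be modeled with a small in-memory sketch. All names here are hypothetical and the "parts" are toy strings; a real server would serve buffered rows.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory model of the fetch-token protocol described above. */
public class FetchTokenSketch {
    private final Map<Integer, String> cached = new HashMap<>();
    private int currentToken = -1;

    public String fetch(int token) {
        if (token == currentToken) {
            // Same token: idempotent retry, return the cached part.
            return cached.get(token);
        }
        if (token == currentToken + 1) {
            // token + 1: the old part is acknowledged, so the server may drop it.
            cached.remove(currentToken);
            currentToken = token;
            String part = "part-" + token;
            cached.put(token, part);
            return part;
        }
        throw new IllegalArgumentException("token must be the current token or current + 1");
    }
}
```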
Statement Completion
/v1/sessions/:session_handle/complete_statement | |
Verb: GET | Response code: 200 OK |
Complete a statement. For example, if a user inputs SELE in the terminal and presses tab, the terminal uses this API to complete the statement and returns SELECT. | |
Request body | { "statement": "", "position": } |
Response body | { "candidates": [] } |
Util
Get Info
/v1/info | |
Verb: GET | Response code: 200 OK |
Get meta data for this cluster | |
Request body | {} |
Response body | { "product_name": "Apache Flink", "version": "1.16" # Flink version } |
Get Version
/api_versions | |
Verb: GET | Response code: 200 OK |
Get the currently available versions for the REST endpoint. The client can choose one of the returned versions as the protocol for later communication. | |
Request body | {} |
Response body | { "versions": ["v1", "v2"] # The versions supported by the REST endpoint. } |
Options
Please use the following options to configure the REST endpoint.
Option name | Default Value(Required) | Description |
sql-gateway.endpoint.type | rest (Yes) | REST endpoint should use 'rest'. |
sql-gateway.endpoint.rest.port | 8083(No) | REST endpoint port. |
sql-gateway.endpoint.rest.address | 127.0.0.1 (No) | The address that the REST endpoint binds to. |
Pluggable Endpoint Discovery
The pluggable endpoint discovery loads endpoints dynamically, enabling users to load only the endpoints they want to use. Here we use the SPI mechanism to discover endpoints.
Code Block | ||
---|---|---|
| ||
/** Interface for Endpoint. */
public interface SqlGatewayEndpoint {
void start() throws Exception;
void stop() throws Exception;
}
/**
* A factory for creating Endpoint from Configuration. This
* factory is used with Java's Service Provider Interfaces (SPI) for discovery.
*/
public interface SqlGatewayEndpointFactory extends Factory {
SqlGatewayEndpoint createSqlGatewayEndpoint(Context context);
interface Context {
SqlGatewayService getSqlGatewayService();
MetricGroup getMetricGroup();
/** Gives read-only access to the configuration of the Endpoint. */
ReadableConfig getConfiguration();
}
} |
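A hypothetical sketch of how discovered factories could be matched against the configured endpoint types. In the real gateway the factories would come from ServiceLoader.load(SqlGatewayEndpointFactory.class); here they are passed in directly so the example is self-contained, and the Factory interface is a simplified stand-in.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch of SPI-based endpoint discovery and selection. */
public class EndpointDiscoverySketch {
    /** Simplified stand-in for SqlGatewayEndpointFactory. */
    public interface Factory {
        String factoryIdentifier(); // e.g. "rest", "hiveserver2"
    }

    /** Index the discovered factories by identifier, then select the configured types. */
    public static List<String> selectEndpoints(List<Factory> discovered, List<String> configuredTypes) {
        Map<String, Factory> byId = new HashMap<>();
        for (Factory f : discovered) {
            byId.put(f.factoryIdentifier(), f);
        }
        List<String> selected = new ArrayList<>();
        for (String type : configuredTypes) {
            if (!byId.containsKey(type)) {
                throw new IllegalArgumentException("No endpoint factory found for type: " + type);
            }
            selected.add(type);
        }
        return selected;
    }

    /** Demo: two "discovered" factories, both requested in the configuration. */
    public static int demoCount() {
        List<Factory> discovered = List.of(() -> "rest", () -> "hiveserver2");
        return selectEndpoints(discovered, List.of("rest", "hiveserver2")).size();
    }
}
```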
We also expose the option sql-gateway.endpoint.type to allow users to specify the endpoints. Considering that different endpoints may have the same settings, e.g. port, users should add the endpoint identifier as a prefix to the option, e.g. rest.port. For simplicity, we don't plan to introduce another yaml file for the SQL Gateway; users can specify the gateway options in the flink-conf.yaml.
For example, users can add the following options in the flink-conf.yaml.
Code Block | ||
---|---|---|
| ||
sql-gateway.endpoint.type: rest, hiveserver2
sql-gateway.endpoint.rest.address: localhost
sql-gateway.endpoint.rest.port: 9001
sql-gateway.endpoint.hiveserver2.address: localhost
sql-gateway.endpoint.hiveserver2.port: 9002 |
GatewayService API
Object API
The API is used to describe the Session and Operation.
Code Block | ||
---|---|---|
| ||
public class HandleIdentifier {
UUID publicId;
UUID secretId;
}
public class SessionHandle {
HandleIdentifier identifier;
}
/**
* Every Endpoint should expose its version and extend the interface.
*/
interface EndpointVersion {}
enum RestEndpointVersion implements EndpointVersion {
V1;
}
/**
* It's equal to the HiveServer2 TProtocolVersion. It should belong to the
* hive module.
*/
enum HiveServer2EndpointVersion implements EndpointVersion {
HIVE_CLI_SERVICE_PROTOCOL_V1,
// V2 adds support for asynchronous execution
HIVE_CLI_SERVICE_PROTOCOL_V2,
// V3 adds varchar type, primitive type qualifiers
HIVE_CLI_SERVICE_PROTOCOL_V3,
// V4 adds decimal precision/scale, char type
HIVE_CLI_SERVICE_PROTOCOL_V4,
// V5 adds error details when GetOperationStatus returns in error state
HIVE_CLI_SERVICE_PROTOCOL_V5,
// V6 uses binary type for binary payload (was string) and uses columnar result set
HIVE_CLI_SERVICE_PROTOCOL_V6,
// V7 adds support for delegation token based connection
HIVE_CLI_SERVICE_PROTOCOL_V7,
// V8 adds support for interval types
HIVE_CLI_SERVICE_PROTOCOL_V8,
// V9 adds support for serializing ResultSets in SerDe
HIVE_CLI_SERVICE_PROTOCOL_V9,
// V10 adds support for in place updates via GetOperationStatus
HIVE_CLI_SERVICE_PROTOCOL_V10,
// V11 adds timestamp with local time zone type
HIVE_CLI_SERVICE_PROTOCOL_V11
}
enum OperationType {
EXECUTE_STATEMENT,
GET_INFO,
UNKNOWN;
}
public class OperationHandle {
HandleIdentifier identifier;
} |
SqlGatewayService API
Code Block | ||
---|---|---|
| ||
interface SqlGatewayService {
// -------------------------------------------------------------------------------------------
// Session Management
// -------------------------------------------------------------------------------------------
SessionHandle openSession(SessionEnvironment environment) throws SqlGatewayException;
void closeSession(SessionHandle sessionHandle) throws SqlGatewayException;
Map<String, String> getSessionConfig(SessionHandle sessionHandle) throws SqlGatewayException;
// -------------------------------------------------------------------------------------------
// Operation Management
// -------------------------------------------------------------------------------------------
/**
* Get operation info to describe the Operation.
*/
OperationInfo getOperationInfo(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException;
/** Get the result schema for the specified Operation. */
ResolvedSchema getOperationResultSchema(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException;
void cancelOperation(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException;
void closeOperation(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException;
// -------------------------------------------------------------------------------------------
// Statements
// -------------------------------------------------------------------------------------------
/**
* Using the statement to initialize the Session. It's only allowed to
* execute SET/RESET/CREATE/DROP/USE/ALTER/LOAD MODULE/UNLOAD MODULE/ADD JAR/REMOVE JAR.
*/
void configureSession(SessionHandle sessionHandle, String statement, long executionTimeoutMs) throws SqlGatewayException;
/** Execute the statement with the specified Session. It allows to execute with Operation-level configuration.*/
OperationHandle executeStatement(
SessionHandle sessionHandle,
String statement,
long executionTimeoutMs,
Configuration executionConfig) throws SqlGatewayException;
/**
* Fetch the results with token id.
*/
ResultSet fetchResults(SessionHandle sessionHandle, OperationHandle operationHandle, int token, int maxRows) throws SqlGatewayException;
/**
* Fetch the Operation-level log from the GatewayService. Some endpoints allow fetching the log at the operation level.
*/
ResultSet fetchLog(SessionHandle sessionHandle, OperationHandle operationHandle, FetchOrientation orientation, int maxRows) throws SqlGatewayException;
/**
* Only supports fetching results in the FORWARD/BACKWARD orientation.
* - Users can only go BACKWARD from the current offset once.
* - The Gateway doesn't materialize the changelog.
*/
ResultSet fetchResult(SessionHandle sessionHandle, OperationHandle operationHandle, FetchOrientation orientation, int maxRows) throws SqlGatewayException;
/**
* For the same functionality, every endpoint has its own result schema. Therefore,
* the endpoint submits a callable executor to the OperationManager, which manages
* the lifecycle of the Operation. The callable executor organizes the results
* as the Endpoint requires.
*/
OperationHandle submitOperation(OperationType type, Callable<ResultSet> executor, ResolvedSchema resultSchema) throws SqlGatewayException;
// -------------------------------------------------------------------------------------------
// Utils
// -------------------------------------------------------------------------------------------
/**
* Describe the cluster info.
*/
Map<String, String> getGatewayInfo();
void heartbeat(SessionHandle sessionHandle) throws SqlGatewayException;
/**
* The Endpoint is stateless. All the session configs are stored on the GatewayService side.
*/
EndpointVersion getSessionEndpointVersion(SessionHandle sessionHandle) throws SqlGatewayException;
/** Returns a list of completion hints for the given statement at the given position. */
List<String> completeStatement(String sessionId, String statement, int position) throws SqlGatewayException;
// -------------------------------------------------------------------------------------------
// Catalog API
// -------------------------------------------------------------------------------------------
String getCurrentCatalog(SessionHandle sessionHandle) throws SqlGatewayException;
String getCurrentDatabase(SessionHandle sessionHandle) throws SqlGatewayException;
List<String> listCatalogs(SessionHandle sessionHandle) throws SqlGatewayException;
List<String> listDatabases(SessionHandle sessionHandle, String catalogName) throws SqlGatewayException;
List<TableInfo> listTables(SessionHandle sessionHandle, String catalogName, String databaseName, TableKind tableKind) throws SqlGatewayException;
List<FunctionInfo> listFunctions(SessionHandle sessionHandle, String catalogName, String databaseName, FunctionScope scope) throws SqlGatewayException;
ContextResolvedTable getTable(SessionHandle sessionHandle, ObjectIdentifier tableIdentifier) throws SqlGatewayException;
ContextResolvedFunction getFunction(SessionHandle sessionHandle, ObjectIdentifier functionIdentifier) throws SqlGatewayException;
}
class TableInfo {
boolean isTemporary;
ObjectIdentifier identifier;
TableKind tableKind;
}
class FunctionInfo {
boolean isTemporary;
ObjectIdentifier identifier;
FunctionKind kind;
}
class SessionEnvironment {
private String sessionName;
private EndpointVersion sessionEndpointVersion;
private List<URL> libs;
private List<URL> jars;
private Map<String, String> sessionConfig;
}
public class OperationInfo {
OperationStatus status;
OperationType type;
boolean hasResult;
}
public class ResultSet {
int nextToken;
ResultType resultType;
ResolvedSchema resultSchema;
List<RowData> results;
Exception exception;
}
public enum ResultType {
PAYLOAD,
EMPTY,
EOS,
ERROR
} |
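To show how an endpoint might drive this API end to end (open a session, execute a statement asynchronously, then fetch results by token), here is a sketch against a toy in-memory stand-in. The MiniGatewayService interface is a simplified hypothetical subset of SqlGatewayService (String handles instead of SessionHandle/OperationHandle, String rows instead of RowData), not the proposed interface itself.

```java
import java.util.List;
import java.util.UUID;

/** Sketch of how an endpoint could drive the SqlGatewayService API. */
public class GatewayServiceUsageSketch {
    /** Simplified hypothetical subset of the proposed SqlGatewayService. */
    interface MiniGatewayService {
        String openSession(String sessionName);
        String executeStatement(String sessionHandle, String statement);
        List<String> fetchResults(String sessionHandle, String operationHandle, int token, int maxRows);
    }

    /** Toy implementation that "executes" every statement immediately. */
    static MiniGatewayService inMemoryService() {
        return new MiniGatewayService() {
            public String openSession(String sessionName) {
                return UUID.randomUUID().toString();
            }
            public String executeStatement(String sessionHandle, String statement) {
                return UUID.randomUUID().toString();
            }
            public List<String> fetchResults(String sessionHandle, String operationHandle, int token, int maxRows) {
                // Token 0 carries the payload; token 1 is the end of stream (empty).
                return token == 0 ? List.of("row-1", "row-2") : List.of();
            }
        };
    }

    /** Open a session, run a statement, and count the fetched rows. */
    public static int demoRowCount() {
        MiniGatewayService service = inMemoryService();
        String session = service.openSession("demo");
        String operation = service.executeStatement(session, "SELECT 1");
        int rows = 0;
        int token = 0;
        List<String> part;
        while (!(part = service.fetchResults(session, operation, token++, 100)).isEmpty()) {
            rows += part.size();
        }
        return rows;
    }
}
```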
Options
Please use the following options to configure the gateway.
Option | Required | Default value | Description |
---|---|---|---|
sql-gateway.session.idle-timeout | No | 5 min | Session will be closed when it's not accessed for this duration, which can be disabled by setting to zero or negative value. |
sql-gateway.session.check-interval | No | 1 min | The check interval for session timeout, which can be disabled by setting to zero or negative value. |
sql-gateway.session.max-num | No | 1000000 | The maximum number of active sessions. |
sql-gateway.worker.threads.max | No | 500 | Maximum number of worker threads for the gateway workers. |
sql-gateway.worker.threads.min | No | 5 | Minimum number of worker threads for the gateway workers. |
sql-gateway.worker.keepalive-time | No | 5 min | Keepalive time for an idle worker thread. When the number of workers exceeds the min workers, excessive threads are killed after this time interval. |
SQL Gateway Script
Considering many users prefer to use the SQL Gateway only, we propose to add a script named `sql-gateway.sh` in the bin directory. Users can use the command
Code Block | ||
---|---|---|
| ||
./sql-gateway.sh (start|stop|stop-all) [args] |
to manipulate the SQL Gateway.
Command | Parameter | Description |
---|---|---|
start | -Dkey=value | Start the gateway and write the pid of the started SQL Gateway into the pid file. Users can use -Dkey=value to specify parameters. For example, users can specify `-Dsql-gateway.endpoint.rest.address=127.0.0.1` |
stop | (none) | Stop the last started gateway recorded in the pid file. |
stop-all | (none) | Stop all the gateway instances recorded in the pid file. |
Then users can start the SQL Client to communicate with the SQL Gateway in a local or remote environment.
SQL Client API
When launching the SQL Client in the gateway mode, users should specify the address and port of the Gateway they want to communicate with. Therefore, we should expose the following startup options.
Parameter | Required | Description |
-e, --endpoint | Yes | The gateway address:port |
Users can start up the SQL Client with the following command in bash.
Code Block | ||
---|---|---|
| ||
./sql-client.sh -e 127.0.0.1:9092 |
Usage
With the SQL Gateway, users can just use the SQL to do everything.
Configure parameters
Users can use the SET
statement to configure the parameters, including
- execution parameters, streaming or batch
- optimization configuration
- job parameters, job name
- …
If users want to reset the configuration, they can use the RESET
statement to roll back all the settings.
The Gateway also allows users to add jars dynamically. Users can just use the ADD JAR
statement to specify the jar path in the SQL.
Manage the metadata
Users can use the DDL to register the required catalog in the Gateway. With the catalog, users can CREATE/DROP/ALTER all tables in the catalog.
Manage Jobs
Users can submit DML statements to the Gateway. In the future we may also support using SQL to manage the submitted jobs.
Example
Here is an example of using the SQL Client to submit statements to the SQL Gateway.
Code Block | ||
---|---|---|
| ||
Flink SQL> CREATE CATALOG hive WITH(
> 'type' = 'hive',
> 'hive-conf-dir' = '/opt/hive-conf'
>);
[INFO] Execute statement succeed.
Flink SQL> SET 'execution.runtime-mode' = 'batch';
[INFO] Execute statement succeed.
Flink SQL> CREATE TABLE pageviews (
> user_id BIGINT,
> page_id BIGINT,
> viewtime TIMESTAMP,
> proctime AS PROCTIME()
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'pageviews',
> 'properties.bootstrap.servers' = '...',
> 'format' = 'avro'
> );
[INFO] Execute statement succeed.
Flink SQL> INSERT INTO hive.default_db.kafka_table SELECT * FROM pageviews;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 6b1af540c0c0bb3fcfcad50ac037c862
|
Implementation
SQL Client Overview
SQL Client has different modes. The architecture of the SQL Client in the embedded mode is as follows.
In the Gateway mode, the SQL Client uses the REST Client to communicate with the GatewayService.
Actually the architectures in the different modes are almost the same. The only difference is how to communicate with the GatewayService. Therefore, we focus on the Gateway mode in this section. The process logic in the Gateway mode should also work for the embedded mode.
The SQL Client is composed of the CliClient
and Executor
.
- The CliClient
is responsible for receiving statements from the terminal and printing the results;
- The Executor
is responsible for executing statements from the CliClient
and returning the results.
- It has a client-level parser, which determines whether a statement is a client-level command, e.g. HELP, QUIT.
- It can submit statements to the GatewayService with the REST Client.
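The client-level parser mentioned above could, as a rough sketch, look like the following. The class and enum names are hypothetical, and the real parser recognizes more commands; everything not handled locally is forwarded to the Gateway.

```java
import java.util.Locale;

// Illustrative sketch of a client-level parser: it only recognizes commands
// handled locally (e.g. HELP, QUIT); everything else goes to the Gateway.
public class ClientParser {

    enum ClientCommand { HELP, QUIT, FORWARD_TO_GATEWAY }

    static ClientCommand parse(String statement) {
        String normalized = statement.trim()
            .replaceAll(";+\\s*$", "")          // drop trailing semicolons
            .toUpperCase(Locale.ROOT);
        switch (normalized) {
            case "HELP":
                return ClientCommand.HELP;
            case "QUIT":
            case "EXIT":
                return ClientCommand.QUIT;
            default:
                return ClientCommand.FORWARD_TO_GATEWAY;  // e.g. SELECT ..., INSERT ...
        }
    }
}
```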
Gateway Implementation Details
- Introduce the module flink-table/flink-sql-gateway-common, which includes the gateway API. When users want to implement their own endpoints, they only need to depend on this module.
- Introduce the module flink-table/flink-sql-gateway. It includes the REST endpoint and the SQL GatewayService.
- flink-table/flink-sql-client should rely on the module flink-table/flink-sql-gateway.
Compatibility, Deprecation, and Migration Plan
Because we introduce new Gateway modules, there are no compatibility problems. For the SQL Client, we use the Gateway to submit jobs. We will keep all the functionality but may change the presentation. For example, when executing EXPLAIN, the SQL Client will print the results returned from the Gateway, whereas currently it takes the results from the local TableEnvironment.
Future work
The Gateway itself has many functionalities. We only list part of the work here:
- Metric System
- Authentication module
- Multiple Endpoint, e.g. HiveServer2 endpoint
- Persistent Gateway
- ...
We will collect more feedback to determine which features are most important to users.
Rejected Alternatives
TableInfo and FunctionInfo VS CatalogTable and CatalogFunction
The CatalogTable and CatalogFunction are much heavier than the TableInfo and FunctionInfo. The CatalogManager requires reading from the Catalog to get the schema, but listTables only cares about the table names, which is much lighter. Therefore, we propose to use the TableInfo with only the required fields.
Support the multi-version Flink in the Gateway VS Support the multi-version in the external Service
Currently many big data tools, e.g. Zeppelin, Livy[1], support multiple engine versions, and they have similar architectures. Flink on Zeppelin looks like[2]:
The Interpreter is a process, which means that every Flink instance is in a separate JVM[3].
The current Gateway is responsible for compiling the user SQL and submitting the job to the cluster, which is almost the same as the Interpreter in the graph.
If we supported multiple versions in the Gateway (the GatewayService also has a version), all the Flink versions would be in the same JVM. It would require the Gateway to solve shims, classloaders and deployments for the different Flink versions, and would mess up the code with reflection. It is better to follow the design of these other tools.
[1] https://issues.cloudera.org/browse/LIVY-106
[2] https://zeppelin.apache.org/docs/latest/interpreter/flink.html#flink-on-zeppelin-architecture
[3] https://zeppelin.apache.org/docs/0.6.2/development/writingzeppelininterpreter.html
Merge Gateway into the Flink code base VS Support Gateway in the another repo
The main reason to move the Gateway to another repo is to support multiple Flink versions in the Gateway. However, in the discussion above we concluded that the Gateway is bound to a specific Flink version and that an external service should manage the Gateway instances. Considering the Gateway itself is bound to Flink, it's better to merge the Gateway into the Flink repo.
It also brings the following benefits:
- Reduces cost, e.g. CI tests, releases, maintenance cost;
- The SQL Client has the ability to submit SQL to the SQL Gateway and we can reuse most of the code;
- A Gateway inside the Flink repo can ensure the highest degree of version compatibility;
- A Gateway is indispensable for a SQL engine (think of Trino/Presto, Spark, Hive). Otherwise, Flink will always be a processing system. With the Gateway inside the Flink repo, Flink can provide an out-of-the-box experience as a SQL query engine. Users can try out the gateway for the latest version when a new version is released.
Therefore, we prefer to merge the Gateway into the Flink repo.
Result retrieval for interactive sessions VS result retrieval for fault tolerant exactly-once results
Because the current Gateway doesn't materialize anything to storage, we can't promise exactly-once semantics. It means users can't retrieve the results again once the Gateway has been notified that the results have been taken away.
OperationStatus VS JobStatus
The main reasons we don't use the JobStatus as the state in the state machine:
- An Operation includes DDL, DML and so on, which is a much larger concept than a Job. We can't use a small concept to replace a large one.
- Some statuses in the JobStatus are meaningless for an Operation. For example, a DDL Operation doesn't need RESTARTING/SUSPENDED/RECONCILING.
The Gateway allows submitting jobs (DML) in sync/async mode. The RUNNING status of an Operation has a different meaning in each mode:
- In async mode, the Operation reaches the FINISHED state once the gateway submits the job.
- In sync mode, the RUNNING status of the Operation covers submitting the job and running the job. Even if a failover occurs, we still consider the Operation to be in the RUNNING state. Only if the job is unrecoverable do we change the Operation status to ERROR.
Therefore, we propose a new state machine in the Gateway side.
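The proposed state machine could be sketched as below. The status set here (PENDING/RUNNING/FINISHED/ERROR/CANCELED/CLOSED) is a simplified illustrative subset, not the authoritative enum, and the transition rule is a guess at the intent: terminal states may only be closed, non-terminal states may only advance.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative sketch of an Operation state machine; the status set and
// transition rule are simplifications for demonstration only.
public class OperationStates {

    enum Status { PENDING, RUNNING, FINISHED, ERROR, CANCELED, CLOSED }

    // Terminal states can only move to CLOSED; non-terminal states may advance.
    static boolean isValidTransition(Status from, Status to) {
        List<Status> terminal = Arrays.asList(Status.FINISHED, Status.ERROR, Status.CANCELED);
        if (terminal.contains(from)) {
            return to == Status.CLOSED;
        }
        return from.ordinal() < to.ordinal(); // only move forward
    }
}
```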
Appendix
Serialize and deserialize the ResultSet
The serialization of the ResultSet consists of 3 parts:
- Serialize the LogicalType
- Serialize the RowData
- Serialize the Exception
- Serialize the Exception
Serialize the LogicalType
Considering that not all LogicalTypes are serializable, our plan is much like what the LogicalTypeJsonSerializer
does. Currently the LogicalType has 3 kinds:
- Basic Type
Type Name | JSON |
---|---|
CHAR | {"type": "CHAR", "nullable": true/false, "length": <LENGTH>} |
VARCHAR | {"type": "VARCHAR", "nullable": true/false, "length": <LENGTH>} |
STRING | {"type": "VARCHAR", "nullable": true/false, "length": <LENGTH>} |
BOOLEAN | {"type": "BOOLEAN", "nullable": true/false} |
BINARY | {"type": "BINARY", "nullable": true/false, "length": <LENGTH>} |
VARBINARY | {"type": "VARBINARY", "nullable": true/false, "length": <LENGTH>} |
BYTES | {"type": "VARBINARY", "nullable": true/false, "length": <LENGTH>} |
DECIMAL | {"type": "DECIMAL", "nullable": true/false, "precision": <LENGTH>, "scale": <SCALE>} |
TINYINT | {"type": "TINYINT", "nullable": true/false} |
SMALLINT | {"type": "SMALLINT", "nullable": true/false} |
INTEGER | {"type": "INTEGER", "nullable": true/false} |
BIGINT | {"type": "BIGINT", "nullable": true/false} |
FLOAT | {"type": "FLOAT", "nullable": true/false} |
DOUBLE | {"type": "DOUBLE", "nullable": true/false} |
DATE | {"type": "DATE", "nullable": true/false} |
TIME | {"type": "TIME", "nullable": true/false, "precision": <PRECISION>} |
TIMESTAMP | {"type": "TIMESTAMP", "nullable": true/false, "precision": <PRECISION>} |
TIMESTAMP_LTZ | {"type": "TIMESTAMP_LTZ", "nullable": true/false, "precision": <PRECISION>} |
RAW | {"type": "RAW", "nullable": true/false, "class": <CLASS>, "specialSerializer": <SERIALIZER>} |
- Collection Type
For the collection types, we recursively serialize the element type.
Type Name | JSON |
---|---|
MAP | {"type": "MAP", "nullable": true/false, "keyType": ..., "valueType": ...} |
ARRAY | {"type": "ARRAY", "nullable": true/false, "elementType": ...} |
MULTISET | {"type": "MULTISET", "nullable": true/false, "elementType": ...} |
ROW | { "type": "ROW", "nullable": true/false, "fields": [ { "name": <FIELD_NAME>, "fieldType": ... },... ] } |
- User-defined type
We don't support serializing user-defined types, because users can't create a new type with SQL.
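The recursive serialization described above can be sketched with a few string-building helpers. This is illustrative only: the class and method names (TypeJson, basic, withLength, array, map) are hypothetical and not part of Flink's actual LogicalTypeJsonSerializer.

```java
// Illustrative sketch: build the JSON layout from the tables above.
public class TypeJson {

    // Serialize a basic type without extra attributes, e.g. BOOLEAN.
    static String basic(String name, boolean nullable) {
        return String.format("{\"type\": \"%s\", \"nullable\": %s}", name, nullable);
    }

    // Serialize a length-parameterized type, e.g. CHAR/VARCHAR/BINARY.
    static String withLength(String name, boolean nullable, int length) {
        return String.format(
            "{\"type\": \"%s\", \"nullable\": %s, \"length\": %d}", name, nullable, length);
    }

    // Collection types recursively embed the serialized element type.
    static String array(boolean nullable, String elementTypeJson) {
        return String.format(
            "{\"type\": \"ARRAY\", \"nullable\": %s, \"elementType\": %s}",
            nullable, elementTypeJson);
    }

    static String map(boolean nullable, String keyJson, String valueJson) {
        return String.format(
            "{\"type\": \"MAP\", \"nullable\": %s, \"keyType\": %s, \"valueType\": %s}",
            nullable, keyJson, valueJson);
    }
}
```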
Serialize the RowData
The RowData contains two parts: the RowKind and the fields.
Therefore, we propose to serialize the RowData as follows.
Code Block | ||
---|---|---|
| ||
{
"kind": "INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE",
"fields": [<COLUMN_VALUE>, ...]
} |
The `<COLUMN_VALUE>` is the same as using `JSON_STRING` to serialize the value.
The type mapping from Flink type to JSON type is as follows.
Flink Type | JSON Type |
---|---|
CHAR / VARCHAR / STRING | string |
BOOLEAN | boolean |
BINARY / VARBINARY | string with encoding: base64 |
DECIMAL | number |
TINYINT | number |
SMALLINT | number |
INT | number |
BIGINT | number |
FLOAT | number |
DOUBLE | number |
DATE | string with format: date |
TIME | string with format: time |
TIMESTAMP | string with format: date-time |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | string with format: date-time (with UTC time zone) |
INTERVAL | number |
ARRAY | array |
MAP / MULTISET | object |
ROW | object |
RAW | string with encoding: base64 |
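A row serializer following the `{"kind": ..., "fields": [...]}` layout and the type mapping above could be sketched as below. The helper names (RowJson, toJsonValue, serializeRow) are hypothetical, the value handling covers only a few representative types, and proper JSON string escaping is omitted for brevity.

```java
import java.time.LocalDate;
import java.util.Base64;
import java.util.List;
import java.util.stream.Collectors;

// Illustrative sketch: serialize one row as {"kind": ..., "fields": [...]}
// following the Flink-type-to-JSON-type mapping above.
public class RowJson {

    // Map a single column value to its JSON representation (no escaping).
    static String toJsonValue(Object v) {
        if (v == null) {
            return "null";
        } else if (v instanceof String) {
            return "\"" + v + "\"";                      // CHAR / VARCHAR / STRING
        } else if (v instanceof byte[]) {
            // BINARY / VARBINARY / RAW: string with base64 encoding
            return "\"" + Base64.getEncoder().encodeToString((byte[]) v) + "\"";
        } else if (v instanceof LocalDate) {
            return "\"" + v + "\"";                      // DATE: string with format date
        } else {
            return v.toString();                         // numeric / boolean types
        }
    }

    static String serializeRow(String kind, List<Object> fields) {
        String values = fields.stream().map(RowJson::toJsonValue).collect(Collectors.joining(", "));
        return "{\"kind\": \"" + kind + "\", \"fields\": [" + values + "]}";
    }
}
```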
Serialize the Exception
Considering some clients may also care about the root cause, the serialized JSON object should contain both the root cause and the stack.
Code Block | ||
---|---|---|
| ||
"exception": {
"root_cause": "...",
"exception_stack": "..."
} |
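The two values in that payload could be extracted as sketched below; the class and method names are hypothetical helpers, not Flink APIs.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

// Illustrative sketch: derive the "root_cause" and "exception_stack" values
// described above from a Throwable.
public class ExceptionJson {

    static String rootCauseMessage(Throwable t) {
        Throwable cause = t;
        while (cause.getCause() != null) {      // walk to the innermost cause
            cause = cause.getCause();
        }
        return String.valueOf(cause.getMessage());
    }

    static String stackTraceOf(Throwable t) {
        StringWriter sw = new StringWriter();
        t.printStackTrace(new PrintWriter(sw)); // full stack, including causes
        return sw.toString();
    }
}
```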
Example
Code Block | ||
---|---|---|
| ||
{
"result_type": "PAYLOAD",
"results": {
"columns": [
{
"name": "id",
"type": {"type": "BIGINT", "nullable": false}
},
{
"name": "name",
"type": {"type": "VARCHAR", "nullable": true, "length": 300}
},
{
"name": "birthday",
"type": {"type": "TIMESTAMP", "nullable": true, "precision": 3}
}
],
"data": [
{
"kind": "INSERT",
"fields": [101, "Jay", "1990-01-12T12:00.12"], # a row data
},
{
"kind": "DELETE",
"fields": [102, "Jimmy", null]
}
]
},
"next_result_uri": /v1/sessions/:session_id/jobs/:job_id/result/:{token+1} # if not empty, uses this uri to fetch next part of result, else there is no more result.
"exception": {
"root_cause": "....",
"exception_stack": "..."
}
} |
Close Session
...
/v1/sessions/:session_handle
...
Verb: DELETE
...
Response code: 200 OK
...
Close a session and release related resources, including operations and properties
...
Request body
...
{}
...
Response body
...
{
"status": "CLOSED" # if cancel successfully
}
Get Session Config
...
/v1/sessions/:session_handle
...
Verb: GET
...
Response code: 200 OK
...
Get the session config with the specified session handle.
...
Request body
...
{}
...
Response body
...
{
"properties": {
"key": "value"
}
}
Configure Session
...
/v1/sessions/:session_handle/configure_session
...
Verb: POST
...
Response code: 200 OK
Configure the session with a statement, which could be SET/RESET/CREATE/DROP/LOAD/UNLOAD/USE/ALTER/ADD JAR/REMOVE JAR
The statement must be a single command, otherwise the server will throw an exception.
...
Request body
...
{
"statement": "", # required
"execution_timeout": "" # execution time limit in milliseconds, optional
}
...
Response body
...
{}
Operation-related
Get Operation Status
...
/v1/sessions/:session_handle/operations/:operation_handle/status
...
Verb: GET
...
Response code: 200 OK
...
Get the status of a running operation.
If the session is expired, the server will throw a "session not found" exception.
If the operation is finished and cleaned up, the server will throw an "operation not found" exception.
...
Request body
...
{}
...
Response body
...
{
"status": "" # refer to OperationStatus
}
Cancel Operation
...
/v1/sessions/:session_handle/operations/:operation_handle/cancel
...
Verb: PUT
...
Response code: 200 OK
...
Cancel the running operation and update the operation status.
...
Request body
...
{}
...
Response body
...
{
"status": "CANCELED" # if cancel successfully
}
Close Operation
...
/v1/sessions/:session_handle/operations/:operation_handle
...
Verb: DELETE
...
Response code: 200 OK
...
Remove the specified Operation.
If the user invokes closeOperation twice, the second invocation will get an exception.
...
Request body
...
{}
...
Response body
...
{
"status": "CLOSED" # if close successfully
}
Statement-related
Execute a statement
...
/v1/sessions/:session_handle/statements
...
Verb: POST
...
Response code: 200 OK
Execute a statement, which can be any SQL statement supported by Flink.
The SET xx=yy statement will override/update the TableConfig held by the current session, and the RESET statement will reset all properties set by SET xx=yy statements.
The USE MODULE/CATALOG/DATABASE xx statement will update the default module/catalog/database in the TableEnvironment held by the current session.
The statement must be a single command, otherwise the server will throw an exception.
...
Request body
...
{
"statement": "", # required
"execution_timeout": "" # execution time limit in milliseconds, optional
}
...
Response body
...
{
"operation_handle": "",
"operation_type": "",
"has_result": true/false # determine whether needs to fetch results later
}
Fetch result
...
/v1/sessions/:session_handle/operations/:operation_handle/result/:token
...
Verb: GET
...
Response code: 200 OK
Fetch a part of the result for a Flink job execution. If the result data is too large, or the result is streaming, this API can be used to fetch a part of the result at a time. The initial value of the token is 0. The token in the next request must either be the same as the token in the current request or equal to the current token + 1; otherwise the client will get an exception from the server. If multiple requests are executed with the same token, the result is the same. This design makes sure the client can still get the result even if some errors occur on the client side. The client can get the next part of the result using /v1/sessions/:session_id/jobs/:job_id/result/:{token+1} (which is the value of next_result_uri in the response data). If next_result_uri is empty, no more data remains on the server.
The server may drop the old data before the current token, since the client has successfully obtained that data.
We will introduce fetch_size or max_wait_time (to reach the fetch_size) for optimization in the future.
The returned result has the same schema as TableResult#getResolvedSchema.
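The token protocol described above can be sketched from the server's point of view: the server keeps the last delivered batch so a repeated token is idempotent, advances on token + 1, and rejects anything else. ResultStore and its fields are hypothetical names, and a null batch stands in for "no more results".

```java
import java.util.Iterator;
import java.util.List;

// Illustrative sketch of the fetch-result token protocol described above.
public class ResultStore {
    private long currentToken = 0;
    private List<String> currentBatch;         // last batch handed to the client
    private final Iterator<List<String>> batches;

    ResultStore(List<List<String>> allBatches) {
        this.batches = allBatches.iterator();
        this.currentBatch = batches.hasNext() ? batches.next() : null;
    }

    // Returns the batch for the requested token, or throws on a protocol violation.
    List<String> fetch(long token) {
        if (token == currentToken) {
            return currentBatch;               // idempotent re-read after a client error
        } else if (token == currentToken + 1) {
            currentToken = token;              // the old batch can now be dropped
            currentBatch = batches.hasNext() ? batches.next() : null;
            return currentBatch;               // null means no more results
        }
        throw new IllegalArgumentException("Expected token " + currentToken
            + " or " + (currentToken + 1) + " but got " + token);
    }
}
```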
...
Request body
...
{}
...
Response body
...
{
"result_type": "PAYLOAD",
"results": [ # currently, there is only one result now. If multiple queries is executed in a single job, there are many results.
{
"columns": [ # if the execution is successful
{
"name": "",
"type": # string value of LogicalType
},
],
"data": [
["value", ], # a row data
]
},
],
"next_result_uri": /v1/sessions/:session_id/jobs/:job_id/result/:{token+1} # if not empty, uses this uri to fetch next part of result, else there is no more result.
"exception": "exception stack"
}
Util
Trigger session heartbeat
...
/version/sessions/:session_handle/
...
Verb: POST
...
Response code: 200 OK
...
Trigger a heartbeat to tell the server that the client is active, and to keep the session alive as long as the configured timeout value.
If a session does not receive a heartbeat or any other operation, the session will be destroyed when the timeout is reached.
...
Request body
...
{}
...
Response body
...
{}
Get info
...
/v1/info
...
Verb: GET
...
Response code: 200 OK
...
Get metadata of the cluster
...
Request body
...
{}
...
Response body
...
{
"product_name": "Apache Flink",
"version": "1.16" # Flink version
}
Statement Completion
...
/v1/sessions/:session_handle/complete_statement
...
Verb: GET
...
Response code: 200 OK
...
Complete the statement at the given position.
...
Request body
...
{
"statement": "",
"postion":
}
...
Response body
...
{
"candidates": []
}
Options
Option name | Default Value (Required) | Description |
---|---|---|
endpoint.protocol | rest (Yes) | REST endpoint should use 'rest'. |
endpoint.rest.port | 8083 (No) | REST endpoint port. |
endpoint.rest.address | 127.0.0.1 (No) | The address that the SQL Gateway binds itself to. |
SQL Client
SQL Client has different modes. To reuse the code, the architecture of the SQL Client in the embedded mode is as follows.
In the Gateway mode, the SQL Client uses the REST Client to communicate with the GatewayService.
Actually the architectures in the different modes are almost the same. The only difference is how to communicate with the GatewayService. Therefore, we focus on the Gateway mode in this section. The process logic in the Gateway mode should also work for the embedded mode.
The SQL Client is composed of the CliClient
and Executor
.
- The CliClient
is responsible for receiving statements from the terminal and printing the results;
- The Executor
is responsible for executing statements from the CliClient
and returning the results.
- It has a client-level parser, which determines whether a statement is a client-level command, e.g. HELP, QUIT.
- It can submit statements to the GatewayService with the REST Client.
Gateway Mode
Launch the client
When launching the SQL Client in the gateway mode, users should specify the address and port of the Gateway they want to communicate with. Therefore, we should expose the following startup options.
Parameter | Required | Description |
---|---|---|
-h, --host | Yes | The gateway address |
-p, --port | Yes | The gateway port |
Users can start up the SQL Client with the following command in bash.
Code Block | ||
---|---|---|
| ||
./sql-client.sh gateway -h 127.0.0.1 -p 9092 |
SQL Client initialization
During initialization, the SQL Client is responsible for registering the Session
in the GatewayService
:
- add the jar into the Session
's classpath if the client is launched with the -j parameter;
- add the library into the Session
's classpath if the client is launched with the -l parameter;
- execute the statements in the init file if the client is launched with the -i parameter
The REST API has already exposed parameters that allow users to specify the paths of jars and libraries in the OpenSession request. Considering that executing statements in the init file is very similar to executing statements in interactive mode, we discuss execution in the init mode in the next section.
Execute statement
Currently the GatewayService exposes configureSession
, which is similar to the -i option. When the SQL Client executes a file or runs in interactive mode, the Executor
should use GatewayService#executeStatement
, which doesn't limit the statement type. Therefore, the Executor
should invoke different APIs to execute statements in the different modes.
Execute the initialization file
SQL Client supports splitting the content of the file with ";". After the split, the CliClient
will execute the statements one by one. The UML Sequence Diagram is as follows.
PlantUML |
---|
@startuml
autonumber
actor "User" as User
participant "CliClient" as CliClient
participant "RestClient" as RestClient
participant "Gateway" as Gateway #orange
activate Gateway
activate User
User -> CliClient: init session with file
activate CliClient
activate RestClient
CliClient -> RestClient: openSession
RestClient -> Gateway: openSession
Gateway --> RestClient: resp with SessionHandle
RestClient -> CliClient: SessionHandle
CliClient -> CliClient: split statement
loop until all statements are executed
CliClient -> CliClient: check whether statement is a client-level command
CliClient -> RestClient: execute in INITIALIZATION mode
RestClient -> Gateway: configure Session with statement
Gateway --> RestClient: resp
RestClient -> CliClient: succeed to execute the statement
CliClient --> User: print info
end
CliClient -> User: enter interactive mode
@enduml |
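The ";" splitting described above could be sketched as follows. This is a deliberate simplification, assuming the only complication is semicolons inside single-quoted string literals; the real client parser must also handle comments and quoted identifiers. The class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: split an init file by ";" while ignoring semicolons
// inside single-quoted string literals.
public class StatementSplitter {

    static List<String> split(String script) {
        List<String> statements = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuote = false;
        for (char c : script.toCharArray()) {
            if (c == '\'') {
                inQuote = !inQuote;              // toggle at string literal boundary
            }
            if (c == ';' && !inQuote) {
                if (current.toString().trim().length() > 0) {
                    statements.add(current.toString().trim());
                }
                current.setLength(0);            // start the next statement
            } else {
                current.append(c);
            }
        }
        if (current.toString().trim().length() > 0) {
            statements.add(current.toString().trim()); // trailing statement without ";"
        }
        return statements;
    }
}
```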
A change compared with before is that we can't print the detailed info message. Currently the response of executeStatement can only tell us whether the statement is a query or a non-query. For a non-query statement, the SQL Client will print the results in the tableau mode or just the info message.
...
Execute in interactive mode
We only discuss execution of the DQL and DML here. Other cases should be the same.
Execute Query
Currently the SQL Client supports printing the results in the table (materialized view)/tableau/changelog styles. Considering the Gateway doesn't materialize the results, the client should fetch the results from the Gateway and materialize them on the client side. The UML Sequence Diagram of the query execution is as follows.
PlantUML |
---|
@startuml
autonumber
actor "User" as User
participant "CliClient" as CliClient
participant "RestClient" as RestClient
participant "Gateway" as Gateway #orange
activate Gateway
activate User
User -> CliClient: execute query statement
activate CliClient
activate RestClient
CliClient -> CliClient: check whether statement is a client-level statement
CliClient -> RestClient: execute statement in EXECUTION_MODE
RestClient -> Gateway: executeStatement
Gateway --> RestClient: resp with OperationHandle(type: EXECUTE_QUERY_STATEMENT)
RestClient -> CliClient:
CliClient -> RestClient: fetch session config
RestClient -> Gateway: getSessionConfig
Gateway --> RestClient: resp with config
RestClient --> CliClient: config
CliClient -> CliClient: get result mode
loop until all results are fetched
CliClient -> RestClient: fetch results
RestClient -> Gateway: fetchResults
Gateway --> RestClient: resp with results and token
RestClient --> CliClient: results
CliClient --> User: print results to terminal
end
RestClient -> Gateway: close operation
Gateway --> RestClient: resp
@enduml |
Execute insert statement in sync mode
Currently the SQL Client supports executing DML in sync mode by
SET 'table.dml-sync'='true';
In the sync mode, the Operation
should also convert the JobStatus
to the OperationStatus
when the job is submitted. Therefore, only when the JobStatus
reaches a finished state does the Operation
come to the FINISHED state.
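The sync-mode rule above can be sketched as a status mapping: the Operation stays RUNNING across restarts and only terminates when the job terminates. The enums here are simplified stand-ins for demonstration, not Flink's actual JobStatus/OperationStatus.

```java
// Illustrative sketch of mapping a job's status to the Operation status in
// sync mode; enums are simplified stand-ins.
public class SyncModeMapping {

    enum JobStatus { CREATED, RUNNING, RESTARTING, FINISHED, CANCELED, FAILED }
    enum OperationStatus { RUNNING, FINISHED, CANCELED, ERROR }

    static OperationStatus toOperationStatus(JobStatus job) {
        switch (job) {
            case FINISHED:
                return OperationStatus.FINISHED;
            case CANCELED:
                return OperationStatus.CANCELED;
            case FAILED:                          // unrecoverable job failure
                return OperationStatus.ERROR;
            default:                              // CREATED/RUNNING/RESTARTING
                return OperationStatus.RUNNING;   // failover keeps the Operation RUNNING
        }
    }
}
```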
Implementation
- Introduce the module flink-table/flink-sql-gateway-common, which includes the gateway API. When users want to implement their own endpoints, they only need to depend on this module.
- Introduce the module flink-table/flink-sql-gateway. It includes the REST endpoint and the SQL GatewayService.
- flink-table/flink-sql-client should rely on the module flink-table/flink-sql-gateway.
- We leave the MetricSystem as a future improvement.
Compatibility, Deprecation, and Migration Plan
...
The design of the origin Gateway is in the
...