...
- GatewayService exposes APIs for managing Sessions and Operations, plus utility methods. Endpoints can combine these methods to satisfy their requirements;
- GatewayService is responsible for exposing the registry of Sessions;
- GatewayService executes Operations asynchronously: when an Operation is submitted to the OperationManager, it returns the OperationHandle immediately and does not wait for the Operation to finish;
- GatewayService is shared among all the loaded endpoints.
- Endpoint: It's the main entry point for the users to visit.
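The asynchronous behavior described above can be illustrated with a minimal sketch. It is not the proposed implementation: `SketchOperationManager`, `isFinished` and `awaitResult` are hypothetical names, a plain `UUID` stands in for the OperationHandle, and a small fixed thread pool stands in for the gateway workers.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Hypothetical, simplified stand-in for the OperationManager described above. */
class SketchOperationManager implements AutoCloseable {
    private final ExecutorService workers = Executors.newFixedThreadPool(2);
    private final Map<UUID, Future<String>> operations = new ConcurrentHashMap<>();

    /** Submits the work and returns a handle immediately, without waiting for the result. */
    UUID submitOperation(Callable<String> executor) {
        UUID handle = UUID.randomUUID();
        operations.put(handle, workers.submit(executor));
        return handle;
    }

    /** Reports whether the operation behind the handle has finished. */
    boolean isFinished(UUID handle) {
        return operations.get(handle).isDone();
    }

    /** Blocks until the operation finishes or the timeout expires, then returns its result. */
    String awaitResult(UUID handle, long timeoutMs) throws Exception {
        return operations.get(handle).get(timeoutMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        workers.shutdownNow();
    }
}
```

The caller gets the handle back before the work completes and polls or waits later, which is the property an Endpoint would rely on.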
...
Option name | Default Value (Required) | Description |
---|---|---|
endpoint.type | rest (Yes) | REST endpoint should use 'rest'. |
endpoint.rest.port | 8083 (No) | REST endpoint port. |
endpoint.rest.address | 127.0.0.1 (No) | The address that the SqlServer binds itself to. |
...
Pluggable Endpoint Discovery
Pluggable endpoint discovery loads endpoints dynamically, so users can load only the endpoints they want to use. We use the SPI mechanism to discover the Endpoint.
Code Block | ||
---|---|---|
| ||
/** Interface for Endpoint. */
public interface SQLGatewayEndpoint {
    void start() throws Exception;
    void stop() throws Exception;
}
/** Factory to create SQLGatewayEndpointService. */
public interface SQLGatewayEndpointFactory extends Factory {
    SQLGatewayEndpoint createSQLGatewayEndpoint(Context context);
    interface Context {
        SQLGatewayService getSQLGatewayService();
        MetricGroup getMetricGroup();
        /** Gives read-only access to the configuration of the Endpoint. */
        ReadableConfig getConfiguration();
    }
} |
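As a rough illustration of the discovery step, the sketch below picks a factory by identifier from a set of candidates. The `Sketch*` types are hypothetical, trimmed-down stand-ins rather than the interfaces above, and matching on a `factoryIdentifier()` string is an assumption of this sketch. In a real deployment the candidates would typically come from `java.util.ServiceLoader`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, trimmed-down endpoint type; not the real interface. */
interface SketchEndpoint {
    void start() throws Exception;
    void stop() throws Exception;
}

/** Hypothetical factory type with an identifier used for matching (an assumption). */
interface SketchEndpointFactory {
    String factoryIdentifier();
    SketchEndpoint createEndpoint();
}

final class SketchEndpointDiscovery {
    /**
     * Returns the single factory whose identifier matches. The candidates would usually be
     * obtained via java.util.ServiceLoader.load(SketchEndpointFactory.class).
     */
    static SketchEndpointFactory discover(
            Iterable<SketchEndpointFactory> candidates, String identifier) {
        List<SketchEndpointFactory> matches = new ArrayList<>();
        for (SketchEndpointFactory candidate : candidates) {
            if (candidate.factoryIdentifier().equals(identifier)) {
                matches.add(candidate);
            }
        }
        if (matches.size() != 1) {
            throw new IllegalStateException(
                    "Expected exactly one endpoint factory for '" + identifier
                            + "' but found " + matches.size());
        }
        return matches.get(0);
    }
}
```

Failing on zero or multiple matches keeps a misconfigured classpath from silently starting the wrong endpoint.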
We also expose the option endpoint.type to allow users to specify the endpoints. Because different endpoints may have the same settings, e.g. port, users should add the endpoint identifier as a prefix when specifying an option, e.g. rest.port. For simplicity, we don't plan to introduce another YAML file for the SQL Gateway; users can specify the gateway options in flink-conf.yaml.
For example, users can add the following options in the flink-conf.yaml.
Code Block | ||
---|---|---|
| ||
endpoint.type: rest, hiveserver2
endpoint.rest.address: localhost
endpoint.rest.port: 9001
endpoint.hiveserver2.address: localhost
endpoint.hiveserver2.port: 9002 |
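The prefix convention in the example above can be sketched as follows. This is an illustration under assumptions: the configuration is modeled as a plain `Map<String, String>`, and `SketchEndpointOptions`, `endpointTypes` and `optionsFor` are hypothetical names.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class SketchEndpointOptions {
    /** Splits the comma-separated endpoint.type value into endpoint identifiers. */
    static List<String> endpointTypes(Map<String, String> conf) {
        return Arrays.stream(conf.getOrDefault("endpoint.type", "").split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toList());
    }

    /** Returns the options of one endpoint with the "endpoint.<identifier>." prefix stripped. */
    static Map<String, String> optionsFor(Map<String, String> conf, String identifier) {
        String prefix = "endpoint." + identifier + ".";
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : conf.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                result.put(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
        return result;
    }
}
```

With the configuration shown above, each endpoint would see only its own `address` and `port`, so two endpoints can use the same option names without clashing.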
GatewayService API
Object API
The API is used to describe the Session and Operation.
Code Block | ||
---|---|---|
| ||
public class HandleIdentifier {
    UUID publicId;
    UUID secretId;
}
public class SessionHandle {
    HandleIdentifier identifier;
}
/**
 * Every Endpoint should expose its version and extend the interface.
 */
interface EndpointVersion {}
enum RestEndpointVersion implements EndpointVersion {
    V1;
}
/**
 * It's equal to the HiveServer2 TProtocolVersion. It should belong to the
 * hive module.
 */
enum HiveServer2EndpointVersion implements EndpointVersion {
    HIVE_CLI_SERVICE_PROTOCOL_V1,
    // V2 adds support for asynchronous execution
    HIVE_CLI_SERVICE_PROTOCOL_V2,
    // V3 add varchar type, primitive type qualifiers
    HIVE_CLI_SERVICE_PROTOCOL_V3,
    // V4 add decimal precision/scale, char type
    HIVE_CLI_SERVICE_PROTOCOL_V4,
    // V5 adds error details when GetOperationStatus returns in error state
    HIVE_CLI_SERVICE_PROTOCOL_V5,
    // V6 uses binary type for binary payload (was string) and uses columnar result set
    HIVE_CLI_SERVICE_PROTOCOL_V6,
    // V7 adds support for delegation token based connection
    HIVE_CLI_SERVICE_PROTOCOL_V7,
    // V8 adds support for interval types
    HIVE_CLI_SERVICE_PROTOCOL_V8,
    // V9 adds support for serializing ResultSets in SerDe
    HIVE_CLI_SERVICE_PROTOCOL_V9,
    // V10 adds support for in place updates via GetOperationStatus
    HIVE_CLI_SERVICE_PROTOCOL_V10,
    // V11 adds timestamp with local time zone type
    HIVE_CLI_SERVICE_PROTOCOL_V11
}
enum OperationType {
    EXECUTE_STATEMENT,
    GET_INFO,
    UNKNOWN;
}
public class OperationHandle {
    HandleIdentifier identifier;
} |
SQLGatewayService API
Code Block | ||
---|---|---|
| ||
interface SQLGatewayService {

    // -------------------------------------------------------------------------------------------
    // Session Management
    // -------------------------------------------------------------------------------------------

    SessionHandle openSession(SessionEnvironment environment) throws SqlGatewayException;

    void closeSession(SessionHandle sessionHandle) throws SqlGatewayException;

    Map<String, String> getSessionConfig(SessionHandle sessionHandle) throws SqlGatewayException;

    // -------------------------------------------------------------------------------------------
    // Operation Management
    // -------------------------------------------------------------------------------------------

    /** Get operation info to describe the Operation. */
    OperationInfo getOperationInfo(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException;

    /** Get the result schema for the specified Operation. */
    ResolvedSchema getOperationResultSchema(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException;

    void cancelOperation(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException;

    void closeOperation(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException;

    // -------------------------------------------------------------------------------------------
    // Statements
    // -------------------------------------------------------------------------------------------

    /**
     * Use the statement to initialize the Session. Only
     * SET/RESET/CREATE/DROP/USE/ALTER/LOAD MODULE/UNLOAD MODULE/ADD JAR/REMOVE JAR are allowed.
     */
    void configureSession(SessionHandle sessionHandle, String statement, long executionTimeoutMs) throws SqlGatewayException;

    /** Execute the statement with the specified Session. It allows executing with Operation-level configuration. */
    OperationHandle executeStatement(
        SessionHandle sessionHandle,
        String statement,
        long executionTimeoutMs,
        Configuration executionConfig) throws SqlGatewayException;

    /** Fetch the results with token id. */
    ResultSet fetchResults(SessionHandle sessionHandle, OperationHandle operationHandle, int token, int maxRows) throws SqlGatewayException;

    /**
     * Fetch the Operation-level log from the GatewayService. Some endpoints allow fetching the log at the operation level.
     */
    ResultSet fetchLog(SessionHandle sessionHandle, OperationHandle operationHandle, FetchOrientation orientation, int maxRows) throws SqlGatewayException;

    /**
     * Only supports fetching results in FORWARD/BACKWARD orientation.
     * - Users can only fetch BACKWARD from the current offset once.
     * - The Gateway doesn't materialize the changelog.
     */
    ResultSet fetchResult(SessionHandle sessionHandle, OperationHandle operationHandle, FetchOrientation orientation, int maxRows) throws SqlGatewayException;

    /**
     * For the same functionality, every endpoint has its own result schema. Therefore,
     * the endpoint submits the callable executor to the OperationManager that manages the
     * lifecycle of the Operation. The callable executor organizes the results
     * as the Endpoint requires.
     */
    OperationHandle submitOperation(OperationType type, Callable<ResultSet> executor, ResolvedSchema resultSchema) throws SqlGatewayException;

    // -------------------------------------------------------------------------------------------
    // Utils
    // -------------------------------------------------------------------------------------------

    /** Describe the cluster info. */
    Map<String, String> getGatewayInfo();

    void heartbeat(SessionHandle sessionHandle) throws SqlGatewayException;

    /**
     * The Endpoint is stateless. All the session configs are kept on the GatewayService side.
     */
    EndpointVersion getSessionEndpointVersion(SessionHandle sessionHandle) throws SqlGatewayException;

    /** Returns a list of completion hints for the given statement at the given position. */
    List<String> completeStatement(String sessionId, String statement, int position) throws SqlGatewayException;

    // -------------------------------------------------------------------------------------------
    // Catalog API
    // -------------------------------------------------------------------------------------------

    String getCurrentCatalog(SessionHandle sessionHandle) throws SqlGatewayException;

    String getCurrentDatabase(SessionHandle sessionHandle) throws SqlGatewayException;

    List<String> listCatalogs() throws SqlGatewayException;

    List<String> listDatabases(String catalogName) throws SqlGatewayException;

    List<TableInfo> listTables(String catalogName, String databaseName, TableKind tableKind) throws SqlGatewayException;

    List<UserDefinedFunctionInfo> listUserDefinedFunctions(String catalogName, String databaseName) throws SqlGatewayException;

    ContextResolvedTable getTable(ObjectIdentifier tableIdentifier) throws SqlGatewayException;

    ContextResolvedFunction getFunction(ObjectIdentifier functionIdentifier) throws SqlGatewayException;
}
class TableInfo {
    boolean isTemporary;
    ObjectIdentifier identifier;
    TableKind tableKind;
}
class UserDefinedFunctionInfo {
    boolean isTemporary;
    ObjectIdentifier identifier;
    FunctionKind kind;
    String className;
    String description;
}
enum TableKind {
    ALL,
    VIEW,
    TABLE
}
class SessionEnvironment {
    private String sessionName;
    private EndpointVersion sessionEndpointVersion;
    private List<URL> libs;
    private List<URL> jars;
    private Map<String, String> sessionConfig;
}
public class OperationInfo {
    OperationStatus status;
    OperationType type;
    boolean hasResult;
}
public class ResultSet {
    int nextToken;
    ResultType resultType;
    ResolvedSchema resultSchema;
    List<RowData> results;
    Exception exception;
}
public enum ResultType {
    PAYLOAD,
    EMPTY,
    EOS,
    ERROR
} |
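To make the token-based `fetchResults` contract more concrete, here is a minimal sketch of one way such paging could behave. The semantics are assumptions of this sketch, not something the interface above specifies: tokens start at 0, each page reports the token to use next, and repeating the previous token returns the same page again so that a client can retry safely. `SketchResultStore` and `SketchResultPage` are hypothetical names, and rows are modeled as strings.

```java
import java.util.ArrayList;
import java.util.List;

/** One page of results plus the token to pass on the next call. */
final class SketchResultPage {
    final List<String> rows;
    final int nextToken;
    final boolean endOfStream;

    SketchResultPage(List<String> rows, int nextToken, boolean endOfStream) {
        this.rows = rows;
        this.nextToken = nextToken;
        this.endOfStream = endOfStream;
    }
}

/** Hypothetical store that hands out buffered rows page by page. */
final class SketchResultStore {
    private final List<String> allRows;
    private int offset = 0;            // index of the first row not yet handed out
    private int expectedToken = 0;     // token the next fresh fetch must carry
    private SketchResultPage lastPage; // kept so that a retry returns the same page

    SketchResultStore(List<String> rows) {
        this.allRows = new ArrayList<>(rows);
    }

    SketchResultPage fetchResults(int token, int maxRows) {
        if (maxRows <= 0) {
            throw new IllegalArgumentException("maxRows must be positive");
        }
        if (lastPage != null && token == expectedToken - 1) {
            return lastPage; // retry of the previous request
        }
        if (token != expectedToken) {
            throw new IllegalArgumentException(
                    "Expected token " + expectedToken + " but got " + token);
        }
        int end = Math.min(offset + maxRows, allRows.size());
        List<String> rows = new ArrayList<>(allRows.subList(offset, end));
        offset = end;
        expectedToken++;
        lastPage = new SketchResultPage(rows, expectedToken, offset == allRows.size());
        return lastPage;
    }
}
```

A client would start with token 0 and keep calling with the returned `nextToken` until `endOfStream` is set.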
Options
Please use the following options to configure the gateway.
Option | Required | Default value | Description |
---|---|---|---|
sql-gateway.session.idle-timeout | No | 5 min | Session will be closed when it's not accessed for this duration, which can be disabled by setting to zero or negative value. |
sql-gateway.session.check-interval | No | 1 min | The check interval for session timeout, which can be disabled by setting to zero or negative value. |
sql-gateway.session.max-num | No | 1000000 | The maximum number of active sessions. |
sql-gateway.worker.threads.max | No | 500 | Maximum number of worker threads for the gateway workers. |
sql-gateway.worker.threads.min | No | 5 | Minimum number of worker threads for the gateway workers. |
sql-gateway.worker.keepalive-time | No | 5 min | Keepalive time for an idle worker thread. When the number of workers exceeds the min workers, excessive threads are killed after this time interval. |
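As an illustration of how the three worker options could map onto a standard Java thread pool, consider the sketch below. It assumes the gateway workers are backed by a `java.util.concurrent.ThreadPoolExecutor`, which the table above does not state; `SketchWorkerPool` and its parameter names are hypothetical.

```java
import java.time.Duration;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class SketchWorkerPool {
    /** Builds a pool from the min threads, max threads and keepalive time options. */
    static ThreadPoolExecutor create(int minThreads, int maxThreads, Duration keepAlive) {
        return new ThreadPoolExecutor(
                minThreads,
                maxThreads,
                keepAlive.toMillis(),
                TimeUnit.MILLISECONDS,
                // A direct hand-off queue lets the pool grow towards maxThreads under load;
                // with an unbounded queue it would never exceed minThreads.
                new SynchronousQueue<>());
    }
}
```

With the defaults from the table this would be `SketchWorkerPool.create(5, 500, Duration.ofMinutes(5))`: threads beyond the minimum are reclaimed once they have been idle for the keepalive time.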
...
SQL Client API
When launching the SQL Client in gateway mode, users should specify the address and port of the Gateway they want to communicate with. Therefore, we should expose the following startup options.
...