
Status

Current state: "Under Discussion"

Discussion thread: http://mail-archives.apache.org/mod_mbox/flink-dev/202001.mbox/%3CCADQYLGsCGDJkfd3L1hAy1y_M2625YkNHJGW82UraGLhzg6p7Ug%40mail.gmail.com%3E

JIRA: here (<- link to the Jira issue)

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Introduction

The overall concept and architecture of the SQL Client were proposed in FLIP-24, which mainly focuses on the embedded mode. The goal of this FLIP is to extend FLIP-24 to support the gateway mode and to expose the Gateway with pluggable endpoints. We introduce pluggable endpoints because users have different preferences. For example, HiveServer2 users prefer a gateway with a HiveServer2-style API, which is supported by numerous tools, while some Flink-native users may prefer the REST API. Therefore, we follow Kyuubi's design and expose multiple endpoints with different APIs for users to choose from.


Section 2 introduces the main concepts in the Flink SQL Gateway. Section 3 proposes the architecture of the SQL Gateway. Section 4 introduces the component API and the REST endpoint. Section 5 introduces how the Flink SQL Client connects to the Flink SQL Gateway.


Core Concepts

Like many big data platforms, Flink SQL Gateway also has the following concepts.

Session

A Session represents a user's visit to the Gateway over a period of time. Flink SQL Gateway uses the SessionHandle as the index to identify the Session. In addition to uniquely identifying the visiting user, the Session also isolates resources, including jar resources, configuration and meta information.

Operation

Every user request is transformed into an Operation.

  • The user uses the OperationHandle to identify the request.
  • The user can send requests to the Gateway to manage the Operation:
    • Get the OperationStatus, which describes whether the Operation is running or has hit an error;
    • Cancel the running Operation;
    • Close the finished Operation.

SessionManager

SessionManager is responsible for managing the Session, including:

  • Every user should register a Session before sending requests to the Gateway;
  • Every user should close the corresponding Session when exiting. If a Session is inactive for a period of time, the SessionManager will clean up the Session automatically;
  • SessionManager should limit the number of active Sessions to avoid OOM.
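
The three responsibilities above can be illustrated with a minimal sketch. The class name SessionManagerSketch, its methods and the use of plain UUIDs instead of SessionHandle are assumptions made for illustration only; this is not the proposed implementation. Timestamps are passed in explicitly so that the behavior is easy to follow.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of a session registry with a session limit and an idle timeout. */
class SessionManagerSketch {
    private final int maxSessions;
    private final long idleTimeoutMs;
    // Session id -> timestamp (ms) of the last access.
    private final Map<UUID, Long> lastAccess = new ConcurrentHashMap<>();

    SessionManagerSketch(int maxSessions, long idleTimeoutMs) {
        this.maxSessions = maxSessions;
        this.idleTimeoutMs = idleTimeoutMs;
    }

    /** Registers a new session, rejecting it when the limit is reached. */
    synchronized UUID openSession(long nowMs) {
        if (lastAccess.size() >= maxSessions) {
            throw new IllegalStateException("Too many active sessions: " + maxSessions);
        }
        UUID id = UUID.randomUUID();
        lastAccess.put(id, nowMs);
        return id;
    }

    /** Refreshes the last access time; fails for a closed or expired session. */
    void touch(UUID id, long nowMs) {
        if (lastAccess.computeIfPresent(id, (key, old) -> nowMs) == null) {
            throw new IllegalArgumentException("Session not found: " + id);
        }
    }

    void closeSession(UUID id) {
        lastAccess.remove(id);
    }

    /** Removes idle sessions and returns how many were removed. A non-positive timeout disables the check. */
    synchronized int cleanUpIdleSessions(long nowMs) {
        if (idleTimeoutMs <= 0) {
            return 0;
        }
        int before = lastAccess.size();
        lastAccess.values().removeIf(last -> nowMs - last > idleTimeoutMs);
        return before - lastAccess.size();
    }

    int activeSessions() {
        return lastAccess.size();
    }
}
```

In a real gateway, cleanUpIdleSessions would presumably be driven by a scheduled task, and closing a Session would also release its Operations.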

OperationManager

OperationManager is responsible for managing the Operation. The Operation's initialization, execution and cleanup should be controlled by the OperationManager. When the Session closes, the OperationManager also needs to clean up all Operations that are still alive.


We organize all the concepts in the following graph.

Architecture

The architecture of the Gateway is in the following graph.

Flink SQL Gateway is composed of the Endpoints, GatewayService and MetricSystem.

  • GatewayService: It's responsible for managing the active Sessions and submitted Operations. When a user tries to get the results, the GatewayService should return the corresponding results.
    • GatewayService exposes APIs for managing the Session and Operation, plus utilities. An Endpoint can combine these methods to satisfy its requirements;
    • GatewayService is responsible for exposing the registry of the Session;
    • GatewayService executes the Operation asynchronously: when the Operation is submitted to the OperationManager, it returns the OperationHandle;
  • Endpoint: It's the main entry point for the users to visit.
    • Considering the different needs of different users, we prefer a pluggable Endpoint API that users can extend with different modes, e.g. HiveServer2, PrestoCoordinator. Following the implementation of connectors in Flink, we prefer a loading mechanism similar to SPI that dynamically loads the corresponding Endpoint according to the user's configuration.


    • In many cases, the version of the user's Client differs from the version of the Endpoint. Therefore, we need to agree on the communication version when opening the session. The version determines how the returned results are serialized on the server side. Considering that different Endpoints have their own version management, we propose to let each Endpoint manage its own version. Every time the Endpoint returns results to the Client, it serializes them according to the version determined during session registration.


    • Different Endpoints expose different execution modes, e.g. HiveServer2 supports both asynchronous and synchronous job submission (DML, DQL), while the original REST API currently only supports asynchronous job submission. To keep the execution mode simple, the GatewayService only supports asynchronous submission, and the Endpoint controls the execution mode. In the synchronous execution mode, the Endpoint monitors the status of the Operation and returns the result to the user after the job completes.


    • Operation state machine:
      • Considering that every Operation has a status, we propose a state machine like the one in HiveServer2:
        • INITIALIZED: Operation is created;
        • PENDING: Operation is switching between statuses;
        • RUNNING: Operation starts running;
        • FINISHED: Operation finishes;
        • CANCELED: User cancels the Operation;
        • CLOSED: User closes the Operation;
        • ERROR: Operation execution meets errors;
        • TIMEDOUT: Execution timeout.
      • For DQL/DML, we need to convert the JobStatus to the corresponding OperationStatus in the synchronous mode (in the asynchronous mode, the Operation finishes once the job is submitted). We use the following mapping from Flink's job state machine.
        • CREATED -> INITIALIZED
        • FAILED -> ERROR
        • CANCELED -> CANCELED
        • FINISHED -> FINISHED
        • RUNNING -> RUNNING
        • INITIALIZING/FAILING/CANCELLING/RESTARTING/SUSPENDED/RECONCILING -> PENDING
      • The state machine is as follows.


  • MetricSystem: It's responsible for reporting the metrics to the specified destination.
    • The Gateway is the main entry for users to submit SQL jobs. In many cases, metrics are needed to measure the state of the entire system and to locate problems. Metrics also help peripheral systems manage the Gateway, e.g. for load balancing.
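
The JobStatus to OperationStatus mapping listed in the state machine above can be written as a single function. The sketch below declares its own enums that mirror the names used in this FLIP; the class name OperationStatusMapping is a hypothetical name chosen for illustration.

```java
/** Hypothetical sketch of the JobStatus -> OperationStatus mapping described in this FLIP. */
class OperationStatusMapping {

    /** Mirrors the Flink job states named in the mapping above. */
    enum JobStatus {
        INITIALIZING, CREATED, RUNNING, FAILING, FAILED, CANCELLING,
        CANCELED, FINISHED, RESTARTING, SUSPENDED, RECONCILING
    }

    /** Mirrors the Operation states proposed above. */
    enum OperationStatus {
        INITIALIZED, PENDING, RUNNING, FINISHED, CANCELED, CLOSED, ERROR, TIMEDOUT
    }

    static OperationStatus fromJobStatus(JobStatus jobStatus) {
        switch (jobStatus) {
            case CREATED:
                return OperationStatus.INITIALIZED;
            case FAILED:
                return OperationStatus.ERROR;
            case CANCELED:
                return OperationStatus.CANCELED;
            case FINISHED:
                return OperationStatus.FINISHED;
            case RUNNING:
                return OperationStatus.RUNNING;
            default:
                // INITIALIZING, FAILING, CANCELLING, RESTARTING, SUSPENDED, RECONCILING
                return OperationStatus.PENDING;
        }
    }
}
```

CLOSED and TIMEDOUT have no counterpart in the job state machine; they are set by the Gateway itself.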


Component API

Handle

public class HandleIdentifier {
    UUID publicId;
    UUID secretId;
}

public class SessionHandle {
    HandleIdentifier identifier;
}

/**
 * Every Endpoint should expose its version and extend the interface.
 */
interface ProtocolVersion {}

enum RestEndpointVersion implements ProtocolVersion {
    V1;
}

/**
 * It's equal to the HiveServer2 TProtocolVersion. It should belong to the
 * hive module.
 */
enum HiveServer2ProtocolVersion implements ProtocolVersion {
  HIVE_CLI_SERVICE_PROTOCOL_V1,

  // V2 adds support for asynchronous execution
  HIVE_CLI_SERVICE_PROTOCOL_V2,

  // V3 adds varchar type, primitive type qualifiers
  HIVE_CLI_SERVICE_PROTOCOL_V3,

  // V4 adds decimal precision/scale, char type
  HIVE_CLI_SERVICE_PROTOCOL_V4,

  // V5 adds error details when GetOperationStatus returns in error state
  HIVE_CLI_SERVICE_PROTOCOL_V5,

  // V6 uses binary type for binary payload (was string) and uses columnar result set
  HIVE_CLI_SERVICE_PROTOCOL_V6,

  // V7 adds support for delegation token based connection
  HIVE_CLI_SERVICE_PROTOCOL_V7,

  // V8 adds support for interval types
  HIVE_CLI_SERVICE_PROTOCOL_V8,

  // V9 adds support for serializing ResultSets in SerDe
  HIVE_CLI_SERVICE_PROTOCOL_V9,

  // V10 adds support for in place updates via GetOperationStatus
  HIVE_CLI_SERVICE_PROTOCOL_V10,

  // V11 adds timestamp with local time zone type
  HIVE_CLI_SERVICE_PROTOCOL_V11;
}

enum OperationType {
  EXECUTE_QUERY_STATEMENT,
  EXECUTE_NON_QUERY_STATEMENT,
  GET_INFO,
  UNKNOWN;
}


public class OperationHandle {
    HandleIdentifier identifier;
}


GatewayService

API

interface SQLGatewayService {
        
    // -------------------------------------------------------------------------------------------
    // Session Management
    // -------------------------------------------------------------------------------------------

    SessionHandle openSession(SessionEnvironment environment) throws SQLGatewayException;
    
    void closeSession(SessionHandle sessionHandle);
    
    Map<String, String> getSessionConfig(SessionHandle sessionHandle);
    
    // -------------------------------------------------------------------------------------------
    // Operation Management
    // -------------------------------------------------------------------------------------------
        
    /**
     * Get operation info to describe the Operation.
     */
    OperationInfo getOperationInfo(SessionHandle sessionHandle, OperationHandle operationHandle);
    
    /** Get the result schema for the specified Operation. */
    ResolvedSchema getOperationResultSchema(SessionHandle sessionHandle, OperationHandle operationHandle);
    
    void cancelOperation(SessionHandle sessionHandle, OperationHandle operationHandle);
    
    void closeOperation(SessionHandle sessionHandle, OperationHandle operationHandle);
    
    // -------------------------------------------------------------------------------------------
    // Statements 
    // -------------------------------------------------------------------------------------------
    
    /**
     * Use the statement to initialize the Session. Only the following statements are allowed:
     * SET/RESET/CREATE/DROP/USE/ALTER/LOAD MODULE/UNLOAD MODULE/ADD JAR/REMOVE JAR.
     */
    void configureSession(SessionHandle sessionHandle, String statement, long executionTimeoutMs);
    
    /** Execute the statement in the specified Session. Operation-level configuration can be passed via executionConfig. */
    OperationHandle executeStatement(
        SessionHandle sessionHandle, 
        String statement, 
        long executionTimeoutMs, 
        Configuration executionConfig);
   
    ResultSet fetchResults(SessionHandle sessionHandle, OperationHandle operationHandle, int token, int maxRows);
    
    /**
     * For the same functionality, every endpoint has its own result schema. Therefore,
     * the endpoint submits a callable executor to the OperationManager, which manages the
     * lifecycle of the Operation. The callable executor organizes the results
     * as the Endpoint requires.
     */
    OperationHandle submitOperation(OperationType type, Callable<ResultSet> executor, ResolvedSchema resultSchema);
    
    // -------------------------------------------------------------------------------------------
    // Utils 
    // -------------------------------------------------------------------------------------------
    
    /**
     * Describe the cluster info.
     */
    Map<String, String> getGatewayInfo();
    
    void heartbeat(SessionHandle sessionHandle);
    
    /**
     * The Endpoint is stateless. All the session configs are kept on the GatewayService side.
     */
    ProtocolVersion getSessionProtocolVersion(SessionHandle sessionHandle);
    
    /** Returns a list of completion hints for the given statement at the given position. */
    List<String> completeStatement(SessionHandle sessionHandle, String statement, int position);

   
    // -------------------------------------------------------------------------------------------
    // Catalog API 
    // -------------------------------------------------------------------------------------------
    
    String getCurrentCatalog(SessionHandle sessionHandle);

    String getCurrentDatabase(SessionHandle sessionHandle);
    
    List<String> listCatalogs();
    
    List<String> listDatabases(String catalogName);   
    
    List<TableDescriptor> listTables(String catalogName, String databaseName, TableKind tableKind);
    
    List<UserDefinedFunctionDescriptor> listUserDefinedFunctions(String catalogName, String databaseName);
    
    ContextResolvedTable getTable(ObjectIdentifier tableIdentifier);
    
    ContextResolvedFunction getFunction(ObjectIdentifier functionIdentifier);
    
}

class TableDescriptor {
    boolean isTemporary;
    ObjectIdentifier identifier;
    TableKind tableKind;
}

class UserDefinedFunctionDescriptor {
    boolean isTemporary;
    ObjectIdentifier identifier;
    FunctionKind kind;
    String className;
    String description;
}

enum TableKind {
    ALL,
    VIEW,
    TABLE
}

class SessionEnvironment {
    private String sessionName;
    private ProtocolVersion version;
    private List<URL> libs;
    private List<URL> jars;
    private Map<String, String> sessionConfig;
}

public class OperationInfo {
    OperationStatus status;
    OperationType type;
    boolean hasResult;
}

public class ResultSet {
    
    int nextToken;
    
    ResultType resultType; 

    ResolvedSchema resultSchema;
    
    List<RowData> results;
    
    Exception exception;
}

public enum ResultType {
    PAYLOAD,
    EMPTY,
    EOS,
    ERROR
}

Option

| Option | Required | Default value | Description |
|---|---|---|---|
| sql-gateway.session.idle.timeout | No | 5 min | Session will be closed when it's not accessed for this duration, which can be disabled by setting to zero or negative value. |
| sql-gateway.session.check.interval | No | 1 min | The check interval for session timeout, which can be disabled by setting to zero or negative value. |
| sql-gateway.session.max-num | No | 1000000 | The maximum number of active sessions. |
| sql-gateway.worker.threads.max | No | 500 | Maximum number of worker threads for the gateway workers. |
| sql-gateway.worker.threads.min | No | 5 | Minimum number of worker threads for the gateway workers. |
| sql-gateway.worker.keepalive.time | No | 5 min | Keepalive time for an idle worker thread. When the number of workers exceeds the min workers, excessive threads are killed after this time interval. |
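
As an illustration of how the three worker options could fit together, the sketch below maps them onto the JDK's ThreadPoolExecutor. The class GatewayWorkerPoolSketch and the choice of a SynchronousQueue are assumptions; the FLIP does not prescribe a particular pool implementation.

```java
import java.time.Duration;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: builds a worker pool from the sql-gateway.worker.* options. */
class GatewayWorkerPoolSketch {

    static ThreadPoolExecutor create(int minThreads, int maxThreads, Duration keepAlive) {
        // With a SynchronousQueue, tasks are handed directly to threads: the pool grows
        // from minThreads up to maxThreads, and threads above minThreads are terminated
        // after being idle for keepAlive.
        return new ThreadPoolExecutor(
                minThreads,
                maxThreads,
                keepAlive.toMillis(),
                TimeUnit.MILLISECONDS,
                new SynchronousQueue<>());
    }
}
```

With the default values from the table, the pool would be created as GatewayWorkerPoolSketch.create(5, 500, Duration.ofMinutes(5)). Once all 500 threads are busy, further submissions are rejected, so the caller needs a policy for that case.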


Endpoint

We use the SPI mechanism to discover and build the Endpoint.

/** Interface for Endpoint. */
public interface SQLGatewayEndpoint {

    void start() throws Exception;
    
    void stop() throws Exception;
}

/** Factory to create SQLGatewayEndpoint. */
public interface SQLGatewayEndpointFactory extends Factory {

    SQLGatewayEndpoint createSQLGatewayEndpoint(Context context);

    interface Context {
       	
        SQLGatewayService getSQLGatewayService();

        MetricGroup getMetricGroup();

        /** Gives read-only access to the configuration of the Endpoint. */
        ReadableConfig getConfiguration();
    }
}

We also expose the option endpoint.protocol to allow users to specify the endpoints. Considering that different endpoints may have the same settings, e.g. port, users should add the endpoint identifier as the prefix of the option, e.g. rest.port. For simplicity, we don't plan to introduce another YAML file for the SQL Gateway; users can specify the gateway options in flink-conf.yaml. For example, users can add the following options to flink-conf.yaml.

endpoint.protocol: rest, hiveserver2
endpoint.rest.address: localhost
endpoint.rest.port: 9001
endpoint.hiveserver2.address: localhost
endpoint.hiveserver2.port: 9002
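
The prefix convention shown above can be sketched as follows: read the list of identifiers from endpoint.protocol, then hand each Endpoint only the options under its own prefix. The class EndpointOptionsSketch and its method names are hypothetical, and a plain Map stands in for Flink's configuration classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of resolving per-endpoint options from a flat configuration. */
class EndpointOptionsSketch {

    /** Returns the endpoint identifiers listed in endpoint.protocol, e.g. [rest, hiveserver2]. */
    static List<String> protocols(Map<String, String> conf) {
        List<String> result = new ArrayList<>();
        for (String part : conf.getOrDefault("endpoint.protocol", "").split(",")) {
            String identifier = part.trim();
            if (!identifier.isEmpty()) {
                result.add(identifier);
            }
        }
        return result;
    }

    /** Returns the options of one endpoint with the "endpoint.<identifier>." prefix removed. */
    static Map<String, String> optionsFor(Map<String, String> conf, String identifier) {
        String prefix = "endpoint." + identifier + ".";
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> entry : conf.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                result.put(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
        return result;
    }
}
```

For the example configuration above, optionsFor(conf, "rest") would contain address=localhost and port=9001, which is what a REST endpoint factory would then read.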

REST Endpoint

In this section, we propose to introduce the REST Endpoint, which is different from the REST endpoint we proposed before.

OpenSession

/v1/sessions

Verb: POST

Response code: 200 OK

Create a new session with the specified configuration.

Request body

{
  "session_name": "", # optional
  "libs": [], # optional, used by the SQL Client
  "jars": [], # optional, used by the SQL Client
  "properties": { # optional, properties for the current session
    "key": "value"
  }
}

Response body

{
  "session_handle": "" # if the session is created successfully
}
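
For illustration, an OpenSession request could be built with the JDK 11 HTTP client as sketched below. The host, port and session name are placeholders, and the class OpenSessionRequestSketch is hypothetical; the sketch only builds the request and does not send it.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical sketch: builds the POST /v1/sessions request described above. */
class OpenSessionRequestSketch {

    /** Builds the JSON request body. The session name is assumed to need no JSON escaping. */
    static String body(String sessionName) {
        return "{\"session_name\": \"" + sessionName + "\", \"properties\": {}}";
    }

    static HttpRequest build(String host, int port, String sessionName) {
        return HttpRequest.newBuilder()
                .uri(URI.create("http://" + host + ":" + port + "/v1/sessions"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body(sessionName)))
                .build();
    }
}
```

A client would send the request with java.net.http.HttpClient and read session_handle from the response body.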

CloseSession

/v1/sessions/:session_handle

Verb: DELETE

Response code: 200 OK

Close a session and release the related resources, including operations and properties.

Request body

{}

Response body

{
  "status": "CLOSED" # if closed successfully
}

Get Session Config

/v1/sessions/:session_handle

Verb: GET

Response code: 200 OK

Get the session config with the specified session handle.

Request body

{}

Response body

{
  "properties": {
    "key": "value"
  }
}


Configure Session

/v1/sessions/:session_handle/configure_session

Verb: POST

Response code: 200 OK

Configures the session with a statement, which can be SET/RESET/CREATE/DROP/LOAD/UNLOAD/USE/ALTER/ADD JAR/REMOVE JAR.


The statement must be a single command, otherwise the server will throw an exception.


Request body

{
  "statement": "", # required
  "execution_timeout": "" # execution time limit in milliseconds, optional
}

Response body

{}


Get Operation Status

/v1/sessions/:session_handle/operations/:operation_handle/status

Verb: GET

Response code: 200 OK

Get the status of a running job.

If the session has expired, the server will throw a "session not found" exception.

If the job has finished, the server will throw a "job not found" exception.

Request body

{}

Response body

{
  "status": "" # refer to OperationStatus
}


Cancel Operation

/v1/sessions/:session_handle/operations/:operation_handle/cancel

Verb: PUT

Response code: 200 OK

Cancel the running operation and update the operation status.

Request body

{}

Response body

{
  "status": "CANCELED" # if canceled successfully
}


Close Operation

/v1/sessions/:session_handle/operations/:operation_handle

Verb: DELETE

Response code: 200 OK

Remove the specified Operation.

If the user invokes closeOperation twice, the second invocation will get an exception.

Request body

{}

Response body

{
  "status": "CLOSED" # if closed successfully
}


Execute a statement

/v1/sessions/:session_handle/statements

Verb: POST

Response code: 200 OK

Execute a statement, which can be any SQL statement that Flink supports.


The SET xx=yy statement will override/update the TableConfig held by the current session, and the RESET statement will reset all properties set by SET xx=yy statements.


The USE MODULE/CATALOG/DATABASE xx statement will update the default module/catalog/database in the TableEnvironment held by the current session.


The statement must be a single command, otherwise the server will throw an exception.


Request body

{
  "statement": "", # required
  "execution_timeout": "" # execution time limit in milliseconds, optional
}

Response body

{
  "operation_handle": "",
  "operation_type": "",
  "has_result": true/false # determines whether the results need to be fetched later
}


Fetch result

/v1/sessions/:session_handle/operations/:operation_handle/result/:token

Verb: GET

Response code: 200 OK

Fetch a part of the result of a Flink job execution. If the result data is too large or the result is streaming, we can use this API to get a part of the result at a time. The initial value of token is 0. The token in the next request must be the same as the token in the current request or must be equal to token (in the current request) + 1, otherwise the client will get an exception from the server. If multiple requests are executed with the same token, the result is the same. This design makes sure the client can still get the result even if some errors occur in the client. The client can get the next part of the result using /v1/sessions/:session_handle/operations/:operation_handle/result/:{token+1} (which is the value of next_result_uri in the response data). If next_result_uri is empty, no more data remains on the server.

The server may drop the data before the current token, because the client has already obtained it successfully.

We will introduce fetch_size or max_wait_time (to reach the fetch_size) as an optimization in the future.


The returned result has the same schema as the TableResult#getResolvedSchema.

Request body

{}

Response body

{
  "result_type": "PAYLOAD",
  "results": [ # currently there is only one result; if multiple queries are executed in a single job, there are multiple results
    {
      "columns": [ # if the execution is successful
        {
          "name": "",
          "type": "" # string value of LogicalType
        }
      ],
      "data": [
        ["value"] # a row of data
      ]
    }
  ],
  "next_result_uri": "/v1/sessions/:session_handle/operations/:operation_handle/result/:{token+1}", # if not empty, use this URI to fetch the next part of the result; otherwise there are no more results
  "exception": "exception stack"
}
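
The token rule of the fetch API (repeat the current token or advance by exactly one) can be sketched on the server side as follows. The class ResultFetcherSketch is hypothetical, and rows are simplified to strings grouped into pre-computed batches; a real implementation would buffer RowData produced by the running job.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch of the server-side token check for fetching results. */
class ResultFetcherSketch {
    // batches.get(i) is the part of the result served for token i; dropped parts are null.
    private final List<List<String>> batches;
    // Token of the last served request; -1 means nothing has been served yet.
    private int lastServed = -1;

    ResultFetcherSketch(List<List<String>> batches) {
        this.batches = new ArrayList<>(batches);
    }

    synchronized List<String> fetch(int token) {
        boolean retry = token == lastServed;
        boolean next = token == lastServed + 1;
        if (token < 0 || !(retry || next)) {
            throw new IllegalArgumentException(
                    "Expected token " + lastServed + " or " + (lastServed + 1) + " but got " + token);
        }
        if (next && token > 0 && token - 1 < batches.size()) {
            // The client moved on, so the previous part was received and can be dropped.
            batches.set(token - 1, null);
        }
        lastServed = token;
        // A token beyond the last batch yields an empty part, i.e. end of stream.
        return token < batches.size() ? batches.get(token) : Collections.emptyList();
    }
}
```

Repeating a request with the same token returns the same part, which is what lets a client retry safely after a failure on its side.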



Util

Trigger Session Heartbeat

/version/sessions/:session_handle/

Verb: POST

Response code: 200 OK

Trigger a heartbeat to tell the server that the client is active and to keep the session alive for the configured timeout.

If a session receives neither a heartbeat nor any other operation, the session will be destroyed when the timeout is reached.

Request body

{}

Response body

{}

Get Info

/v1/info

Verb: GET

Response code: 200 OK

Get metadata for this cluster.

Request body

{}

Response body

{
  "product_name": "Apache Flink",
  "version": "1.16" # Flink version
}


Statement Completion

/v1/sessions/:session_handle/complete_statement

Verb: GET

Response code: 200 OK

Return completion hints for the given statement at the given position.

Request body

{
  "statement": "",
  "position": 0
}

Response body

{
  "candidates": []
}


Options

| Option name | Default value (Required) | Description |
|---|---|---|
| endpoint.protocol | rest (Yes) | REST endpoint should use 'rest'. |
| endpoint.rest.port | 8083 (No) | REST endpoint port. |
| endpoint.rest.address | 127.0.0.1 (No) | The address that the SqlServer binds itself to. |


SQL Client

SQL Client has different modes. To reuse the code, the architecture of the SQL Client in the embedded mode is as follows.



In the Gateway mode, the SQL Client uses the Rest Client to communicate with the GatewayService.


Actually, the architecture is almost the same in the different modes. The only difference is how the client communicates with the GatewayService. Therefore, we focus on the Gateway mode in this section. The processing logic in the Gateway mode should also work for the embedded mode.


The SQL Client is composed of the CliClient and the Executor.

  • The CliClient is responsible for receiving the statements from the terminal and printing the results;
  • The Executor is responsible for executing the statements from the CliClient and returning the results.
    • It has a client-level parser, which determines whether the statement is a client-level command, e.g. HELP, QUIT.
    • It can submit the statement to the GatewayService with the REST Client.
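
The client-level parser mentioned above could look like the following sketch. Only the two commands named in this FLIP (HELP, QUIT) are recognized; the class ClientCommandParserSketch and the enum ClientCommand are hypothetical names.

```java
import java.util.Locale;
import java.util.Optional;

/** Hypothetical sketch of the client-level parser in the Executor. */
class ClientCommandParserSketch {

    enum ClientCommand { HELP, QUIT }

    /**
     * Returns the client-level command if the statement is one, or an empty Optional
     * if the statement should be submitted to the GatewayService.
     */
    static Optional<ClientCommand> parse(String statement) {
        String normalized = statement.trim();
        if (normalized.endsWith(";")) {
            normalized = normalized.substring(0, normalized.length() - 1).trim();
        }
        switch (normalized.toUpperCase(Locale.ROOT)) {
            case "HELP":
                return Optional.of(ClientCommand.HELP);
            case "QUIT":
                return Optional.of(ClientCommand.QUIT);
            default:
                return Optional.empty();
        }
    }
}
```

The Executor would handle a recognized command locally and forward every other statement through the REST Client.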


Gateway Mode

Launch the client

When launching the SQL Client in the gateway mode, users should specify the address and port of the Gateway they want to communicate with. Therefore, we should expose the following startup options.


| Parameter | Required | Description |
|---|---|---|
| -h, --host | Yes | The gateway address |
| -p, --port | Yes | The gateway port |


Users can start the SQL Client with the following command in bash.


./sql-client.sh gateway -h 127.0.0.1 -p 9092


SQL Client initialization

During the initialization, the SQL Client is responsible for registering the Session in the GatewayService:

  • add the jar to the Session's classpath if the client is launched with the -j parameter;
  • add the library to the Session's classpath if the client is launched with the -l parameter;
  • execute the statements in the init file if the client is launched with the -i parameter.


The REST API already exposes parameters that allow users to specify the paths of the jars and libraries in the OpenSession request. Because executing the statements in the init file is very similar to executing statements in the interactive mode, we discuss the execution in the init mode in the next section.

Execute statement

Currently the GatewayService exposes configureSession, which is similar to the -i parameter. When the SQL Client executes files or runs in the interactive mode, the Executor should use GatewayService#executeStatement, which doesn't limit the statement type. Therefore, the Executor should invoke different APIs to execute statements in the different modes.


Execute the initialization file

SQL Client supports splitting the content of the file by ";". After the split, the CliClient executes the statements one by one. The UML sequence diagram involves the participants User, CliClient, RestClient and Gateway and contains the following steps.

  1. init session with file
  2. openSession
  3. openSession
  4. resp with SessionHandle
  5. SessionHandle
  6. split statement
  loop [until all statements are executed]
    7. check whether the statement is a client-level command
    8. execute in INITIALIZATION mode
    9. configure Session with statement
    10. resp
    11. succeed to execute the statement
    12. print info
  13. enter interactive mode
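
Splitting the file content by ";" needs some care when a semicolon appears inside a quoted string. The sketch below is one possible way to do it under simplifying assumptions: only single-quoted literals are considered, and SQL comments are not handled. The class StatementSplitterSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: splits a script into statements on ';' outside single quotes. */
class StatementSplitterSketch {

    static List<String> split(String script) {
        List<String> statements = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuote = false;
        for (int i = 0; i < script.length(); i++) {
            char c = script.charAt(i);
            if (c == '\'') {
                // An escaped quote ('') toggles twice, so the state stays correct.
                inQuote = !inQuote;
            }
            if (c == ';' && !inQuote) {
                addIfNotBlank(statements, current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        // Keep a trailing statement that has no terminating ';'.
        addIfNotBlank(statements, current.toString());
        return statements;
    }

    private static void addIfNotBlank(List<String> statements, String candidate) {
        String trimmed = candidate.trim();
        if (!trimmed.isEmpty()) {
            statements.add(trimmed);
        }
    }
}
```

Each returned statement would then go through the client-level check and, if it is not a client-level command, be sent to the Gateway with configureSession.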


The difference from the previous behavior is that we can no longer print a detailed info message. Currently the response of executeStatement only tells us whether the statement is a query or a non-query. For a non-query statement, the SQL Client will print the results in the tableau mode or print the following info.

[INFO] Execute statement succeed.

Execute in interactive mode

We only discuss execution of the DQL and DML here. Other cases should be the same.

Execute Query

Currently SQL Client supports printing the results in the table (materialized view), tableau or changelog style. Because the Gateway doesn't materialize the results, the client should fetch the results from the Gateway and materialize them on the client side. The UML sequence diagram of the query execution involves the participants User, CliClient, RestClient and Gateway and contains the following steps.

  1. execute query statement
  2. check whether the statement is a client-level statement
  3. execute statement in EXECUTION_MODE
  4. executeStatement
  5. resp with OperationHandle (type: EXECUTE_QUERY_STATEMENT)
  6. (unlabeled)
  7. fetch session config
  8. getSessionConfig
  9. resp with config
  10. config
  11. get result mode
  loop [until all results are fetched]
    12. fetch results
    13. fetchResults
    14. resp with results and token
    15. results
    16. print results to terminal
  17. close operation
  18. resp

Execute insert statement in sync mode

Currently SQL Client supports executing DML in sync mode by

SET 'table.dml-sync'='true';

In the sync mode, the Operation should also convert the JobStatus to the OperationStatus after the job is submitted. Therefore, the Operation reaches the finished state only when the JobStatus reaches the finished state.
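
One way to realize the sync mode on top of the asynchronous GatewayService is to poll the OperationStatus until it reaches a terminal state. The sketch below shows this under assumptions: the class SyncExecutionSketch, the helper awaitTerminal and the notion of which states are terminal are illustrative choices, and the status supplier stands in for a call such as getOperationInfo.

```java
import java.util.function.Supplier;

/** Hypothetical sketch: waits for an Operation to reach a terminal status. */
class SyncExecutionSketch {

    enum OperationStatus {
        INITIALIZED, PENDING, RUNNING, FINISHED, CANCELED, CLOSED, ERROR, TIMEDOUT;

        /** Assumption: these states are never left once reached. */
        boolean isTerminal() {
            return this == FINISHED || this == CANCELED || this == CLOSED
                    || this == ERROR || this == TIMEDOUT;
        }
    }

    static OperationStatus awaitTerminal(
            Supplier<OperationStatus> statusSupplier, long pollIntervalMs, long timeoutMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            OperationStatus status = statusSupplier.get();
            if (status.isTerminal()) {
                return status;
            }
            if (System.nanoTime() - deadline >= 0) {
                return OperationStatus.TIMEDOUT;
            }
            try {
                Thread.sleep(pollIntervalMs);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for the Operation", e);
            }
        }
    }
}
```

The caller returns the result to the user only after awaitTerminal reports FINISHED, which matches the rule that the Operation finishes when the job finishes.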


Implementation

  • Introduce the package flink-table/flink-sql-gateway-common, which includes the gateway API. When users want to implement their own endpoint, they only need to depend on this package.
  • Introduce the package flink-table/flink-sql-gateway. It includes the REST endpoint and the SQL GatewayService.
  • flink-table/flink-sql-client should depend on the package flink-table/flink-sql-gateway.
  • We leave the MetricSystem as a future improvement.


Compatibility, Deprecation, and Migration Plan

Because we only introduce new modules, there are no compatibility issues.

The design of the original Gateway is described in https://docs.google.com/document/d/1DKpFdov1o_ObvrCmU-5xi-VrT6nR2gxq-BbswSSI9j8

