...

/v1/sessions/:session_handle

Verb: GET

Response code: 200 OK

Get the session config with the specified session handle.

Request body

{}

Response body

{
  "properties": {
    "key": "value"
  }
}

...


Initialize Session

/v1/sessions/:session_handle/initialize_session

Verb: POST

Response code: 200 OK

Configures the session with the statement, which could be SET/RESET/CREATE/DROP/LOAD/UNLOAD/USE/ALTER/ADD JAR/REMOVE JAR. It can be used to initialize the Session, e.g. to:

  • register a Catalog
  • register a user-defined function
  • create a predefined table

(A usage sketch follows the response body below.)

Note: The statement must be a single command, otherwise the server will throw an exception.


Request body

{
  "statement": "",         # required
  "execution_timeout": ""  # execution time limit in milliseconds, optional
}

Response body

{}
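
For illustration, here is a hedged sketch of the same initialization flow through the SQLGatewayService interface defined later in this FLIP. The session environment, catalog, function class, and table definitions are example values, not prescribed ones.

Code Block
languagejava
// Illustration only: initialize a session with single-statement calls to
// configureSession (see the SQLGatewayService interface below).
SessionHandle session = gatewayService.openSession(environment);

// Register a Catalog.
gatewayService.configureSession(
    session, "CREATE CATALOG my_catalog WITH ('type' = 'generic_in_memory')", 60000L);

// Register a user-defined function (the class name is a placeholder).
gatewayService.configureSession(
    session, "CREATE FUNCTION my_upper AS 'com.example.MyUpperFunction'", 60000L);

// Create a predefined table.
gatewayService.configureSession(
    session, "CREATE TABLE orders (id BIGINT, item STRING) WITH ('connector' = 'datagen')", 60000L);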

...

/v1/sessions/:session_handle/operations/:operation_handle/result/:token

Verb: GET

Response code: 200 OK

Fetch a part of the result for a Flink job execution. If the result data is too large, or the result is streaming, this API can be used to fetch one part of the result at a time. The initial value of the token is 0. The token in the next request must either be the same as the token in the current request or be equal to the current token + 1; otherwise, the client will get an exception from the server. If multiple requests are executed with the same token, the result is the same. This design makes sure the client can get the result even if some errors occur on the client side. The client can get the next part of the result using /v1/sessions/:session_id/jobs/:job_id/result/:{token+1} (which is the value of next_result_uri in the response data). If next_result_uri is empty, no more data remains on the server. A client-side sketch of this protocol follows the response body below.

The server may drop data older than the current token, since the client has already successfully obtained it.

We will introduce fetch_size or max_wait_time (to reach the fetch_size) as an optimization in the future.


The returned result has the same schema as TableResult#getResolvedSchema.

Request body

{}

Response body

{
  "result_type": "PAYLOAD",
  "results": [ # currently there is only one result; if multiple queries are executed in a single job, there will be multiple results
    {
      "columns": [ # present if the execution is successful
        {
          "name": "",
          "type": {"type": "BOOLEAN", "nullable": true} # the serialized LogicalType
        }
      ],
      "data": [
        ["value"] # a row of data
      ]
    }
  ],
  "next_result_uri": "/v1/sessions/:session_id/jobs/:job_id/result/:{token+1}", # if not empty, use this URI to fetch the next part of the result; otherwise there is no more result
  "exception": {
    "root_cause": "....",
    "exception_stack": "..."
  }
}
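
To make the token protocol concrete, here is a minimal client-side sketch using the ResultSet and ResultType types defined later in this FLIP. fetchResults(...) stands in for the HTTP GET above, and process(...) is a placeholder for consuming one part of the result; both are hypothetical helpers.

Code Block
languagejava
// Sketch of a client fetch loop over the token-based protocol.
int token = 0; // the initial token is always 0
while (true) {
    // Performs GET /v1/sessions/:session_handle/operations/:operation_handle/result/:token.
    // Re-fetching with the same token returns the same part, so a client that
    // failed before processing the response can simply retry this call.
    ResultSet resultSet = fetchResults(sessionHandle, operationHandle, token);

    if (resultSet.resultType == ResultType.ERROR) {
        throw new RuntimeException(resultSet.exception); // surface the server-side failure
    }
    if (resultSet.resultType == ResultType.EOS) {
        break; // no more data remains on the server
    }
    if (resultSet.resultType == ResultType.PAYLOAD) {
        process(resultSet.results); // consume this part of the result
    }
    token = token + 1; // only the current token or token + 1 is accepted
}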



Statement Completion

/v1/sessions/:session_handle/complete_statement

Verb: GET

Response code: 200 OK

Complete the statement. For example, when a user inputs SELE in the terminal and presses Tab, the terminal uses this API to complete the statement and returns SELECT. (A concrete illustration follows the response body below.)

Request body

{
  "statement": "",
  "position":
}

Response body

{
  "candidates": []
}
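
As a concrete illustration of the same functionality through the SQLGatewayService interface below (the session id is a placeholder):

Code Block
languagejava
// Completing "SELE" at cursor position 4 is expected to yield "SELECT".
List<String> candidates = gatewayService.completeStatement("session-id", "SELE", 4);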

...

Code Block
languagejava
interface SQLGatewayService {
        
    // -------------------------------------------------------------------------------------------
    // Session Management
    // -------------------------------------------------------------------------------------------

    SessionHandle openSession(SessionEnvironment environment) throws SqlGatewayException;
    
    void closeSession(SessionHandle sessionHandle) throws SqlGatewayException;
    
    Map<String, String> getSessionConfig(SessionHandle sessionHandle) throws SqlGatewayException;
    
    // -------------------------------------------------------------------------------------------
    // Operation Management
    // -------------------------------------------------------------------------------------------
        
    /**
     * Get operation info to describe the Operation.
     */
    OperationInfo getOperationInfo(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException;
    
    /** Get the result schema for the specified Operation. */
    ResolvedSchema getOperationResultSchema(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException;
    
    void cancelOperation(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException;
    
    void closeOperation(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException;
    
    // -------------------------------------------------------------------------------------------
    // Statements 
    // -------------------------------------------------------------------------------------------
    
    /** 
     * Use the statement to initialize the Session. Only
     * SET/RESET/CREATE/DROP/USE/ALTER/LOAD MODULE/UNLOAD MODULE/ADD JAR/REMOVE JAR
     * statements are allowed.
     */
    void configureSession(SessionHandle sessionHandle, String statement, long executionTimeoutMs) throws SqlGatewayException;
    
    /** Execute the statement with the specified Session. It allows executing with an Operation-level configuration. */
    OperationHandle executeStatement(
        SessionHandle sessionHandle, 
        String statement, 
        long executionTimeoutMs, 
        Configuration executionConfig) throws SqlGatewayException;
   
    /**
     * Fetch the results with the token id.
     */
    ResultSet fetchResults(SessionHandle sessionHandle, OperationHandle operationHandle, int token, int maxRows) throws SqlGatewayException;

    /**
     * Fetch the Operation-level log from the GatewayService. Some endpoints allow
     * fetching the log at the operation level.
     */
    ResultSet fetchLog(SessionHandle sessionHandle, OperationHandle operationHandle, FetchOrientation orientation, int maxRows) throws SqlGatewayException;
     
    /**
     * Only supports fetching results in FORWARD/BACKWARD orientation.
     * - Users can only go BACKWARD from the current offset once.
     * - The Gateway doesn't materialize the changelog.
     */
    ResultSet fetchResult(SessionHandle sessionHandle, OperationHandle operationHandle, FetchOrientation orientation, int maxRows) throws SqlGatewayException;          

    /**
     * For the same functionality, every endpoint has its own result schema. Therefore,
     * the endpoint submits a callable executor to the OperationManager, which manages
     * the lifecycle of the Operation. The callable executor organizes the results
     * as the endpoint requires. A usage sketch follows this code block.
     */
    OperationHandle submitOperation(OperationType type, Callable<ResultSet> executor, ResolvedSchema resultSchema) throws SqlGatewayException;
    
    // -------------------------------------------------------------------------------------------
    // Utils 
    // -------------------------------------------------------------------------------------------
    
    /**
     * Describe the cluster info.
     */
    Map<String, String> getGatewayInfo();
    
    void heartbeat(SessionHandle sessionHandle) throws SqlGatewayException;
    
    /**
     * The endpoint is stateless. All the session configs are kept on the
     * GatewayService side.
     */
    EndpointVersion getSessionEndpointVersion(SessionHandle sessionHandle) throws SqlGatewayException;
    
    /** Returns a list of completion hints for the given statement at the given position. */
    List<String> completeStatement(String sessionId, String statement, int position) throws SqlGatewayException;

   
    // -------------------------------------------------------------------------------------------
    // Catalog API 
    // -------------------------------------------------------------------------------------------
    
    String getCurrentCatalog(SessionHandle sessionHandle) throws SqlGatewayException;

    String getCurrentDatabase(SessionHandle sessionHandle) throws SqlGatewayException;
    
    List<String> listCatalogs(SessionHandle sessionHandle) throws SqlGatewayException;
    
    List<String> listDatabases(SessionHandle sessionHandle, String catalogName) throws SqlGatewayException;   
    
    List<TableInfo> listTables(SessionHandle sessionHandle, String catalogName, String databaseName, TableKind tableKind) throws SqlGatewayException;
    
    List<UserDefinedFunctionInfo> listUserDefinedFunctions(SessionHandle sessionHandle, String catalogName, String databaseName) throws SqlGatewayException;
    
    ContextResolvedTable getTable(SessionHandle sessionHandle, ObjectIdentifier tableIdentifier) throws SqlGatewayException;
    
    ContextResolvedFunction getFunction(SessionHandle sessionHandle, ObjectIdentifier functionIdentifier) throws SqlGatewayException;
    
}

class TableInfo {
    boolean isTemporary;
    ObjectIdentifier identifier;
    TableKind tableKind;
}

class UserDefinedFunctionInfo {
    boolean isTemporary;
    ObjectIdentifier identifier;
    FunctionKind kind;
}

enum TableKind {
    ALL,
    VIEW,
    TABLE
}

class SessionEnvironment {
    private String sessionName;
    private EndpointVersion sessionEndpointVersion;
    private List<URL> libs;
    private List<URL> jars;
    private Map<String, String> sessionConfig;
}

public class OperationInfo {
    OperationStatus status;
    OperationType type;
    boolean hasResult;
}

public class ResultSet {
    
    int nextToken;
    
    ResultType resultType; 

    ResolvedSchema resultSchema;
    
    List<RowData> results;
    
    Exception exception;
}

public enum ResultType {
    PAYLOAD,
    EMPTY,
    EOS,
    ERROR
}
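
For illustration, a hedged sketch of how an endpoint might use submitOperation. The OperationType constant, the captured sessionHandle/resultSchema, and the toResultSet helper are hypothetical, not part of this FLIP.

Code Block
languagejava
// Hypothetical endpoint code: wrap the endpoint-specific result production in a
// Callable so the OperationManager can manage the Operation's lifecycle.
OperationHandle operation = gatewayService.submitOperation(
    OperationType.LIST_TABLES, // hypothetical constant for this operation type
    () -> {
        List<TableInfo> tables = gatewayService.listTables(
            sessionHandle, "default_catalog", "default_database", TableKind.ALL);
        // Hypothetical helper: organize the rows in the shape this endpoint requires.
        return toResultSet(tables);
    },
    resultSchema); // the ResolvedSchema of the rows produced above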

...

Because we introduce the Gateway as new modules, there are no compatibility problems. For the SQL Client, we use the Gateway to submit jobs. We will keep all the functionality but may change the presentation; for example, when executing EXPLAIN, the way the SQL Client prints the results may differ from the current table-based output.

Future work

...

The Gateway itself has many functionalities. We only list part of the work here:

  1. Metric System
  2. Authentication module
  3. Multiple Endpoints, e.g. the HiveServer2 endpoint
  4. Persistent Gateway
  5. ...

We will collect more feedback to determine which features are most important to users.


Rejected Alternatives

TableInfo and UserDefinedFunctionInfo VS CatalogTable and CatalogFunction

CatalogTable and CatalogFunction are much heavier than TableInfo and UserDefinedFunctionInfo. The CatalogManager has to read from the Catalog to get the schema, but listTables only cares about the table names, which is much lighter. Therefore, we propose to use TableInfo with only the required fields.

Support the multi-version Flink in the Gateway VS Support the multi-version in the external Service

Currently, many big data tools, e.g. Zeppelin and Livy[1], support multiple engine versions, and they have similar architectures. Flink on Zeppelin looks like[2]:

[Figure: Flink on Zeppelin architecture]

The Interpreter is a separate process, which means that every Flink instance runs in a separate JVM[3].


The current Gateway is responsible for compiling the user's SQL and submitting the job to the cluster, which is almost the same as the Interpreter in the figure above.

If we supported multiple versions in the Gateway (the GatewayService itself also has a version), all the Flink versions would run in the same JVM. The Gateway would then have to solve shims, classloaders for different Flink versions, deployments, and so on. It's better to follow the same design as the other tools.


[1] https://issues.cloudera.org/browse/LIVY-106

[2] https://zeppelin.apache.org/docs/latest/interpreter/flink.html#flink-on-zeppelin-architecture

[3] https://zeppelin.apache.org/docs/0.6.2/development/writingzeppelininterpreter.html

Merge Gateway into the Flink code base VS Support Gateway in the another repo

The main reason to move the Gateway out would be to support multiple Flink versions in the Gateway. However, in the discussion above we concluded that the Gateway is bound to a specific Flink version and that an external service manages the Gateway instances. Considering that the Gateway itself is bound to Flink, it's better to merge the Gateway into the Flink repo.

It also brings the following benefits:

  1. Reduce the cost, e.g. CI tests, releases, maintenance cost;
  2. The SQL Client can submit SQL to the SQL Gateway, and we can reuse most of the code;

Therefore, we prefer to merge the Gateway into the Flink repo.


Appendix

Serialize and deserialize the ResultSet 

The serialization of the ResultSet mainly consists of 3 parts:

  • Serialize the LogicalType
  • Serialize the RowData
  • Serialize the Exception

Serialize the LogicalType

Considering that not all LogicalTypes are serializable, our plan is much like what the LogicalTypeJsonSerializer does. Currently, the LogicalType has 3 kinds (a serialization sketch follows this list):

  • Basic Type

Data Type        JSON

CHAR             {"type": "CHAR", "nullable": true/false, "length": <LENGTH>}
VARCHAR          {"type": "VARCHAR", "nullable": true/false, "length": <LENGTH>}
STRING           {"type": "VARCHAR", "nullable": true/false, "length": <LENGTH>}
BOOLEAN          {"type": "BOOLEAN", "nullable": true/false}
BINARY           {"type": "BINARY", "nullable": true/false, "length": <LENGTH>}
VARBINARY        {"type": "VARBINARY", "nullable": true/false, "length": <LENGTH>}
BYTES            {"type": "VARBINARY", "nullable": true/false, "length": <LENGTH>}
DECIMAL          {"type": "DECIMAL", "nullable": true/false, "precision": <PRECISION>, "scale": <SCALE>}
TINYINT          {"type": "TINYINT", "nullable": true/false}
SMALLINT         {"type": "SMALLINT", "nullable": true/false}
INTEGER          {"type": "INTEGER", "nullable": true/false}
BIGINT           {"type": "BIGINT", "nullable": true/false}
FLOAT            {"type": "FLOAT", "nullable": true/false}
DOUBLE           {"type": "DOUBLE", "nullable": true/false}
DATE             {"type": "DATE", "nullable": true/false}
TIME             {"type": "TIME", "nullable": true/false, "precision": <PRECISION>}
TIMESTAMP        {"type": "TIMESTAMP", "nullable": true/false, "precision": <PRECISION>}
TIMESTAMP_LTZ    {"type": "TIMESTAMP_LTZ", "nullable": true/false, "precision": <PRECISION>}
RAW              {"type": "RAW", "nullable": true/false, "class": <CLASS>, "specialSerializer": <SERIALIZER>}
                 or
                 {"type": "RAW", "nullable": true/false, "class": <CLASS>, "externalDataType": ...}


  • Collection Type

For the collection types, the serializer recursively serializes the element type.

Type Name        JSON

MAP              {"type": "MAP", "nullable": true/false, "keyType": ..., "valueType": ...}
ARRAY            {"type": "ARRAY", "nullable": true/false, "elementType": ...}
MULTISET         {"type": "MULTISET", "nullable": true/false, "elementType": ...}
ROW              {
                   "type": "ROW",
                   "nullable": true/false,
                   "fields": [
                     {
                       "name": <FIELD_NAME>,
                       "fieldType": ...
                     }, ...
                   ]
                 }
  • User-defined type

We don't support serializing user-defined types, because users can't create a new type with SQL.
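
For illustration, a minimal sketch of the recursive serialization described above, written with Jackson. This is not the actual LogicalTypeJsonSerializer; only a few type branches are shown, and the method name is illustrative.

Code Block
languagejava
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

// Illustration only: serialize a LogicalType into the JSON shape in the tables above.
static ObjectNode serializeType(LogicalType type, ObjectMapper mapper) {
    ObjectNode node = mapper.createObjectNode();
    node.put("type", type.getTypeRoot().name()); // e.g. "VARCHAR", "ROW"
    node.put("nullable", type.isNullable());
    if (type instanceof VarCharType) {
        node.put("length", ((VarCharType) type).getLength());
    } else if (type instanceof ArrayType) {
        // Collection types recursively serialize their element type.
        node.set("elementType", serializeType(((ArrayType) type).getElementType(), mapper));
    } else if (type instanceof RowType) {
        ArrayNode fields = node.putArray("fields");
        for (RowType.RowField field : ((RowType) type).getFields()) {
            ObjectNode fieldNode = fields.addObject();
            fieldNode.put("name", field.getName());
            fieldNode.set("fieldType", serializeType(field.getType(), mapper));
        }
    }
    // ... the remaining types (DECIMAL, TIMESTAMP, MAP, ...) follow the same pattern.
    return node;
}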

Serialize the RowData

The RowData contains two parts: the RowKind and the fields.

Therefore, we propose to serialize the RowData as follows.

Code Block
languagetext
{
  "kind": "INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE",
  "fields": [<COLUMN_VALUE>, ...]
}

The `<COLUMN_VALUE>` is the same as serializing the value with `JSON_STRING`.

The type mapping from Flink type to JSON type is as follows.

Flink Type                          JSON Type

CHAR / VARCHAR / STRING             string
BOOLEAN                             boolean
BINARY / VARBINARY                  string with encoding: base64
DECIMAL                             number
TINYINT                             number
SMALLINT                            number
INT                                 number
BIGINT                              number
FLOAT                               number
DOUBLE                              number
DATE                                string with format: date
TIME                                string with format: time
TIMESTAMP                           string with format: date-time
TIMESTAMP_WITH_LOCAL_TIME_ZONE      string with format: date-time (with UTC time zone)
INTERVAL                            number
ARRAY                               array
MAP / MULTISET                      object
ROW                                 object
RAW                                 string with encoding: base64
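
For illustration, a hedged sketch of this mapping for a single row. It works on plain Java values rather than the actual RowData accessors, only a few branches are shown, and the method name is illustrative.

Code Block
languagejava
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.time.LocalDateTime;
import java.util.Base64;

// Illustration only: serialize one row into the {"kind": ..., "fields": [...]} shape.
static ObjectNode serializeRow(String kind, Object[] fields, ObjectMapper mapper) {
    ObjectNode row = mapper.createObjectNode();
    row.put("kind", kind); // INSERT / UPDATE_BEFORE / UPDATE_AFTER / DELETE
    ArrayNode values = row.putArray("fields");
    for (Object field : fields) {
        if (field == null) {
            values.addNull();
        } else if (field instanceof Boolean) {
            values.add((Boolean) field); // BOOLEAN -> boolean
        } else if (field instanceof Long) {
            values.add((Long) field); // BIGINT -> number
        } else if (field instanceof Number) {
            values.add(((Number) field).doubleValue()); // other numeric types -> number
        } else if (field instanceof byte[]) {
            // BINARY / VARBINARY / RAW -> string with encoding: base64
            values.add(Base64.getEncoder().encodeToString((byte[]) field));
        } else if (field instanceof LocalDateTime) {
            values.add(field.toString()); // TIMESTAMP -> string with format: date-time
        } else {
            values.add(field.toString()); // CHAR / VARCHAR / STRING -> string
        }
    }
    return row;
}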

Serialize the Exception

Considering that some clients may also care about the root cause, the serialized JSON object should contain both the root cause and the exception stack.

Code Block
languagetext
"exception": {
  "root_cause": "...", 
  "exception_stack": "..."
}

Example

Code Block
languagetext
# an example response
{
  "result_type": "PAYLOAD",
  "results": [
    {
      "columns": [
        {
          "name": "id",
          "type": {"type": "BIGINT", "nullable": false}
        },
        {
          "name": "name",
          "type": {"type": "VARCHAR", "nullable": true, "length": 300}
        },
        {
          "name": "birthday",
          "type": {"type": "TIMESTAMP", "nullable": true, "precision": 3}
        }
      ],
      "data": [
        [101, "Jay", "1990-01-12T12:00:00.123"] # a row of data
      ]
    }
  ],
  "next_result_uri": "/v1/sessions/:session_id/jobs/:job_id/result/:{token+1}", # if not empty, use this URI to fetch the next part of the result; otherwise there is no more result
  "exception": {
    "root_cause": "....",
    "exception_stack": "..."
  }
}



The design of the original Gateway is in the

...