Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Status

...

Page properties


...

...

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-15472

...

Release1.16


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

/v1/sessions/:session_handle

Verb: GET

Response code: 200 OK

Get the session config with the specified session handle.

Request body

{}

Response body

{

"properties": {

  "key": "value"

}

}

...


Configure Session

/v1/sessions/:session_handle/initializeconfigure_session

Verb: POST

Response code: 200 OK

Configures the session with the statement which could be SET/RESET/CREATE/ DROP/LOAD/UNLOAD/USE/ALTER/ADD JAR/REMOVE JAR. It can be used to initialize the Session:

  • register Catalog 
  • register user defined function
  • create predefined table

Note: The statement must be a single command, otherwise the server will throw an exception.


Request body

{

"statement": "", # required

"execution_timeout": "" # execution time limit in milliseconds, optional

}

Response body

{}

...

/v1/info

Verb: GET

Response code: 200 OK

Get meta data for this cluster

Request body

{}

Response body

{

"product_name": "Apache Flink",

"version": "1.16" # Flink version

}

Get Version

/api_versions

Verb: GET

Response code: 200 OK

Get the current avaliable versions for the Rest Endpoint. The client can choose one of the return version as the protocol for later communicate.

Request body

{}

Response body

{

"versions": ["v1", "v2"] # The rest endpoint support version.

}

Options

Please using the following options to configure the REST endpint.

...

Code Block
languagejava
interface SqlGatewayService {
        
    // -------------------------------------------------------------------------------------------
    // Session Management
    // -------------------------------------------------------------------------------------------

    SessionHandle openSession(SessionEnvironment environment) throws SqlGatewayException;
    
    void closeSession(SessionHandle sessionHandle) throws SqlGatewayException;
    
    Map<String, String> getSessionConfig(SessionHandle sessionHandle) throws SqlGatewayException;
    
    // -------------------------------------------------------------------------------------------
    // Operation Management
    // -------------------------------------------------------------------------------------------
        
    /**
     * Get operation info to describe the Operation.
     */
    OperationInfo getOperationInfo(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException;
    
    /** Get the result schema for the specified Operation. */
    ResolvedSchema getOperationResultSchema(SessionHandle sessionHandle, OperationHandle opreationHandle) throws SqlGatewayException;
    
    void cancelOperation(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException;
    
    void closeOperation(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException;
    
    // -------------------------------------------------------------------------------------------
    // Statements 
    // -------------------------------------------------------------------------------------------
    
    /** 
     * Using the statement to initialize the Session. It's only allowed to 
     * execute SET/RESET/CREATE/DROP/USE/ALTER/LOAD MODULE/UNLOAD MODULE/ADD JAR/REMOVE JAR.
     */
    void configureSession(SessionHandle sessionHandle, String statement, long executionTimeoutMs) throws SqlGatewayException;
    
    /** Execute the statement with the specified Session. It allows to execute with Operation-level configuration.*/
    OperationHandle executeStatement(
        SessionHandle sessionHandle, 
        String statement, 
        long executionTimeoutMs, 
        Configuration executionConfig) throws SqlGatewayException;
   
	/**
	 * Fetch the results with token id. 
     */
    ResultSet fetchResults(SessionHandle sessionHandle, OperationHandle operationHandle, int token, int maxRows) throws SqlGatewayException;

  	/**
     * Fetch the Operation-level log from the GatewayService. For some endpoint, it allows to fetch the log at the operation level.
     */
    ResultSet fetchLog(SessionHandle sessionHandle, OperationHandle operationHandle, FetchOrientation orientation, int maxRows) throws SqlGatewayException;
     
   /**
    * Only supports to fetch results in FORWARD/BACKWARD orientation.
    * - Users can only BACKWARD from the current offset once.
    * - The Gateway doesn't materialize the changelog.
    */
    ResultSet fetchResult(SessionHandle sessionHandle, OperationHandle operationHandle, FetchOrientation orientation, int maxRows) throws SqlGatewayException;          

	/**
     * For the same functionality, every endpoint has its result schema. Therefore, 
     * the endpoint submit the callable executor to the OperationManager that manages
     * lifecycle of the Operaiton. The callable executor organizes the results
     * as the Endpoint requires.
     */
    OperationHandle submitOperation(OperationType type, Callable<ResultSet> executor, ResolvedSchema resultSchema) throws SqlGatewayException;
    
    // -------------------------------------------------------------------------------------------
    // Utils 
    // -------------------------------------------------------------------------------------------
    
    /**
     * Describe the cluster info.
     */
    Map<String, String> getGatewayInfo();
    
    void heartbeat(SessionHandle sessionHandle) throws SqlGatewayException;
    
    /**
     * Endpoint is status-less. All the session configs are memorized in the GatewayService side.
     */     
	EndpointVersion getSessionEndpointVersion(SessionHandle sessionHandle) throws SqlGatewayException;
    
    /** Returns a list of completion hints for the given statement at the given position. */
    List<String> completeStatement(String sessionId, String statement, int position) throws SqlGatewayException;

   
    // -------------------------------------------------------------------------------------------
    // Catalog API 
    // -------------------------------------------------------------------------------------------
    
    String getCurrentCatalog(SessionHandle sessionHandle) throws SqlGatewayException;

    String getCurrentDatabase(SessionHandle sessionHandle) throws SqlGatewayException;
    
    List<String> listCatalogs(SessionHandle sessionHandle) throws SqlGatewayException;
    
    List<String> listDatabases(SessionHandle sessionHandle, String catalogName) throws SqlGatewayException;   
    
    List<TableInfo> listTables(SessionHandle sessionHandle, String catalogName, String databaseName, TableKind tableKind) throws SqlGatewayException;
    
    List<FunctionInfo> listFunctions(SessionHandle sessionHandle, String catalogName, String databaseName, FunctionScope scope) throws SqlGatewayException;
    
    ContextResolvedTable getTable(SessionHandle sessionHandle, ObjectIdentifier tableIdentifier) throws SqlGatewayException;
    
    ContextResolvedFunction getFunction(SessionHandle sessionHandle, ObjectIdentifier functionIdentifier) throws SqlGatewayException;
    
}

class TableInfo {
    boolean isTemporary;
    ObjectIdentifier identifier;
    TableKind tableKind;
}

class UserDefinedFunctionInfoFunctionInfo {
    boolean isTemporary;
    ObjectIdentifier identifier;
    FunctionKind kind;
}

class SessionEnvironment {
    private String sessionName;
    private EndpointVersion sessionEndpointVersion;
    private List<URL> libs;
    private List<URL> jars;
    private Map<String, String> sessionConfig
}

public class OperationInfo {
    OperationStatus status;
    OperationType type;
    boolean hasResult;
}

public class ResultSet {
    
    int nextToken;
    
    ResultType resultType; 

    ResolvedSchema resultSchema;
    
    List<RowData> results;
    
    Exception exception;
}

public enum ResultType {
    PAYLOAD,
    EMPTY,
    EOS,
    ERROR
}

...

Option

Required

Default value

Description

sql-gateway.session.idle-timeout


No

5 min

Session will be closed when it's not accessed for this duration, which can be disabled by setting to zero or negative value.


sql-gateway.session.check-interval


No

1 min

The check interval for session timeout, which can be disabled by setting to zero or negative value.


sql-gateway.session.max-num


No

1000000


The number of the active sessions.

sql-gateway.worker.threads.max


No

500

Maximum number of worker threads for the gateway workers.


sql-gateway.worker.threads.min


No

5

Minimum number of worker threads for the gateway workers.

sql-gateway.worker.keepalive-time


No

5 min

Keepalive time for an idle worker thread. When the number of workers exceeds the min workers, excessive threads are killed after this time interval.


SQL

...

Gateway Script

Considering many users prefer to use the SqlGateway only, we propose to add a script named the `sql-server.sh` in the bin directory. Users can use the command

Code Block
languagebash
./sql-gateway.sh (start|stop|stop-all) [args]

to manipulate the sql gateway.

CommandParmeterDescription
start-Dkey=value

Start the gateway and write the pid of the startted sql gateway into the pid file. 


Users can specify the -Dkey=value to specify the parameters. For example, users can specify `-Dsql-gateway.endpoint.rest.address=127.0.0.1` 

stop(none)Stop the last in the pid file.
stop-all(none)Stop all the server in the running pid file.


Then users can start the sql client to communicate with the SQL Gateway in the local or remote environment.

SQL Client API

When lanuch the SQL Client in the gateway mode, user should specify the address and port of the Gateway they want to commuincate. Therefore, we should expose the

...

When lanuch the SQL Client in the gateway mode, user should specify the address and port of the Gateway they want to commuincate. Therefore, we should expose the following startup options.

...

Code Block
languagetext
{
	"result_type": "PAYLOAD",
	"results": {
		"columns": [
			{
				"name": "id",
				"type": {"type":  "BIGINT", "nullable": false}
			},
   			{
				"name": "name",
				"type": {"type":  "VARCHAR", "nullable": true, "length": 300}
			},
		 	{
				"name": "birthday",
				"type": {"type":  "TIMESTAMP", "nullable": true, "precision": 3}
			}
		],
		"data": [
			{
				"kind": "INSERT",
				"fields": [101, "Jay", "1990-01-12T12:00.12"], # a row data
			},
			{
				"kind": "DELETE",
				"fields": [102, "Jimmy", null]
			}
		]
	},
	  "next_result_uri": /v1/sessions/:session_id/jobs/:job_id/result/:{token+1} # if not empty, uses this uri to fetch next part of result, else there is no more result.
	"exception": {
    	"root_cause": "....",
    	"exception_stack": "..." 
	}
}

...