...

Using the API in this section, users can register a Session in the Gateway, which maintains the user-level configuration and resources.

OpenSession

/v1/sessions

Verb: POST

Response code: 200 OK

Create a new session with the specified configuration. In release-1.16, only jars in the local file system can be loaded. This can later be extended to load jars from a remote file system, e.g. HDFS.

Request body

{
  "session_name": "", # optional
  "libs": [], # optional
  "jars": [], # optional
  "properties": { # optional, properties for the current session
    "key": "value"
  }
}

Response body

{
  "session_handle": "" # if the session is created successfully
}
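
As an illustration of the OpenSession call, the following is a minimal client-side sketch using only the Python standard library. The helper names `build_open_session_request` and `open_session` and the base URL are hypothetical; only the path, verb, and field names come from the description above.

```python
import json
import urllib.request


def build_open_session_request(base_url, session_name=None, properties=None):
    """Build (but do not send) a POST /v1/sessions request."""
    body = {}
    if session_name:
        body["session_name"] = session_name  # optional field
    if properties:
        body["properties"] = properties      # optional session properties
    return urllib.request.Request(
        url=f"{base_url}/v1/sessions",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def open_session(base_url, **kwargs):
    """Send the request and return the session handle from the response body."""
    request = build_open_session_request(base_url, **kwargs)
    with urllib.request.urlopen(request) as response:
        return json.load(response)["session_handle"]
```

Separating request construction from sending keeps the request shape easy to inspect without a running Gateway.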

CloseSession

/v1/sessions/:session_handle

Verb: DELETE

Response code: 200 OK

Close a session and release its related resources, including operations and properties.

Request body

{}

Response body

{
  "status": "CLOSED" # if the session is closed successfully
}

Get Session Config

/v1/sessions/:session_handle

Verb: GET

Response code: 200 OK

Get the session config with the specified session handle.

Request body

{}

Response body

{
  "properties": {
    "key": "value"
  }
}


Initialize Session

/v1/sessions/:session_handle/initialize_session

Verb: POST

Response code: 200 OK

Configures the session with a statement, which can be SET/RESET/CREATE/DROP/LOAD/UNLOAD/USE/ALTER/ADD JAR/REMOVE JAR. It can be used to initialize the Session:

  • register a Catalog
  • register a user-defined function
  • create a predefined table

Note: The statement must be a single command, otherwise the server will throw an exception.


Request body

{
  "statement": "", # required
  "execution_timeout": "" # execution time limit in milliseconds, optional
}

Response body

{}


Trigger Session Heartbeat

/v1/sessions/:session_handle/heartbeat

Verb: POST

Response code: 200 OK

Trigger a heartbeat to tell the server that the client is active and to keep the session alive for the configured timeout.

If a session receives neither a heartbeat nor any other operation, it is destroyed when the timeout is reached.

Request body

{}

Response body

{}
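
The idle-timeout behaviour described above can be sketched as follows. This is only an illustration of the semantics, not the Gateway's implementation; the class name `SessionLiveness` and the injectable clock are assumptions made for the example.

```python
import time


class SessionLiveness:
    """Tracks the last activity of a session against an idle timeout."""

    def __init__(self, idle_timeout_seconds, clock=time.monotonic):
        self._timeout = idle_timeout_seconds
        self._clock = clock              # injectable so the logic can be tested
        self._last_access = clock()

    def touch(self):
        """Record a heartbeat or any other operation on the session."""
        self._last_access = self._clock()

    def is_expired(self):
        """True once no activity has been seen for the whole timeout."""
        return self._clock() - self._last_access >= self._timeout
```

A heartbeat request would simply call `touch()`, as would every other request that targets the session.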


Because the Gateway allows async execution mode, the API in this section can manipulate running jobs.

Get Operation Status

/v1/sessions/:session_handle/operations/:operation_handle/status

Verb: GET

Response code: 200 OK

Get the status of a running job.

If the session has expired, the server throws a "session not found" exception.

If the job has finished, the server throws a "job not found" exception.

Request body

{}

Response body

{
  "status": "" # refer to OperationStatus
}
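
Because operations run asynchronously, a client typically polls this endpoint. The sketch below shows one possible polling loop. The set of terminal status names is an assumption (this document only refers to OperationStatus without listing its values), and `fetch_status` is a hypothetical callable that performs the GET and returns the `status` string.

```python
import time

# Assumed terminal status names; adjust to the actual OperationStatus values.
TERMINAL_STATUSES = {"FINISHED", "CANCELED", "CLOSED", "ERROR", "TIMEOUT"}


def wait_for_operation(fetch_status, poll_interval=0.5, max_polls=120,
                       sleep=time.sleep):
    """Poll until the operation reaches a terminal status or polls run out."""
    for _ in range(max_polls):
        status = fetch_status()
        if status in TERMINAL_STATUSES:
            return status
        sleep(poll_interval)
    raise TimeoutError("operation did not reach a terminal status in time")
```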


Cancel Operation

/v1/sessions/:session_handle/operations/:operation_handle/cancel

Verb: PUT

Response code: 200 OK

Cancel the running operation and update the operation status.

Request body

{}

Response body

{
  "status": "CANCELED" # if the operation is canceled successfully
}


Close Operation

/v1/sessions/:session_handle/operations/:operation_handle

Verb: DELETE

Response code: 200 OK

Remove the specified Operation.

If the user invokes closeOperation twice, the second invocation gets an exception.

Request body

{}

Response body

{
  "status": "CLOSED" # if the operation is closed successfully
}


The API in this section is used to submit statements to the Gateway and get the results.

Execute a statement

/v1/sessions/:session_handle/statements

Verb: POST

Response code: 200 OK

Execute a statement, which can be any SQL statement that Flink supports.


The SET xx=yy statement overrides/updates the TableConfig held by the current session, and the RESET statement resets all properties set by SET xx=yy statements.


The USE MODULE/CATALOG/DATABASE xx statement updates the default module/catalog/database in the TableEnvironment held by the current session.


The statement must be a single command, otherwise the server will throw an exception.


For BEGIN STATEMENT SET, the Gateway opens a buffer in the Session and allows users to submit INSERT statements into the Session afterwards. When the Session receives the END statement, the Gateway submits the buffered statements.
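
The statement-set buffering can be pictured with the following sketch. It is a simplified model of the behaviour described above, not the Gateway's code: the class name `StatementSetBuffer`, the returned marker strings, and the `submit` callback are all hypothetical.

```python
class StatementSetBuffer:
    """Buffers INSERT statements between BEGIN STATEMENT SET and END."""

    def __init__(self, submit):
        self._submit = submit   # callable that receives a list of statements
        self._buffer = None     # None means no statement set is open

    def execute(self, statement):
        # Normalize whitespace, case, and a trailing semicolon for matching.
        normalized = " ".join(statement.strip().rstrip(";").split()).upper()
        if normalized == "BEGIN STATEMENT SET":
            if self._buffer is not None:
                raise ValueError("a statement set is already open")
            self._buffer = []
            return "BUFFER_OPENED"
        if normalized == "END":
            if self._buffer is None:
                raise ValueError("END without BEGIN STATEMENT SET")
            statements, self._buffer = self._buffer, None
            self._submit(statements)  # submit all buffered statements at once
            return "SUBMITTED"
        if self._buffer is not None:
            if not normalized.startswith("INSERT"):
                raise ValueError("only INSERT is allowed in a statement set")
            self._buffer.append(statement)
            return "BUFFERED"
        self._submit([statement])     # outside a statement set: submit directly
        return "SUBMITTED"
```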


For ADD JAR/REMOVE JAR, if the jar is in the local environment, we just add it to or remove it from the class path. If the jar is remote, we create a session-level directory and download the jar into it. When the session closes, all resources in the session-level directory should also be cleaned up.


Request body

{
  "statement": "", # required
  "execution_timeout": "" # execution time limit in milliseconds, optional
}

Response body

{
  "operation_handle": "",
  "operation_type": "EXECUTE_STATEMENT",
  "has_result": true/false # determines whether results need to be fetched later
}


Fetch results

/v1/sessions/:session_handle/operations/:operation_handle/result/:token

Verb: GET

Response code: 200 OK

Fetch a part of the result of a Flink job execution. If the result data is too large or the result is streaming, this API returns one part of the result at a time.

The initial value of token is 0. The token in the next request must either equal the token in the current request or equal that token + 1; otherwise the server returns an exception to the client. Multiple requests with the same token return the same result, so the client can still get the result if an error occurs on the client side.

The client gets the next part of the result from /v1/sessions/:session_handle/operations/:operation_handle/result/:{token+1} (the value of next_result_uri in the response data). If next_result_uri is empty, no more data remains on the server.

The server may drop data older than the current token, because the client has already obtained it.

We will introduce fetch_size or max_wait_time (to reach the fetch_size) for optimization in the future.
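
The token rules can be modelled with a small server-side sketch. It assumes results are already split into batches and uses a hypothetical `ResultStore` class and `next_token` field; it only illustrates the "same token or token + 1" contract and the dropping of acknowledged data.

```python
class ResultStore:
    """Serves result batches by token, following the token rules above."""

    def __init__(self, batches):
        self._batches = list(batches)
        self._last_token = None  # no request has been served yet

    def fetch(self, token):
        if self._last_token is None:
            allowed = (0,)       # the first request must use token 0
        else:
            allowed = (self._last_token, self._last_token + 1)
        if token not in allowed:
            raise ValueError(f"token must be one of {allowed}, got {token}")
        advanced = self._last_token is not None and token == self._last_token + 1
        if advanced and self._last_token < len(self._batches):
            # Asking for the next batch confirms the previous one was received,
            # so the old data can be dropped.
            self._batches[self._last_token] = None
        self._last_token = token
        data = self._batches[token] if token < len(self._batches) else []
        has_more = token + 1 < len(self._batches)
        return {"data": data, "next_token": token + 1 if has_more else None}
```

Repeating a request with the same token returns the same batch, which is what lets a client retry after a local failure.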


The returned result has the same schema as the TableResult#getResolvedSchema.

Please refer to the Appendix for the transformation between the ResultSet and JSON.

Request body

{}

Response body

{
  "result_type": "PAYLOAD",
  "results": [ # currently there is only one result; if multiple queries are executed in a single job, there are multiple results
    {
      "columns": [ # if the execution is successful
        {
          "name": "",
          "type": {"type": "BOOLEAN", "nullable": true} # serialized LogicalType
        }
      ],
      "data": [
        ["value"] # a row of data
      ]
    }
  ],
  "next_result_uri": "/v1/sessions/:session_handle/operations/:operation_handle/result/:{token+1}", # if not empty, use this URI to fetch the next part of the result; otherwise there are no more results
  "exception": {
    "root_cause": "....",
    "exception_stack": "..."
  }
}
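
On the client side, fetching a complete result amounts to following `next_result_uri` until it is empty. A minimal sketch, assuming a hypothetical `get_json` callable that performs the GET request and returns the parsed response body in the shape shown above:

```python
def fetch_all_rows(get_json, first_uri):
    """Follow next_result_uri until it is empty and collect all rows."""
    rows = []
    uri = first_uri
    while uri:
        page = get_json(uri)
        if page.get("exception"):
            # Surface the server-side root cause to the caller.
            raise RuntimeError(page["exception"].get("root_cause", "unknown error"))
        for result in page.get("results", []):
            rows.extend(result.get("data", []))
        uri = page.get("next_result_uri")  # empty or missing means no more data
    return rows
```

For an unbounded streaming result this loop never terminates, so a real client would process each page as it arrives instead of collecting everything.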



Statement Completion

/v1/sessions/:session_handle/complete_statement

Verb: GET

Response code: 200 OK

Complete the statement. For example, when a user types SELE in the terminal and presses Tab, the terminal uses this API to complete the statement and returns SELECT.

Request body

{
  "statement": "",
  "position": 0 # cursor position within the statement
}

Response body

{

"candidates": []

}


Util

Get Info

/v1/info

Verb: GET

Response code: 200 OK

Get metadata for this cluster.

Request body

{}

Response body

{
  "product_name": "Apache Flink",
  "version": "1.16" # Flink version
}

Options

Please use the following options to configure the REST endpoint.

| Option name | Default value (required) | Description |
| --- | --- | --- |
| endpoint.type | rest (Yes) | REST endpoint should use 'rest'. |
| endpoint.rest.port | 8083 (No) | REST endpoint port. |
| endpoint.rest.address | 127.0.0.1 (No) | The address that the SqlServer binds itself to. |
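
To make the defaults concrete, the following sketch resolves these options from a plain dictionary. The function name `resolve_rest_options` is hypothetical and the real Gateway reads its configuration differently; only the option keys and default values are taken from the options listed above.

```python
REST_DEFAULTS = {
    "endpoint.rest.port": 8083,
    "endpoint.rest.address": "127.0.0.1",
}


def resolve_rest_options(conf):
    """Validate the required option and fill in defaults for the others."""
    if conf.get("endpoint.type") != "rest":
        raise ValueError("endpoint.type must be 'rest' for the REST endpoint")
    resolved = dict(REST_DEFAULTS)
    resolved.update({k: v for k, v in conf.items() if k in REST_DEFAULTS})
    resolved["endpoint.rest.port"] = int(resolved["endpoint.rest.port"])
    return resolved
```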


Pluggable Endpoint Discovery

...

When launching the SQL Client in gateway mode, users should specify the address and port of the Gateway they want to communicate with. Therefore, we should expose the following startup options.


| Parameter | Required | Description |
| --- | --- | --- |
| -h, --host | Yes | The gateway address |
| -p, --port | Yes | The gateway port |
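
As an illustration only, the two startup options could be parsed like this. The SQL Client itself is not written in Python; the sketch uses `argparse` simply to show the option shape, and the program name `sql-client` is a placeholder.

```python
import argparse


def build_parser():
    """Parser for the two gateway-mode startup options."""
    # add_help=False frees "-h", which argparse otherwise reserves for --help.
    parser = argparse.ArgumentParser(prog="sql-client", add_help=False)
    parser.add_argument("-h", "--host", required=True, help="The gateway address")
    parser.add_argument("-p", "--port", required=True, type=int,
                        help="The gateway port")
    return parser
```

Note that using `-h` for the host means the conventional `-h` help shortcut is no longer available.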


Users can start the SQL Client with the following bash command.

...

Considering that not all LogicalTypes are serializable, our plan closely follows what the LogicalTypeJsonSerializer does. Currently, there are 3 kinds of LogicalType:

  • Basic Type 

| Type Name | JSON |
| --- | --- |
| CHAR | {"type": "CHAR", "nullable": true/false, "length": <LENGTH>} |
| VARCHAR | {"type": "VARCHAR", "nullable": true/false, "length": <LENGTH>} |
| STRING | {"type": "VARCHAR", "nullable": true/false, "length": <LENGTH>} |
| BOOLEAN | {"type": "BOOLEAN", "nullable": true/false} |
| BINARY | {"type": "BINARY", "nullable": true/false, "length": <LENGTH>} |
| VARBINARY | {"type": "VARBINARY", "nullable": true/false, "length": <LENGTH>} |
| BYTES | {"type": "VARBINARY", "nullable": true/false, "length": <LENGTH>} |
| DECIMAL | {"type": "DECIMAL", "nullable": true/false, "precision": <PRECISION>, "scale": <SCALE>} |
| TINYINT | {"type": "TINYINT", "nullable": true/false} |
| SMALLINT | {"type": "SMALLINT", "nullable": true/false} |
| INTEGER | {"type": "INTEGER", "nullable": true/false} |
| BIGINT | {"type": "BIGINT", "nullable": true/false} |
| FLOAT | {"type": "FLOAT", "nullable": true/false} |
| DOUBLE | {"type": "DOUBLE", "nullable": true/false} |
| DATE | {"type": "DATE", "nullable": true/false} |
| TIME | {"type": "TIME", "nullable": true/false, "precision": <PRECISION>} |
| TIMESTAMP | {"type": "TIMESTAMP", "nullable": true/false, "precision": <PRECISION>} |
| TIMESTAMP_LTZ | {"type": "TIMESTAMP_LTZ", "nullable": true/false, "precision": <PRECISION>} |
| RAW | {"type": "RAW", "nullable": true/false, "class": <CLASS>, "specialSerializer": <SERIALIZER>} or {"type": "RAW", "nullable": true/false, "class": <CLASS>, "externalDataType": ...} |


  • Collection Type

For a collection type, the element types are serialized recursively.

| Type Name | JSON |
| --- | --- |
| MAP | {"type": "MAP", "nullable": true/false, "keyType": ..., "valueType": ...} |
| ARRAY | {"type": "ARRAY", "nullable": true/false, "elementType": ...} |
| MULTISET | {"type": "MULTISET", "nullable": true/false, "elementType": ...} |
| ROW | {"type": "ROW", "nullable": true/false, "fields": [{"name": <FIELD_NAME>, "fieldType": ...}, ...]} |

  • User-defined type

User-defined types are not serialized, because users cannot create a new type with SQL.
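
The recursive serialization of basic and collection types can be sketched as follows. `TypeSpec` is a hypothetical stand-in for a logical type and is not a Flink class; only the JSON field names (`type`, `nullable`, `elementType`, `fields`, `fieldType`, and so on) come from the type descriptions above.

```python
from dataclasses import dataclass, field


@dataclass
class TypeSpec:
    """Hypothetical, simplified description of a logical type."""
    root: str                                     # e.g. "VARCHAR", "ARRAY", "ROW"
    nullable: bool = True
    attrs: dict = field(default_factory=dict)     # e.g. {"length": 300}
    children: dict = field(default_factory=dict)  # e.g. {"elementType": TypeSpec}
    fields: list = field(default_factory=list)    # ROW only: [(name, TypeSpec)]


def to_json(spec):
    """Serialize a type; nested element and field types are handled recursively."""
    out = {"type": spec.root, "nullable": spec.nullable, **spec.attrs}
    for key, child in spec.children.items():
        out[key] = to_json(child)
    if spec.root == "ROW":
        out["fields"] = [
            {"name": name, "fieldType": to_json(field_type)}
            for name, field_type in spec.fields
        ]
    return out
```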

...

The type mapping from Flink type to JSON type is as follows.

| Flink Type | JSON Type |
| --- | --- |
| CHAR / VARCHAR / STRING | string |
| BOOLEAN | boolean |
| BINARY / VARBINARY | string with encoding: base64 |
| DECIMAL | number |
| TINYINT | number |
| SMALLINT | number |
| INT | number |
| BIGINT | number |
| FLOAT | number |
| DOUBLE | number |
| DATE | string with format: date |
| TIME | string with format: time |
| TIMESTAMP | string with format: date-time |
| TIMESTAMP_WITH_LOCAL_TIME_ZONE | string with format: date-time (with UTC time zone) |
| INTERVAL | number |
| ARRAY | array |
| MAP / MULTISET | object |
| ROW | object |
| RAW | string with encoding: base64 |
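
A rough sketch of this value mapping, using Python values as stand-ins for Flink's internal data (bytes for BINARY/RAW, `datetime` objects for temporal types, lists for ARRAY, dicts for MAP/ROW). The function name `to_json_value` and the choice of stand-in types are assumptions made for the example.

```python
import base64
import datetime
from decimal import Decimal


def to_json_value(value):
    """Convert a Python stand-in value to its JSON representation."""
    if value is None or isinstance(value, (bool, int, float, str)):
        return value
    if isinstance(value, Decimal):
        # Mapped to a JSON number; float conversion may lose precision.
        return float(value)
    if isinstance(value, (bytes, bytearray)):
        return base64.b64encode(bytes(value)).decode("ascii")
    if isinstance(value, (datetime.datetime, datetime.date, datetime.time)):
        return value.isoformat()  # date, time, or date-time string
    if isinstance(value, (list, tuple)):
        return [to_json_value(item) for item in value]
    if isinstance(value, dict):
        return {str(key): to_json_value(item) for key, item in value.items()}
    raise TypeError(f"unsupported value type: {type(value).__name__}")
```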

Serialize the Exception

Considering that some clients may also care about the root cause, the serialized JSON object should contain both the root cause and the stack.

...

{
  "result_type": "PAYLOAD",
  "results": {
    "columns": [
      {
        "name": "id",
        "type": {"type": "BIGINT", "nullable": false}
      },
      {
        "name": "name",
        "type": {"type": "VARCHAR", "nullable": true, "length": 300}
      },
      {
        "name": "birthday",
        "type": {"type": "TIMESTAMP", "nullable": true, "precision": 3}
      }
    ],
    "data": [
      {
        "kind": "INSERT",
        "fields": [101, "Jay", "1990-01-12T12:00.12"] # a row of data
      },
      {
        "kind": "DELETE",
        "fields": [102, "Jimmy", null]
      }
    ]
  },
  "next_result_uri": "/v1/sessions/:session_handle/operations/:operation_handle/result/:{token+1}", # if not empty, use this URI to fetch the next part of the result; otherwise there are no more results
  "exception": {
    "root_cause": "....",
    "exception_stack": "..."
  }
}
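
A client consuming such a payload might pair each row's fields with the column names. The sketch below assumes the `results` object has exactly the `columns` and `data` shape of the example above; the function name `rows_as_dicts` is hypothetical.

```python
def rows_as_dicts(result):
    """Return (kind, {column name: value}) pairs for every row in the result."""
    names = [column["name"] for column in result["columns"]]
    return [
        (row["kind"], dict(zip(names, row["fields"])))
        for row in result["data"]
    ]
```

Keeping the row kind next to the values lets the caller decide how to apply inserts and deletes to its own view of the table.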

...