...
Using the API in this section, users can register a Session
in the Gateway, which maintains the user-level configuration and resources.
OpenSession
/v1/sessions | |
Verb: POST | Response code: 200 OK |
Create a new session with the specified configuration. In release-1.16, we only support loading jars from the local file system, but this feature can be extended to load jars from a remote file system, e.g. HDFS. | |
Request body | { "session_name": "", # optional "libs": [], # optional. "jars": [], # optional "properties": { # optional, properties for current session "key": "value" } } |
Response body | { "session_handle": "", # if session is created successfully } |
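As a rough illustration of the OpenSession call above, the following Python sketch builds (but does not send) the POST request. Only the path, the verb, and the body field names come from the table; the base URL, the helper name `build_open_session_request`, the `Content-Type` header, and the example property are assumptions made for illustration.

```python
import json
import urllib.request


def build_open_session_request(base_url, session_name=None, jars=None, properties=None):
    """Build a POST /v1/sessions request; every body field is optional."""
    body = {}
    if session_name is not None:
        body["session_name"] = session_name
    if jars:
        body["jars"] = list(jars)
    if properties:
        body["properties"] = dict(properties)
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/v1/sessions",
        data=json.dumps(body).encode("utf-8"),
        # Assumption: the endpoint expects a JSON content type.
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Hypothetical gateway address and property, used only for illustration.
request = build_open_session_request(
    "http://127.0.0.1:8083",
    session_name="demo",
    properties={"key": "value"},
)
print(request.get_method(), request.full_url)
print(request.data.decode("utf-8"))
```

Passing the request to `urllib.request.urlopen` would send it; on success the response body is expected to carry the `session_handle` that the other session endpoints take in their path.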
CloseSession
/v1/sessions/:session_handle | |
Verb: DELETE | Response code: 200 OK |
Close a session, release related resources including operations and properties | |
Request body | {} |
Response body | { "status": "CLOSED" # if closed successfully } |
Get Session Config
/v1/sessions/:session_handle | |
Verb: GET | Response code: 200 OK |
Get the session config with the specified session handle. | |
Request body | {} |
Response body | { "properties": { "key": "value" } } |
Initialize Session
/v1/sessions/:session_handle/initialize_session | |
Verb: POST | Response code: 200 OK |
Configure the session with a statement, which can be SET/RESET/CREATE/DROP/LOAD/UNLOAD/USE/ALTER/ADD JAR/REMOVE JAR. It can be used to initialize the Session.
Note: The statement must be a single command; otherwise the server will throw an exception. | |
Request body | { "statement": "", # required "execution_timeout": "" # execution time limit in milliseconds, optional } |
Response body | {} |
Trigger Session Heartbeat
/v1/sessions/:session_handle/heartbeat | |
Verb: POST | Response code: 200 OK |
Trigger a heartbeat to tell the server that the client is active and to keep the session alive for the configured timeout. If a session receives neither a heartbeat nor any other operation, it will be destroyed when the timeout is reached. | |
Request body | {} |
Response body | {} |
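The idle-timeout rule described for the heartbeat can be sketched as a small Python model. This is a toy illustration only, not the Gateway's implementation: the class name `SessionLiveness`, the millisecond clock passed in by the caller, and the choice to expire exactly when the idle time reaches the timeout are all assumptions.

```python
class SessionLiveness:
    """Toy model of the idle-timeout rule; not the Gateway's implementation."""

    def __init__(self, idle_timeout_ms, now_ms):
        self.idle_timeout_ms = idle_timeout_ms
        self.last_access_ms = now_ms

    def touch(self, now_ms):
        """Record a heartbeat or any other operation on the session."""
        self.last_access_ms = now_ms

    def is_expired(self, now_ms):
        """True once the session has been idle for the full timeout."""
        return now_ms - self.last_access_ms >= self.idle_timeout_ms


session = SessionLiveness(idle_timeout_ms=1000, now_ms=0)
session.touch(now_ms=800)                              # heartbeat before the timeout
alive_at_1500 = not session.is_expired(now_ms=1500)    # idle for 700 ms
expired_at_1800 = session.is_expired(now_ms=1800)      # idle for 1000 ms
print(alive_at_1500, expired_at_1800)
```

A client that wants to keep a session open would therefore send heartbeats at an interval comfortably shorter than the configured timeout.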
Operation-related
Because the Gateway supports an asynchronous execution mode, the API in this section can manipulate running jobs.
Get Operation Status
/v1/sessions/:session_handle/operations/:operation_handle/status | |
Verb: GET | Response code: 200 OK |
Get the status of a running job. If the session has expired, the server will throw a "session not found" exception. If the job has finished, the server will throw a "job not found" exception. | |
Request body | {} |
Response body | { "status": "" # refer to OperationStatus } |
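Since execution is asynchronous, a client typically polls the status endpoint until the operation stops changing. The sketch below shows one way to structure such a loop in Python; the HTTP call is replaced by an injected callable so the example runs without a server. The helper name `wait_for_status` and the status names `RUNNING`, `FINISHED`, and `ERROR` are assumptions (the authoritative values are those of OperationStatus); `CANCELED` and `CLOSED` appear in the responses in this section.

```python
import time


def wait_for_status(fetch_status, terminal_statuses, poll_interval_s=0.0, max_polls=100):
    """Poll fetch_status() until it returns a terminal status or polls run out."""
    status = None
    for _ in range(max_polls):
        status = fetch_status()
        if status in terminal_statuses:
            return status
        time.sleep(poll_interval_s)
    raise TimeoutError(f"operation still in status {status!r} after {max_polls} polls")


# Stand-in for GET .../operations/:operation_handle/status; a real client
# would issue the HTTP call here and read the "status" field of the response.
_canned = iter(["RUNNING", "RUNNING", "FINISHED"])
final_status = wait_for_status(
    lambda: next(_canned),
    terminal_statuses={"FINISHED", "CANCELED", "CLOSED", "ERROR"},  # partly assumed names
)
print(final_status)
```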
Cancel Operation
/v1/sessions/:session_handle/operations/:operation_handle/cancel | |
Verb: PUT | Response code: 200 OK |
Cancel the running operation and update the operation status. | |
Request body | {} |
Response body | { "status": "CANCELED" # if canceled successfully } |
Close Operation
/v1/sessions/:session_handle/operations/:operation_handle | |
Verb: DELETE | Response code: 200 OK |
Remove the specified operation. If the user invokes closeOperation twice, the later invocation will get an exception. | |
Request body | {} |
Response body | { "status": "CLOSED" # if closed successfully } |
Statement-related
The API in this section is used to submit statements to the Gateway and get the results.
Execute a statement
/v1/sessions/:session_handle/statements | |
Verb: POST | Response code: 200 OK |
Execute a statement, which can be any SQL statement that Flink supports. The SET xx=yy statement will override/update the TableConfig held by the current session, and the RESET statement will reset all properties set by SET xx=yy statements. The USE MODULE/CATALOG/DATABASE xx statement will update the default module/catalog/database in the TableEnvironment held by the current session. The statement must be a single command; otherwise the server will throw an exception. | |
Request body | { "statement": "", # required "execution_timeout": "" # execution time limit in milliseconds, optional } |
Response body | { "operation_handle": "", "operation_type": "EXECUTE_STATEMENT", "has_result": true/false # determines whether results need to be fetched later } |
Fetch results
/v1/sessions/:session_handle/operations/:operation_handle/result/:token | |
Verb: GET | Response code: 200 OK |
Fetch a part of the result of a Flink job execution. If the result data is too large or the result is streaming, this API can be used to get one part of the result at a time. The initial value of the token is 0. The token in the next request must either be the same as the token in the current request or be equal to that token + 1; otherwise the client will get an exception from the server. If multiple requests are executed with the same token, the result is the same. This design ensures that the client can still get the result even if an error occurs on the client side. The client can get the next part of the result using /v1/sessions/:session_id/jobs/:job_id/result/:{token+1} (which is the value of next_result_uri in the response data). If next_result_uri is empty, no more data remains on the server. The server may drop data older than the current token, since the client has already obtained it successfully. We will introduce fetch_size or max_wait_time (to reach the fetch_size) as an optimization in the future. The returned result has the same schema as TableResult#getResolvedSchema. Please refer to the Appendix for the transformation between the ResultSet and JSON. | |
Request body | {} |
Response body | { "result_type": "PAYLOAD", "results": [ # currently there is only one result; if multiple queries are executed in a single job, there are multiple results. { "columns": [ # if the execution is successful { "name": "", "type": {"type": "BOOLEAN", "nullable": true} # string value of LogicalType }, ], "data": [ ["value", ], # a row data ] }, ], "next_result_uri": /v1/sessions/:session_id/jobs/:job_id/result/:{token+1} # if not empty, use this uri to fetch the next part of the result; otherwise there is no more result. "exception": { "root_cause": "....", "exception_stack": "..." } } |
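The token rule for fetching results can be illustrated with a small in-memory model in Python. This is a sketch under stated assumptions, not the Gateway's implementation: the class `ResultStore`, the helper `fetch_all`, and the simplified `next_token` field (standing in for `next_result_uri`) are hypothetical, and the model keeps all batches instead of dropping data older than the current token.

```python
class ResultStore:
    """Toy in-memory model of the token rule; not the Gateway's implementation."""

    def __init__(self, batches):
        self.batches = batches      # one list of rows per token
        self.current_token = 0      # token of the most recent request

    def fetch(self, token):
        # Only a retry (same token) or the next token is accepted.
        if token not in (self.current_token, self.current_token + 1):
            raise ValueError(
                f"expected token {self.current_token} or {self.current_token + 1}, got {token}"
            )
        self.current_token = token
        data = self.batches[token] if token < len(self.batches) else []
        has_more = token + 1 < len(self.batches)
        # next_token is None when no more data remains (empty next_result_uri).
        return {"data": data, "next_token": token + 1 if has_more else None}


def fetch_all(store):
    """Follow next_token from token 0 until the store reports no more data."""
    rows, token = [], 0
    while token is not None:
        page = store.fetch(token)
        rows.extend(page["data"])
        token = page["next_token"]
    return rows


store = ResultStore([[[1, "a"], [2, "b"]], [[3, "c"]]])
first = store.fetch(0)
retry = store.fetch(0)          # same token, so the same part is returned
all_rows = fetch_all(store)
print(first == retry, all_rows)
```

Because a repeated token returns the same part, a client that fails while processing a response can simply re-request the same token after recovering.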
Statement Completion
/v1/sessions/:session_handle/complete_statement | |
Verb: GET | Response code: 200 OK |
Complete the statement. For example, when a user types SELE in the terminal and presses Tab, the terminal uses this API to complete the statement and returns SELECT. | |
Request body | { "statement": "", "position": } |
Response body | { "candidates": [] } |
Util
Get Info
/v1/info | |
Verb: GET | Response code: 200 OK |
Get meta data for this cluster | |
Request body | {} |
Response body | { "product_name": "Apache Flink", "version": "1.16" # Flink version } |
Options
Please use the following options to configure the REST endpoint.
Option name | Default Value(Required) | Description |
endpoint.type | rest (Yes) | REST endpoint should use 'rest'. |
endpoint.rest.port | 8083 (No) | REST endpoint port. |
endpoint.rest.address | 127.0.0.1 (No) | The address that the SqlServer binds itself to. |
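To make the required/default semantics of the table concrete, here is a minimal Python sketch that resolves a user-supplied option map against the listed defaults. The helper name `resolve_endpoint_options` and the error messages are hypothetical; the option keys and default values are taken from the table above.

```python
DEFAULTS = {
    "endpoint.rest.port": 8083,
    "endpoint.rest.address": "127.0.0.1",
}
REQUIRED = ("endpoint.type",)


def resolve_endpoint_options(user_options):
    """Check required keys, then fill in defaults for the optional ones."""
    missing = [key for key in REQUIRED if key not in user_options]
    if missing:
        raise ValueError(f"missing required option(s): {', '.join(missing)}")
    if user_options["endpoint.type"] != "rest":
        raise ValueError("this sketch only handles endpoint.type = 'rest'")
    resolved = dict(DEFAULTS)
    resolved.update(user_options)
    # Ports often arrive as strings from config files; normalize to int.
    resolved["endpoint.rest.port"] = int(resolved["endpoint.rest.port"])
    return resolved


minimal = resolve_endpoint_options({"endpoint.type": "rest"})
custom = resolve_endpoint_options({"endpoint.type": "rest", "endpoint.rest.port": "9000"})
print(minimal)
print(custom)
```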
Pluggable Endpoint Discovery
...
When launching the SQL Client in gateway mode, users should specify the address and port of the Gateway they want to communicate with. Therefore, we should expose the following startup options.
Parameter | Required | Description |
-h, --host | Yes | The gateway address |
-p, --port | Yes | The gateway port |
Users can start up the SQL Client with the command in bash.
...
Considering that not all LogicalTypes are serializable, our plan is similar to what the LogicalTypeJsonSerializer
does. Currently there are 3 kinds of LogicalType:
- Basic Type
Type Name | JSON |
---|---|
CHAR | {"type": "CHAR", "nullable": true/false, "length": <LENGTH>} |
VARCHAR | {"type": "VARCHAR", "nullable": true/false, "length": <LENGTH>} |
STRING | {"type": "VARCHAR", "nullable": true/false, "length": <LENGTH>} |
BOOLEAN | {"type": "BOOLEAN", "nullable": true/false} |
BINARY | {"type": "BINARY", "nullable": true/false, "length": <LENGTH>} |
VARBINARY | {"type": "VARBINARY", "nullable": true/false, "length": <LENGTH>} |
BYTES | {"type": "VARBINARY", "nullable": true/false, "length": <LENGTH>} |
DECIMAL | {"type": "DECIMAL", "nullable": true/false, "precision": <PRECISION>, "scale": <SCALE>} |
TINYINT | {"type": "TINYINT", "nullable": true/false} |
SMALLINT | {"type": "SMALLINT", "nullable": true/false} |
INTEGER | {"type": "INTEGER", "nullable": true/false} |
BIGINT | {"type": "BIGINT", "nullable": true/false} |
FLOAT | {"type": "FLOAT", "nullable": true/false} |
DOUBLE | {"type": "DOUBLE", "nullable": true/false} |
DATE | {"type": "DATE", "nullable": true/false} |
TIME | {"type": "TIME", "nullable": true/false, "precision": <PRECISION>} |
TIMESTAMP | {"type": "TIMESTAMP", "nullable": true/false, "precision": <PRECISION>} |
TIMESTAMP_LTZ | {"type": "TIMESTAMP_LTZ", "nullable": true/false, "precision": <PRECISION>} |
RAW | {"type": "RAW", "nullable": true/false, "class": <CLASS>, "specialSerializer": <SERIALIZER>} |
- Collection Type
For a collection type, the element type is serialized recursively.
Type Name | JSON |
---|---|
MAP | {"type": "MAP", "nullable": true/false, "keyType": ..., "valueType": ...} |
ARRAY | {"type": "ARRAY", "nullable": true/false, "elementType": ...} |
MULTISET | {"type": "MULTISET", "nullable": true/false, "elementType": ...} |
ROW | { "type": "ROW", "nullable": true/false, "fields": [ { "name": <FIELD_NAME>, "fieldType": ... },... ] } |
- User-defined type
Serializing user-defined types is not supported, because users can't create a new type with SQL.
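The recursive serialization described for basic and collection types can be sketched in Python as follows. The dataclasses `Basic`, `Collection`, `MapType`, and `Row` are hypothetical stand-ins for Flink's LogicalType classes, introduced only for illustration; the JSON keys (`type`, `nullable`, `elementType`, `keyType`, `valueType`, `fields`, `name`, `fieldType`) follow the tables above.

```python
from dataclasses import dataclass, field


@dataclass
class Basic:
    name: str                                   # e.g. "BIGINT", "VARCHAR"
    nullable: bool = True
    params: dict = field(default_factory=dict)  # e.g. {"length": 300}


@dataclass
class Collection:
    kind: str                                   # "ARRAY" or "MULTISET"
    element: object
    nullable: bool = True


@dataclass
class MapType:
    key: object
    value: object
    nullable: bool = True


@dataclass
class Row:
    fields: list                                # list of (name, type) pairs
    nullable: bool = True


def to_json(t):
    """Serialize a type, recursing into element, key, value, and field types."""
    if isinstance(t, Basic):
        return {"type": t.name, "nullable": t.nullable, **t.params}
    if isinstance(t, Collection):
        return {"type": t.kind, "nullable": t.nullable, "elementType": to_json(t.element)}
    if isinstance(t, MapType):
        return {"type": "MAP", "nullable": t.nullable,
                "keyType": to_json(t.key), "valueType": to_json(t.value)}
    if isinstance(t, Row):
        return {"type": "ROW", "nullable": t.nullable,
                "fields": [{"name": n, "fieldType": to_json(ft)} for n, ft in t.fields]}
    # Mirrors the rule that user-defined types are not serialized.
    raise TypeError(f"unsupported type: {t!r}")


row_type = Row([
    ("id", Basic("BIGINT", nullable=False)),
    ("tags", Collection("ARRAY", Basic("VARCHAR", params={"length": 300}))),
])
row_json = to_json(row_type)
print(row_json)
```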
...
The type mapping from Flink type to JSON type is as follows.
Flink Type | JSON Type |
---|---|
CHAR / VARCHAR / STRING | string |
BOOLEAN | boolean |
BINARY / VARBINARY | string with encoding: base64 |
DECIMAL | number |
TINYINT | number |
SMALLINT | number |
INT | number |
BIGINT | number |
FLOAT | number |
DOUBLE | number |
DATE | string with format: date |
TIME | string with format: time |
TIMESTAMP | string with format: date-time |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | string with format: date-time (with UTC time zone) |
INTERVAL | number |
ARRAY | array |
MAP / MULTISET | object |
ROW | object |
RAW | string with encoding: base64 |
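A value-level counterpart of the mapping table can be sketched in Python. The function `to_json_value` and the choice of Python types to stand in for Flink values are assumptions for illustration: `bytes` plays the role of BINARY/VARBINARY/RAW, `decimal.Decimal` of DECIMAL (converted to a float here, which can lose precision), and map keys are stringified because JSON object keys must be strings.

```python
import base64
import datetime
import decimal


def to_json_value(value):
    """Convert a Python stand-in for a Flink value into a JSON-compatible value."""
    if value is None or isinstance(value, (bool, int, float, str)):
        return value
    if isinstance(value, decimal.Decimal):
        return float(value)                       # DECIMAL -> number (lossy)
    if isinstance(value, bytes):
        return base64.b64encode(value).decode("ascii")
    # datetime must be checked before date because it is a subclass of date.
    if isinstance(value, datetime.datetime):
        if value.tzinfo is not None:              # local-zoned timestamp -> UTC
            value = value.astimezone(datetime.timezone.utc)
        return value.isoformat()
    if isinstance(value, (datetime.date, datetime.time)):
        return value.isoformat()
    if isinstance(value, (list, tuple)):
        return [to_json_value(item) for item in value]
    if isinstance(value, dict):
        return {str(key): to_json_value(item) for key, item in value.items()}
    raise TypeError(f"unsupported value: {value!r}")


converted = to_json_value([b"\x00\x01", datetime.date(1990, 1, 12), None])
print(converted)
```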
Serialize the Exception
Considering that some clients may also care about the root cause, the serialized JSON object should contain both the root cause and the stack.
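As a Python analogue of this idea (the Gateway itself would serialize a Java exception), the sketch below walks the cause chain to find the root cause and formats the full stack. The function name `serialize_exception` is hypothetical; the keys `root_cause` and `exception_stack` match the `exception` object shown in the response bodies.

```python
import traceback


def serialize_exception(exc):
    """Return the root cause and the full formatted stack of an exception."""
    root = exc
    while root.__cause__ is not None:       # follow explicit "raise ... from ..." links
        root = root.__cause__
    return {
        "root_cause": f"{type(root).__name__}: {root}",
        "exception_stack": "".join(
            traceback.format_exception(type(exc), exc, exc.__traceback__)
        ),
    }


try:
    try:
        int("not a number")
    except ValueError as cause:
        raise RuntimeError("statement failed") from cause
except RuntimeError as error:
    payload = serialize_exception(error)

print(payload["root_cause"])
```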
...
{ "result_type": "PAYLOAD", "results": { "columns": [ { "name": "id", "type": {"type": "BIGINT", "nullable": false} }, { "name": "name", "type": {"type": "VARCHAR", "nullable": true, "length": 300} }, { "name": "birthday", "type": {"type": "TIMESTAMP", "nullable": true, "precision": 3} } ], "data": [ { "kind": "INSERT", "fields": [101, "Jay", "1990-01-12T12:00.12"], # a row data }, { "kind": "DELETE", "fields": [102, "Jimmy", null] } ] }, "next_result_uri": /v1/sessions/:session_id/jobs/:job_id/result/:{token+1} # if not empty, uses this uri to fetch next part of result, else there is no more result. "exception": { "root_cause": "....", "exception_stack": "..." } } |
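A client consuming a payload shaped like the example above might pair each row's change kind with its column names. The following Python sketch does that for the `columns` and `data` parts of the sample; the helper name `rows_with_kind` is hypothetical, and the sample values are copied from the example.

```python
def rows_with_kind(result):
    """Pair each row's change kind with a dict keyed by column name."""
    names = [column["name"] for column in result["columns"]]
    return [(row["kind"], dict(zip(names, row["fields"]))) for row in result["data"]]


sample = {
    "columns": [
        {"name": "id", "type": {"type": "BIGINT", "nullable": False}},
        {"name": "name", "type": {"type": "VARCHAR", "nullable": True, "length": 300}},
        {"name": "birthday", "type": {"type": "TIMESTAMP", "nullable": True, "precision": 3}},
    ],
    "data": [
        {"kind": "INSERT", "fields": [101, "Jay", "1990-01-12T12:00.12"]},
        {"kind": "DELETE", "fields": [102, "Jimmy", None]},
    ],
}

rows = rows_with_kind(sample)
for kind, row in rows:
    print(kind, row)
```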
...