Status

Current state: Accepted

Discussion thread: https://lists.apache.org/thread/zpx64l0z91b0sz0scv77h0g13ptj4xxo

Vote thread: https://lists.apache.org/thread/bk4c80s4ptrpg2n1bfpqg2779xbrl5qm

JIRA: https://issues.apache.org/jira/browse/FLINK-29941

Released: 1.17


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

FLIP-24 introduced the SQL Client and gateway mode client as a further feature. Then FLIP-91 introduced SQL Gateway and REST Endpoint, making it possible for SQL Client to connect to a remote gateway through REST API. Since connecting to a remote cluster and submitting sql jobs is a common need, we proposed to implement the gateway mode for SQL Client.

Because the SqlGateway is the common basis of the Client in our design, we prefer to call the two modes "local mode" and "remote mode" in the following.

Architecture

Currently, the SQL client can only start in local mode. In our plan, the SQL Client will be able to start in remote mode. The current architecture of local mode is as follows:

We proposed to implement a unified architecture in which the client connects to REST endpoint of the gateway in both local mode and remote mode. The new architecture will be as follows. With the new architecture we can keep the code base clean and make it more easy to be maintained. In the local mode, we proposed to pick the first available port in the range of [50,000, 60,000] as the REST endpoint's port.


For the convenience of explaining more details of the architecture, let's see the execution flow of user's input:

  1. The CliClient parses the input to determine the kind of it (either client command or sql statement).

  2. If input is a client command (e.g. help, exit), the CliClient will handle it and print information in terminal; Else the Executor of client will submit it to the gateway by sending an ExecuteStatementRequest. The gateway will return an ExecuteStatementResponse containing an OperationHandle which can be used to fetch the execution results.

  3. The Executor sends an FetchResultRequest to the gateway, then the gateway will return results wrapped in FetchResultResponseBody or throw exception if error occurs in execution wrapped in ErrorResponseBody. Both will be serialized into JSON format.

  4. The Executor collects the results from FetchResultResponseBody, deserializes it into original RowData and wrapped it into ClientResult;  Or handles the error message from the ErrorResponseBody.

  5. The CliClient prints the execution information:

    1. If error, print the exception messages;
    2. Else if the result is for a query, print the result in TABLE/TABLEAU/CHANGELOG style according to the configuration.
    3. Else, print the result in TABLEAU style.

The picture below shows how the modules of the client perform the execution. 


Proposed Changes

Currently, the client can get everything needed from TableEnvironment directly. When migrating to the SQL Gateway, the information should be returned appropriately. Thus, some changes should be made to the gateway and runtime modules. The proposed changes include:

  1. ResultSet modification. We propose to change it to an interface, so that we can hide the implementation of data serialization and deserialization. And we should add more information to the ResultSet
    1. Whether the result is for a query. Because the streaming queries may produce continuous data, and the client needs to store it. 
    2. The Job ID. Users can use the Job ID to manage the submitted jobs (e.g. cancel, close).
    3. The ResultKind defined in table api. The client needs to know the types of the result.
  2. REST Endpoint modification.  The client needs to print the results in a SQL-compliant format, but currently the endpoint can only serialize the ResultSet to JSON format. So we propose to add a REST query parameter 'RowFormat' to tell the endpoint how to serialize the ResultSet. We use RowFormat.JSON to keep the current serialization behavior and RowFormat.PLAIN_TEXT to get SQL-compliant strings.
  3. Add root cause to the ErrorResponseBody. Currently, the ErrorResponseBody only contains the exception stack, which is not convenient for the client to handle for two reasons:

    1. There is an option sql-client.verbose. If it is set to false, the client should print the root cause of the exception.
    2. The client needs to know if the error is caused by some special exception. Here I'd like to give a specific scenario to explain why it's important. 
      Flink SQL has a 'Statement Set' syntax, which is like :

      Statement Set Syntax Example
      STATEMENT SET BEGIN
      	INSERT INTO ...;
      	INSERT INTO ...;
      END;

      Because the client cuts the input by ';', when the user input a statement set line by line, the client will read the first two lines and submit to the execution environment. Then the sql parser will throw a SqlParserEofException to indicates the statement is incomplete. Finally, when the client find this exception, it will continue to read from user's input instead of forwarding the exception message to the terminal. To get the explicit exception type from the gateway, we should take the important exception as root cause.

Public Interfaces

ResultSet interface

We propose to change the ResultSet from class to interface.

interface ResultSet
package org.apache.flink.table.gateway.api.results;

/** The collection of the results. */
@PublicEvolving
public interface ResultSet {

	//-----------------------------------------------------------------------------------------------
	// Old APIs
 	//-----------------------------------------------------------------------------------------------
 
	/** Get the type of the results, which may indicate the result is EOS or has data. */
    ResultType getResultType();

    /**
     * The token indicates the next batch of the data.
     *
     * <p>When the token is null, it means all the data has been fetched.
     */
    @Nullable
    Long getNextToken();

    /**
     * The schema of the data.
     *
     * <p>The schema of the DDL, USE, EXPLAIN, SHOW and DESCRIBE align with the schema of the {@link
     * TableResult#getResolvedSchema()}. The only differences is the schema of the `INSERT`
     * statement.
     *
     * <p>The schema of INSERT:
     *
     * <pre>
     * +-------------+-------------+----------+
     * | column name | column type | comments |
     * +-------------+-------------+----------+
     * |   job id    |    string   |          |
     * +- -----------+-------------+----------+
     * </pre>
     */
    ResolvedSchema getResultSchema();

    /** All the data in the current results. */
    List<RowData> getData();

    /** Describe the kind of the result. */
    @PublicEvolving
    enum ResultType {
        /** Indicate the result is not ready. */
        NOT_READY,

        /** Indicate the result has data. */
        PAYLOAD,

        /** Indicate all results have been fetched. */
        EOS
    }
    
 	//-----------------------------------------------------------------------------------------------
	// New APIs
 	//-----------------------------------------------------------------------------------------------

    /** Indicates that whether the result is for a query. */
    boolean isQueryResult();
	
	/**
     * If the statement was submitted to a client, returns the JobID which uniquely identifies the
     * job. Otherwise, returns null.
     */
	@Nullable JobID getJobID();
	
	/** Get the result kind of the result. */
	@Nullable ResultKind getResultKind();
}


REST Endpoint Modification

  1. RowFormat definition

    RowFormat
    package org.apache.flink.table.gateway.rest.util;
    
    /** Describes the serialization format of {@link RowData} in the {@link ResultSet}. */
    @PublicEvolving
    public enum RowFormat {
        /**
         * Indicates to serialize the RowData to JSON format, which contains original LogicalType
         * information, so it can be deserialized back to RowData. This is the default row format.
         */
        JSON,
    
        /** Indicates to serialize the RowData to SQL-compliant, plain strings. */
        PLAIN_TEXT
    }
  2. FetchResultsResponseBody modification. Main changes include:

    1. Add isQueryResult, jobID and resultKind  fields.

    2. Improve the serialization and deserialization of FetchResultsResponseBody. Since we propose to introduce the RowFormat, we shouldn't use ResultSet as part of FetchResultsResponseBody. The data of result should be serialized and deserialized according to the row format.

Overview of fetching results REST API:

/v2/sessions/:session_handle/operations/:operation_handle/result/:token?rowFormat={JSON/PLAIN_TEXT}

Verb: GETResponse code: 200 OK


Path parameters


session_handle: The SessionHandle that identifies a session.

operation_handle: The OperationHandle that identifies a operation.

token: The Token that indicates where is the result.

Query parameterrowFormat: The serialization format of RowData in ResultSet.
Request body{}

Below are examples of query response bodies.

Schema:  {TIMESTAMP(3), MAP<VARCHAR, INTEGER>} ,

Test Data (1 row): {1990-10-14 12:12:43.123, {"test" = 1}}















Response body

(rowFormat = json)

{
  "resultType": "PAYLOAD",
  "isQueryResult": true,  
  "jobID": "50ddcce17e3d203fd9d8f0a70dc687b2",
  "resultKind": "SUCCESS_WITH_CONTENT",
  "results": 
    {
      "columns": [
        {
          "name": "timestamp",
          "logicalType": {
            "type": "TIMESTAMP_WITHOUT_TIME_ZONE",
            "nullable": true,
            "precision": 3
          }
        },
        {
          "name": "map",
          "logicalType": {
            "type": "MAP",
            "nullable": true,
            "keyType": {
              "type": "VARCHAR",
              "nullable": true,
              "length": 2147483647
            },
            "valueType": {
              "type": "INTEGER",
              "nullable": true
            }
          }
        }],
      "rowFormat": "JSON", 
      "data": [
        {
          "kind": "INSERT",
          "fields": [
            "1990-10-14T12:12:43.123",
            {
              "test": 1
            }]
        }]
    },
  "nextResultUri": "/v2/sessions/:session_handle/operations/:operation_handle/result/:{token+1}?rowFormat=JSON"
}





Response body

(rowFormat = plain_string)

{
  "resultType": "PAYLOAD",
  "isQueryResult": true,
  "jobID": "50ddcce17e3d203fd9d8f0a70dc687b2",
  "resultKind": "SUCCESS_WITH_CONTENT",
  "results": 
  {
    "columns": [

      {
        "name": "timestamp",
        "logicalType": {
          "type": "TIMESTAMP_WITHOUT_TIME_ZONE",
          "nullable": true,
          "precision": 3
        }
      },
      {
        "name": "map",
        "logicalType": {
          "type": "MAP",
          "nullable": true,
          "keyType": {
            "type": "VARCHAR",
            "nullable": true,
            "length": 2147483647
          },
          "valueType": {
            "type": "INTEGER",
            "nullable": true
          }
        }
      }],
    "rowFormat": "PLAIN_TEXT",
    "data": [
      {
        "kind": "INSERT",
        "fields": [
          "1990-10-14 12:12:43.123", "{test=1}"
        ]
      },
  "nextResultUri": "/v2/sessions/:session_handle/operations/:operation_handle/result/:{token+1}?rowFormat=PLAIN_TEXT"
}


NOTICE

  • If resultType == NOT_READY, we can't know the detail of the result, there only exists resultType and nextResultUri.
  • isQueryResult can be true or false
  • resultKind can be SUCCESS or SUCCESS_WITH_CONTENT
  • If the sql is not submitted as a job, the jobID is null.

Add 'rootCause' field to ErrorResponseBody

ErrorDetail
/** Generic response body for communicating errors on the server. */
public final class ErrorResponseBody implements ResponseBody {
	public final List<String> errors;

	public final String rootCause;
}

An example of the response body for parsing an incomplete statement is in the following:

ErrorResponseBody in JSON
{
	"errors" : [
					"org.apache.flink.table.gateway.service.utils.SqlExecutionException: Can't execute statement.",
					"org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.fetchResults: failed to fetch results.",
				 	"org.apache.flink.table.api.SqlParserEOFException: Encountered \"END\" at line 2, column 1.",
					"org.apache.flink.sql.parser.impl.ParseException: Encountered \"END\" at line 2, column 1.",
					"Was expecting one of:",
       				# more information here 
					], 
  
	"rootCause" : "org.apache.flink.table.api.SqlParserEOFException: Encountered \"END\" at line 2, column 1."
}

Option

We proposed a new option to control the interval of fetching result.

Option nameDefault value (Required)Description
sql-client.result.fetch-interval100ms (No)The time interval of fetching results from gateway

Compatibility, Deprecation, and Migration Plan

UI Changes

Users interact with the client based on the input and output printed on the terminal. In this FLIP, we propose to print all the non-query results in tableau style because now we cannot get the statement type information. 

The picture below shows the current EXPLAIN statement output format:

And the picture below shows the new output format. The execution plan is printed as one row in the table.

Cross-version Compatibility

We should also take cross-version compatibility into consideration.

  1. The gateway has SqlGatewayRestAPIVersion and the current version is V1. After new features being added to the gateway, we should bump the version to V2

  2. The client can't communicate with the V1 gateway because some methods necessary for the client were not implemented in the V1 gateway, such as configureSession, completeStatement.

Future Work

There has many improvements can be made to the Client. We have collected some feedbacks as follows:

  1. Request sending frequency control for client to fetch results. Currently, the client can't know whether the result is ready, so it may send fetching request frequently to the Gateway, leading to network resources waste. Here we list some possible ways:
    1. The client can connect to the gateway by web socket. The gateway can notify the client when the execution result is ready.
    2. Using bidirectional connection to do the same thing.
  2. Reduce the cost of serialization and deserialization of the results. Here we list some possible ways:
    1. Move the Gateway into the JobManager. It can reduce the Ser/De costs from JobManager to Gateway.
    2. The gateway can collect the data from the sink function directly instead of JobManager.
    3. The gateway forward the raw result instead of RowData result to the client. We can use TypeSerializer to deserialize the raw result in the client.
  3. More user-friendly improvements to the client. Here we list some feedbacks:
    1. More detailed exception information when executing file error. We can print information about the location of the file where the error is.
    2. Support pasting multiple sqls.
  4. Reduce networking cost in the local mode. Currently, we propose that the local mode client also uses network to connect with the gateway. In the future, we may use local channel as the medium.
  5. Add more generic statement type identifier to the ResultSet. We only proposed add isQueryResult, but more statement type information can be added. After we implementing this, we can make more improvements about the print style.
  6. ...

Test Plan

We will test this FLIP in three ways:

  1. Make sure that the existing tests can pass.

  2. Add new unit tests for new features, such as ClientParser, Client in Remote Mode.

  3. Add new E2E test for remote mode.

Rejected Alternatives

Use SqlGatewayService directly V.S. Communicate with it through RestEndpoint

Since in local mode, the client and gateway locate at the same JVM, the client can instantiate a SqlGatewayService and invoke it's methods directly. However, we must maintain codes to perform the direct communication. It is different from the remote mode in which the client communicates with the gateway via REST API. So we decided to unify the communication methods. 

Compose Printable Result in the Gateway V.S Compose it in the Client

The gateway will serialize the RowData into JSON format and return it to the Client. We proposed to serialize the RowData into a printable format early that the client can print it directly. But later we think it's better to keep the original serialization behavior and let the client to determine how to print the result. In this way we can keep the gateway clean.



  • No labels