Status

...

Page properties


...

...

JIRA: FLINK-15472 (ASF JIRA)

...

Release: 1.16


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

/v1/sessions/:session_handle

Verb: GET

Response code: 200 OK

Get the session config with the specified session handle.

Request body

{}

Response body

{
  "properties": {
    "key": "value"
  }
}

...


Configure Session

/v1/sessions/:session_handle/configure_session

Verb: POST

Response code: 200 OK

Configures the session with the statement, which must be one of SET/RESET/CREATE/DROP/LOAD/UNLOAD/USE/ALTER/ADD JAR/REMOVE JAR. It can be used to initialize the Session:

  • register a Catalog
  • register a user-defined function
  • create predefined tables

Note: The statement must be a single command; otherwise the server throws an exception.


Request body

{
  "statement": "",          # required
  "execution_timeout": ""   # execution time limit in milliseconds, optional
}

Response body

{}
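
A corresponding sketch of the POST call, under the same assumptions as the GET example above (hypothetical host/port and session handle; the CREATE CATALOG statement is purely illustrative):

Code Block
languagejava
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ConfigureSessionExample {
    public static void main(String[] args) throws Exception {
        String sessionHandle = "..."; // hypothetical handle from open-session
        // The request body carries a single initialization statement.
        String body = "{\"statement\": \"CREATE CATALOG hive WITH ('type' = 'hive')\", "
                + "\"execution_timeout\": \"60000\"}";
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://127.0.0.1:8083/v1/sessions/" + sessionHandle + "/configure_session"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode()); // expect 200 OK with an empty body {}
    }
}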

...

Trigger Session Heartbeat

/v1/sessions/:session_handle/heartbeat

Verb: POST

Response code: 200 OK

Trigger a heartbeat to tell the server that the client is active and to keep the session alive for the configured timeout.

If a session receives neither a heartbeat nor any other operation, it will be destroyed when the timeout is reached.

Request body

{}

Response body

{}
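
As a sketch of how a client might keep its session alive (same hypothetical host/port as above), a scheduled task can POST to the heartbeat endpoint at an interval well below the session idle timeout (5 minutes by default, see the options below):

Code Block
languagejava
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class SessionHeartbeat {
    public static void main(String[] args) {
        String sessionHandle = "..."; // hypothetical handle from open-session
        HttpClient client = HttpClient.newHttpClient();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Fire a heartbeat every minute; the default idle timeout is 5 minutes.
        scheduler.scheduleAtFixedRate(() -> {
            try {
                HttpRequest request = HttpRequest.newBuilder()
                        .uri(URI.create("http://127.0.0.1:8083/v1/sessions/" + sessionHandle + "/heartbeat"))
                        .POST(HttpRequest.BodyPublishers.ofString("{}"))
                        .build();
                client.send(request, HttpResponse.BodyHandlers.ofString());
            } catch (Exception e) {
                // A real client would surface this; the sketch just logs it.
                e.printStackTrace();
            }
        }, 0, 1, TimeUnit.MINUTES);
    }
}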

...

/v1/info

Verb: GET

Response code: 200 OK

Get the metadata for the cluster.

Request body

{}

Response body

{
  "product_name": "Apache Flink",
  "version": "1.16"  # Flink version
}

Get Version

/api_versions

Verb: GET

Response code: 200 OK

Get the currently available versions of the REST endpoint. The client can choose one of the returned versions as the protocol for later communication.

Request body

{}

Response body

{
  "versions": ["v1", "v2"]  # versions supported by the REST endpoint
}

Options

Please use the following options to configure the REST endpoint.

Option name                          Default value (Required)    Description
sql-gateway.endpoint.type            rest (Yes)                  REST endpoint should use 'rest'.
sql-gateway.endpoint.rest.port       8083 (No)                   The port of the REST endpoint.
sql-gateway.endpoint.rest.address    127.0.0.1 (No)              The address that the SQL Gateway binds to.


Pluggable Endpoint Discovery

...

Code Block
languagejava
/** Interface for Endpoint. */
public interface SqlGatewayEndpoint {

    void start() throws Exception;

    void stop() throws Exception;
}

/**
 * A factory for creating Endpoint from Configuration. This
 * factory is used with Java's Service Provider Interfaces (SPI) for discovery.
 */
public interface SqlGatewayEndpointFactory extends Factory {

    SqlGatewayEndpoint createSqlGatewayEndpoint(Context context);

    interface Context {

        SqlGatewayService getSqlGatewayService();

        MetricGroup getMetricGroup();

        /** Gives read-only access to the configuration of the Endpoint. */
        ReadableConfig getConfiguration();
    }
}
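
For illustration, a hedged sketch of what a custom endpoint plugged in through this SPI might look like. MyEndpoint, the "my-endpoint" identifier, and the port option are hypothetical, and we assume the inherited Factory interface contributes factoryIdentifier()/requiredOptions()/optionalOptions() as Flink's table factories do:

Code Block
languagejava
import java.util.Collections;
import java.util.Set;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

/** Illustrative sketch only: a custom endpoint discovered via SPI. */
public class MyEndpointFactory implements SqlGatewayEndpointFactory {

    // Hypothetical option, resolved against the endpoint's own prefix,
    // i.e. sql-gateway.endpoint.my-endpoint.port in flink-conf.yaml.
    private static final ConfigOption<Integer> PORT =
            ConfigOptions.key("port").intType().defaultValue(9999);

    @Override
    public SqlGatewayEndpoint createSqlGatewayEndpoint(Context context) {
        int port = context.getConfiguration().get(PORT);
        return new MyEndpoint(context.getSqlGatewayService(), port);
    }

    @Override
    public String factoryIdentifier() {
        return "my-endpoint"; // matched against sql-gateway.endpoint.type
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.singleton(PORT);
    }
}

/** A do-nothing endpoint, just to make the sketch self-contained. */
class MyEndpoint implements SqlGatewayEndpoint {

    private final SqlGatewayService service;
    private final int port;

    MyEndpoint(SqlGatewayService service, int port) {
        this.service = service;
        this.port = port;
    }

    @Override
    public void start() throws Exception {
        // Bind a server on `port` and translate incoming requests into `service` calls.
    }

    @Override
    public void stop() throws Exception {
        // Release the server resources.
    }
}

The factory would then be listed in a META-INF/services file so that Java's ServiceLoader can discover it.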

We also expose the option sql-gateway.endpoint.type to allow users to specify the endpoints. Considering that different endpoints may have the same settings, e.g. port, users should add the endpoint identifier as a prefix to the option, e.g. rest.port. For simplicity, we don't plan to introduce another YAML file for the SQL Gateway; users can specify the gateway options in flink-conf.yaml.

...

For example, users can add the following options in the flink-conf.yaml.

Code Block
languageyml
sql-gateway.endpoint.type: rest, hiveserver2
sql-gateway.endpoint.rest.address: localhost
sql-gateway.endpoint.rest.port: 9001
sql-gateway.endpoint.hiveserver2.address: localhost
sql-gateway.endpoint.hiveserver2.port: 9002

...

Code Block
languagejava
public class HandleIdentifier {
    UUID publicId;
    UUID secretId;
}

public class SessionHandle {
    HandleIdentifier identifier;
}

/**
 * Every Endpoint should expose its version and extend the interface.
 */
interface EndpointVersion {}

enum RestEndpointVersion implements EndpointVersion {
    V1;
}

/**
 * It's equal to the HiveServer2 TProtocolVersion. It should belong to the
 * hive module.
 */
enum HiveServer2EndpointVersion implements EndpointVersion {
    HIVE_CLI_SERVICE_PROTOCOL_V1,

    // V2 adds support for asynchronous execution
    HIVE_CLI_SERVICE_PROTOCOL_V2,

    // V3 adds varchar type, primitive type qualifiers
    HIVE_CLI_SERVICE_PROTOCOL_V3,

    // V4 adds decimal precision/scale, char type
    HIVE_CLI_SERVICE_PROTOCOL_V4,

    // V5 adds error details when GetOperationStatus returns in error state
    HIVE_CLI_SERVICE_PROTOCOL_V5,

    // V6 uses binary type for binary payload (was string) and uses columnar result set
    HIVE_CLI_SERVICE_PROTOCOL_V6,

    // V7 adds support for delegation token based connection
    HIVE_CLI_SERVICE_PROTOCOL_V7,

    // V8 adds support for interval types
    HIVE_CLI_SERVICE_PROTOCOL_V8,

    // V9 adds support for serializing ResultSets in SerDe
    HIVE_CLI_SERVICE_PROTOCOL_V9,

    // V10 adds support for in place updates via GetOperationStatus
    HIVE_CLI_SERVICE_PROTOCOL_V10,

    // V11 adds timestamp with local time zone type
    HIVE_CLI_SERVICE_PROTOCOL_V11
}

enum OperationType {
    EXECUTE_STATEMENT,
    GET_INFO,
    UNKNOWN;
}

public class OperationHandle {
    HandleIdentifier identifier;
}

...

SqlGatewayService API

Code Block
languagejava
interface SqlGatewayService {
        
    // -------------------------------------------------------------------------------------------
    // Session Management
    // -------------------------------------------------------------------------------------------

    SessionHandle openSession(SessionEnvironment environment) throws SqlGatewayException;
    
    void closeSession(SessionHandle sessionHandle) throws SqlGatewayException;
    
    Map<String, String> getSessionConfig(SessionHandle sessionHandle) throws SqlGatewayException;
    
    // -------------------------------------------------------------------------------------------
    // Operation Management
    // -------------------------------------------------------------------------------------------
        
    /**
     * Get operation info to describe the Operation.
     */
    OperationInfo getOperationInfo(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException;
    
    /** Get the result schema for the specified Operation. */
    ResolvedSchema getOperationResultSchema(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException;
    
    void cancelOperation(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException;
    
    void closeOperation(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException;
    
    // -------------------------------------------------------------------------------------------
    // Statements 
    // -------------------------------------------------------------------------------------------
    
    /** 
     * Using the statement to initialize the Session. It's only allowed to 
     * execute SET/RESET/CREATE/DROP/USE/ALTER/LOAD MODULE/UNLOAD MODULE/ADD JAR/REMOVE JAR.
     */
    void configureSession(SessionHandle sessionHandle, String statement, long executionTimeoutMs) throws SqlGatewayException;
    
    /** Execute the statement in the specified Session. Execution with an Operation-level configuration is allowed. */
    OperationHandle executeStatement(
        SessionHandle sessionHandle, 
        String statement, 
        long executionTimeoutMs, 
        Configuration executionConfig) throws SqlGatewayException;
   
    /**
     * Fetch the results with the token id.
     */
    ResultSet fetchResults(SessionHandle sessionHandle, OperationHandle operationHandle, int token, int maxRows) throws SqlGatewayException;

    /**
     * Fetch the Operation-level log from the GatewayService. Some endpoints allow fetching the log at the operation level.
     */
    ResultSet fetchLog(SessionHandle sessionHandle, OperationHandle operationHandle, FetchOrientation orientation, int maxRows) throws SqlGatewayException;
     
    /**
     * Only supports fetching results in FORWARD/BACKWARD orientation.
     * - Users can only go BACKWARD from the current offset once.
     * - The Gateway doesn't materialize the changelog.
     */
    ResultSet fetchResult(SessionHandle sessionHandle, OperationHandle operationHandle, FetchOrientation orientation, int maxRows) throws SqlGatewayException;

    /**
     * For the same functionality, every endpoint has its own result schema. Therefore,
     * the endpoint submits a callable executor to the OperationManager, which manages
     * the lifecycle of the Operation. The callable executor organizes the results
     * as the Endpoint requires.
     */
    OperationHandle submitOperation(OperationType type, Callable<ResultSet> executor, ResolvedSchema resultSchema) throws SqlGatewayException;
    
    // -------------------------------------------------------------------------------------------
    // Utils 
    // -------------------------------------------------------------------------------------------
    
    /**
     * Describe the cluster info.
     */
    Map<String, String> getGatewayInfo();
    
    void heartbeat(SessionHandle sessionHandle) throws SqlGatewayException;
    
    /**
     * The Endpoint is stateless. All the session configs are kept on the GatewayService side.
     */
    EndpointVersion getSessionEndpointVersion(SessionHandle sessionHandle) throws SqlGatewayException;
    
    /** Returns a list of completion hints for the given statement at the given position. */
    List<String> completeStatement(String sessionId, String statement, int position) throws SqlGatewayException;

   
    // -------------------------------------------------------------------------------------------
    // Catalog API 
    // -------------------------------------------------------------------------------------------
    
    String getCurrentCatalog(SessionHandle sessionHandle) throws SqlGatewayException;

    String getCurrentDatabase(SessionHandle sessionHandle) throws SqlGatewayException;
    
    List<String> listCatalogs(SessionHandle sessionHandle) throws SqlGatewayException;
    
    List<String> listDatabases(SessionHandle sessionHandle, String catalogName) throws SqlGatewayException;   
    
    List<TableInfo> listTables(SessionHandle sessionHandle, String catalogName, String databaseName, TableKind tableKind) throws SqlGatewayException;
    
    List<FunctionInfo> listFunctions(SessionHandle sessionHandle, String catalogName, String databaseName, FunctionScope scope) throws SqlGatewayException;
    
    ContextResolvedTable getTable(SessionHandle sessionHandle, ObjectIdentifier tableIdentifier) throws SqlGatewayException;
    
    ContextResolvedFunction getFunction(SessionHandle sessionHandle, ObjectIdentifier functionIdentifier) throws SqlGatewayException;
    
}

class TableInfo {
    boolean isTemporary;
    ObjectIdentifier identifier;
    TableKind tableKind;
}

class FunctionInfo {
    boolean isTemporary;
    ObjectIdentifier identifier;
    FunctionKind kind;
}

class SessionEnvironment {
    private String sessionName;
    private EndpointVersion sessionEndpointVersion;
    private List<URL> libs;
    private List<URL> jars;
    private Map<String, String> sessionConfig;
}

public class OperationInfo {
    OperationStatus status;
    OperationType type;
    boolean hasResult;
}

public class ResultSet {
    
    int nextToken;
    
    ResultType resultType; 

    ResolvedSchema resultSchema;
    
    List<RowData> results;
    
    Exception exception;
}

public enum ResultType {
    PAYLOAD,
    EMPTY,
    EOS,
    ERROR
}
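
To make the intended call sequence concrete, here is a hedged sketch of how an endpoint might drive the service: open a session, execute a statement asynchronously, then page through the results with the token until EOS. Field access on ResultSet follows the declarations above; error handling is elided:

Code Block
languagejava
import org.apache.flink.configuration.Configuration;

/** Sketch only: the expected SqlGatewayService call sequence for one query. */
final class ServiceUsageSketch {

    static void runQuery(SqlGatewayService service, SessionEnvironment environment) throws Exception {
        SessionHandle session = service.openSession(environment);
        try {
            // 0L: no execution timeout; empty Configuration: no Operation-level overrides.
            OperationHandle operation =
                    service.executeStatement(session, "SELECT 1", 0L, new Configuration());

            int token = 0;
            while (true) {
                ResultSet resultSet = service.fetchResults(session, operation, token, 100);
                if (resultSet.resultType == ResultType.ERROR) {
                    throw resultSet.exception;
                }
                if (resultSet.resultType == ResultType.EOS) {
                    break; // all results consumed
                }
                // EMPTY means no rows yet; PAYLOAD carries RowData entries.
                if (resultSet.resultType == ResultType.PAYLOAD) {
                    resultSet.results.forEach(System.out::println);
                }
                token = resultSet.nextToken;
            }
            service.closeOperation(session, operation);
        } finally {
            service.closeSession(session);
        }
    }
}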

...

Option                               Required    Default value    Description
sql-gateway.session.idle-timeout     No          5 min            The session is closed when it is not accessed for this duration; a zero or negative value disables the timeout.
sql-gateway.session.check-interval   No          1 min            The check interval for the session idle timeout; a zero or negative value disables the check.
sql-gateway.session.max-num          No          1000000          The maximum number of active sessions.
sql-gateway.worker.threads.max       No          500              The maximum number of worker threads for the gateway workers.
sql-gateway.worker.threads.min       No          5                The minimum number of worker threads for the gateway workers.
sql-gateway.worker.keepalive-time    No          5 min            The keepalive time for an idle worker thread. When the number of workers exceeds the minimum, excess threads are killed after this interval.

SQL

...

Gateway Script

Considering that many users prefer to use the SQL Gateway only, we propose to add a script named `sql-gateway.sh` in the bin directory. Users can use the command

Code Block
languagebash
./sql-gateway.sh (start|stop|stop-all) [args]

to manipulate the SQL Gateway.

Command     Parameter      Description
start       -Dkey=value    Start the gateway and write the pid of the started SQL Gateway into the pid file. Users can pass -Dkey=value pairs to set parameters, e.g. -Dsql-gateway.endpoint.rest.address=127.0.0.1.
stop        (none)         Stop the last started gateway recorded in the pid file.
stop-all    (none)         Stop all the gateways recorded in the running pid file.


Then users can start the SQL Client to communicate with the SQL Gateway in a local or remote environment.

SQL Client API

When launching the SQL Client in gateway mode, users should specify the address and port of the Gateway they want to communicate with. Therefore, we should expose the following startup options.


Parameter         Required    Description
-e, --endpoint    Yes         The gateway address:port


Users can start the SQL Client with the following command in bash.

Code Block
languagebash
./sql-client.sh -e 127.0.0.1:9092


Usage

With the SQL Gateway, users can use SQL to do everything.

Configure parameters

Users can use the SET statement to configure parameters, including:

  • execution parameters, e.g. streaming or batch mode
  • optimization configuration
  • job parameters, e.g. the job name


If users want to reset the configuration, they can use the RESET statement to roll back all the settings.

The Gateway also allows users to add jars dynamically: the ADD JAR statement specifies the jar path in SQL.

Manage the metadata

Users can use DDL to register the required catalogs in the Gateway. With a catalog, users can CREATE/DROP/ALTER all tables in that catalog.

Manage Jobs

Users can submit DML in the Gateway. In the future we may also support using SQL to manage the submitted jobs.

Example

Here is an example of using the SQL Client to submit statements to the SQL Gateway.

Code Block
languagesql
Flink SQL> CREATE CATALOG hive WITH(
>  'type' = 'hive',
>  'hive-conf-dir' = '/opt/hive-conf'
>);
[INFO] Execute statement succeed.

Flink SQL> SET 'execution.runtime-mode' = 'batch';
[INFO] Execute statement succeed.

Flink SQL> CREATE TABLE pageviews (
>   user_id BIGINT,
>   page_id BIGINT,
>   viewtime TIMESTAMP,
>   proctime AS PROCTIME()
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'pageviews',
>   'properties.bootstrap.servers' = '...',
>   'format' = 'avro'
> );
[INFO] Execute statement succeed.


Flink SQL> INSERT INTO hive.default_db.kafka_table SELECT * FROM pageviews;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 6b1af540c0c0bb3fcfcad50ac037c862


Implementation

SQL Client Overview 

The SQL Client has different modes. The architecture of the SQL Client in the embedded mode is as follows.


[Image: SQL Client architecture in the embedded mode]


In the Gateway mode, the SQL Client uses the REST Client to communicate with the GatewayService.

[Image: SQL Client architecture in the Gateway mode]


Actually, the architectures of the different modes are almost the same; the only difference is how they communicate with the GatewayService. Therefore, we focus on the Gateway mode in this section. The process logic in the Gateway mode should also work for the embedded mode.


The SQL Client is composed of the CliClient and the Executor.

  • The CliClient is responsible for receiving statements from the terminal and printing the results;
  • The Executor is responsible for executing the statements from the CliClient and returning the results.
    • It has a client-level parser, which determines whether a statement is a client-level command, e.g. HELP, QUIT.
    • It can submit the statement to the GatewayService with the REST Client.

Gateway Implementation Details

  • Introduce the package flink-table/flink-sql-gateway-common, which includes the gateway API. When users want to implement their own endpoint, they only need to depend on this package.
  • Introduce the package flink-table/flink-sql-gateway. It includes the REST endpoint and the SqlGatewayService.
  • flink-table/flink-sql-client should rely on the package flink-table/flink-sql-gateway.

Compatibility, Deprecation, and Migration Plan

Because we only introduce new Gateway modules, there are no compatibility problems. For the SQL Client, we use the Gateway to submit jobs. We will keep all the functionality but may change the presentation; for example, when executing EXPLAIN, the SQL Client used to print the results directly, but now it takes the results from the returned table.

Future work

The Gateway itself has many functionalities. We only list part of the work here:

  1. Metric System
  2. Authentication module
  3. Multiple Endpoint, e.g. HiveServer2 endpoint
  4. Persistent Gateway
  5. ...

We will collect more feedback to determine which features are most important to users.


Rejected Alternatives

TableInfo and FunctionInfo VS CatalogTable and CatalogFunction

The CatalogTable and CatalogFunction are much heavier than the TableInfo and FunctionInfo: the CatalogManager requires reading from the Catalog to get the schema, but listTables only cares about the table names, which is much lighter. Therefore, we propose to use the TableInfo with only the required fields.

Support the multi-version Flink in the Gateway VS Support the multi-version in the external Service

Currently many big data tools, e.g. Zeppelin and Livy [1], support multiple engine versions, and they have similar architectures. Flink on Zeppelin looks like [2]:

[Image: Flink on Zeppelin architecture]

The Interpreter is a separate process, which means that every Flink version runs in its own JVM [3].


The current Gateway is responsible for compiling the user SQL and submitting the job to the cluster, which is almost the same as the Interpreter in the graph. 

If we supported multiple versions in the Gateway (the GatewayService itself also has a version), all the Flink versions would live in the same JVM. The Gateway would have to solve shims, classloaders, and deployments for the different Flink versions, and reflection would mess up all the code. It's better to follow the design of the other tools.


[1] https://issues.cloudera.org/browse/LIVY-106

[2] https://zeppelin.apache.org/docs/latest/interpreter/flink.html#flink-on-zeppelin-architecture

[3] https://zeppelin.apache.org/docs/0.6.2/development/writingzeppelininterpreter.html

Merge Gateway into the Flink code base VS Support Gateway in the another repo

The main reason to move the Gateway out would be to support multiple Flink versions in the Gateway. However, per the discussion above, the Gateway is bound to a specific Flink version, and an external service manages the Gateway instances. Since the Gateway itself is bound to Flink, it's better to merge the Gateway into the Flink repo.

It also brings the following benefits:

  1. Reduce the cost, e.g. CI tests, releases, maintenance cost;
  2. The SQL Client has the ability to submit SQL to the SQL Gateway, and we can reuse most of the code;
  3. The Gateway inside the Flink repo can ensure the highest degree of version compatibility;
  4. The Gateway is indispensable for a SQL engine (think of Trino/Presto, Spark, Hive). Otherwise, Flink will always be a processing system. With the Gateway inside the Flink repo, Flink can provide an out-of-the-box experience as a SQL query engine, and users can try out the gateway for the latest version as soon as it is released.

Therefore, we prefer to merge the Gateway into the Flink repo.

Result retrieval for interactive sessions VS result retrieval for fault tolerant exactly-once results

Because the current Gateway doesn't materialize anything to storage, we can't promise exactly-once semantics. This means users can't retrieve the results again once the Gateway has been notified that the results have been taken away.

OperationStatus VS JobStatus

The main reasons we don't use the JobStatus as the state machine:

  • An Operation includes DDL, DML and so on, which is a much larger concept than a Job. We can't use a small concept to replace a large one.
  • Some statuses in the JobStatus are meaningless for an Operation. For example, a DDL Operation doesn't need RESTARTING/SUSPENDED/RECONCILING.
  • The Gateway allows submitting jobs (DML) in sync/async mode. The RUNNING status of an Operation has a different meaning in each mode:

    • In async mode, the Operation reaches the FINISHED state as soon as the Gateway submits the job.
    • In sync mode, the RUNNING status covers submitting the job and the job running. Even if a failover occurs, we still consider the Operation to be in the RUNNING state; only if the job is unrecoverable do we change the Operation status to ERROR.

Therefore, we propose a new state machine on the Gateway side.


Appendix

Serialize and deserialize the ResultSet 

...

Code Block
languagetext
{
  "result_type": "PAYLOAD",
  "results": {
    "columns": [
      {
        "name": "id",
        "type": {"type": "BIGINT", "nullable": false}
      },
      {
        "name": "name",
        "type": {"type": "VARCHAR", "nullable": true, "length": 300}
      },
      {
        "name": "birthday",
        "type": {"type": "TIMESTAMP", "nullable": true, "precision": 3}
      }
    ],
    "data": [
      {
        "kind": "INSERT",
        "fields": [101, "Jay", "1990-01-12T12:00.12"]  # a row of data
      },
      {
        "kind": "DELETE",
        "fields": [102, "Jimmy", null]
      }
    ]
  },
  "next_result_uri": "/v1/sessions/:session_id/jobs/:job_id/result/:{token+1}",  # if not empty, use this URI to fetch the next part of the result; otherwise there is no more result.
  "exception": {
    "root_cause": "....",
    "exception_stack": "..."
  }
}
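
For illustration, a hedged sketch of how a client might page through this format, following next_result_uri until it is absent. The host/port and the session/job ids are hypothetical, and Jackson is assumed for JSON parsing:

Code Block
languagejava
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class FetchAllResults {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        ObjectMapper mapper = new ObjectMapper();
        // Hypothetical first page; the session/job ids come from earlier calls.
        String base = "http://127.0.0.1:8083";
        String uri = base + "/v1/sessions/SESSION_ID/jobs/JOB_ID/result/0";
        while (uri != null) {
            HttpResponse<String> response = client.send(
                    HttpRequest.newBuilder().uri(URI.create(uri)).GET().build(),
                    HttpResponse.BodyHandlers.ofString());
            JsonNode body = mapper.readTree(response.body());
            for (JsonNode row : body.path("results").path("data")) {
                System.out.println(row.get("kind") + ": " + row.get("fields"));
            }
            // An absent or empty next_result_uri means the result set is exhausted.
            JsonNode next = body.get("next_result_uri");
            uri = (next == null || next.asText().isEmpty()) ? null : base + next.asText();
        }
    }
}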

...