...

    • GatewayService exposes APIs for managing Sessions and Operations, plus utility methods. Endpoints can combine these methods to satisfy their requirements;
    • GatewayService is responsible for exposing the registry of Sessions;
    • GatewayService executes Operations asynchronously: when an Operation is submitted to the OperationManager, it returns the OperationHandle immediately and does not wait for the Operation to finish;
    • GatewayService is shared between all the loaded endpoints.
  • Endpoint: It's the main entry point that users connect to.

...

Option name | Default Value (Required) | Description
endpoint.type | rest (Yes) | REST endpoint should use 'rest'.
endpoint.rest.port | 8083 (No) | REST endpoint port.
endpoint.rest.address | 127.0.0.1 (No) | The address that the SqlServer binds itself to.


Pluggable Endpoint Discovery

Pluggable endpoint discovery loads endpoints dynamically, so users can load only the endpoints they want to use. We use the SPI mechanism to discover the Endpoint.

Code Block
languagejava
/** Interface for Endpoint. */
public interface SQLGatewayEndpoint {

    void start() throws Exception;

    void stop() throws Exception;
}

/** Factory to create SQLGatewayEndpoint. */
public interface SQLGatewayEndpointFactory extends Factory {

    SQLGatewayEndpoint createSQLGatewayEndpoint(Context context);

    interface Context {

        SQLGatewayService getSQLGatewayService();

        MetricGroup getMetricGroup();

        /** Gives read-only access to the configuration of the Endpoint. */
        ReadableConfig getConfiguration();
    }
}
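The SPI-based discovery described above can be sketched roughly as follows. This is only an illustration under stated assumptions: `EndpointFactory`, `factoryIdentifier()` and `EndpointDiscovery` are hypothetical names that do not appear in the design, and the candidates are assumed to come from the JDK's `ServiceLoader`, which only finds implementations registered in a provider-configuration file under `META-INF/services`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

/** Hypothetical stand-in for the endpoint factory; only the identifier matters here. */
interface EndpointFactory {
    String factoryIdentifier();
}

final class EndpointDiscovery {

    /** Loads all candidates through the JDK SPI mechanism (needs META-INF/services entries). */
    static List<EndpointFactory> loadFromClasspath() {
        List<EndpointFactory> found = new ArrayList<>();
        for (EndpointFactory factory : ServiceLoader.load(EndpointFactory.class)) {
            found.add(factory);
        }
        return found;
    }

    /** Picks exactly one factory whose identifier matches; fails loudly otherwise. */
    static EndpointFactory discover(Iterable<EndpointFactory> candidates, String identifier) {
        List<EndpointFactory> matches = new ArrayList<>();
        for (EndpointFactory factory : candidates) {
            if (factory.factoryIdentifier().equals(identifier)) {
                matches.add(factory);
            }
        }
        if (matches.size() != 1) {
            throw new IllegalArgumentException(
                    "Expected exactly one endpoint factory for '" + identifier
                            + "' but found " + matches.size());
        }
        return matches.get(0);
    }
}
```

Failing when zero or several factories match keeps a misconfigured endpoint type from being silently ignored; whether the Gateway should behave this way is a design choice, not something the text above prescribes.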

We also expose the option endpoint.type to allow users to specify the endpoints. Because different endpoints may have the same settings, e.g. port, users should add the endpoint identifier as a prefix to the option, e.g. rest.port. For simplicity, we don't plan to introduce another YAML file for SQL Gateway; users can specify the gateway options in flink-conf.yaml.


For example, users can add the following options in the flink-conf.yaml.

Code Block
languageyml
endpoint.type: rest, hiveserver2
endpoint.rest.address: localhost
endpoint.rest.port: 9001
endpoint.hiveserver2.address: localhost
endpoint.hiveserver2.port: 9002
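
To illustrate how the prefixed options in the example above could be separated per endpoint, here is a minimal sketch. It assumes the configuration is available as a flat `Map<String, String>` and that `endpoint.type` is a comma-separated list, as in the example; `EndpointOptions`, `endpointTypes` and `optionsFor` are hypothetical helper names, not part of the proposed API.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class EndpointOptions {

    /** Splits the comma-separated value of endpoint.type into trimmed identifiers. */
    static List<String> endpointTypes(Map<String, String> conf) {
        String raw = conf.getOrDefault("endpoint.type", "");
        return Arrays.stream(raw.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toList());
    }

    /** Returns the options of one endpoint with the "endpoint.<identifier>." prefix removed. */
    static Map<String, String> optionsFor(Map<String, String> conf, String identifier) {
        String prefix = "endpoint." + identifier + ".";
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : conf.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                result.put(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
        return result;
    }
}
```

With the example configuration, `optionsFor(conf, "rest")` would contain `address` and `port` only, so each endpoint sees its own settings even though both endpoints define a `port`.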


GatewayService API

Object API

The API is used to describe the Session and Operation.

Code Block
languagejava
public class HandleIdentifier {
    UUID publicId;
    UUID secretId;
}


public class SessionHandle {
    HandleIdentifier identifier;
}

/**
 * Every Endpoint should expose its version and extend the interface.
 */
interface EndpointVersion {}

enum RestEndpointVersion implements EndpointVersion {
    V1;
}

/**
 * It's equal to the HiveServer2 TProtocolVersion. It should belong to the
 * hive module.
 */
enum HiveServer2EndpointVersion implements EndpointVersion {
  HIVE_CLI_SERVICE_PROTOCOL_V1,

  // V2 adds support for asynchronous execution
  HIVE_CLI_SERVICE_PROTOCOL_V2,

  // V3 add varchar type, primitive type qualifiers
  HIVE_CLI_SERVICE_PROTOCOL_V3,

  // V4 add decimal precision/scale, char type
  HIVE_CLI_SERVICE_PROTOCOL_V4,

  // V5 adds error details when GetOperationStatus returns in error state
  HIVE_CLI_SERVICE_PROTOCOL_V5,

  // V6 uses binary type for binary payload (was string) and uses columnar result set
  HIVE_CLI_SERVICE_PROTOCOL_V6,

  // V7 adds support for delegation token based connection
  HIVE_CLI_SERVICE_PROTOCOL_V7,

  // V8 adds support for interval types
  HIVE_CLI_SERVICE_PROTOCOL_V8,

  // V9 adds support for serializing ResultSets in SerDe
  HIVE_CLI_SERVICE_PROTOCOL_V9,

  // V10 adds support for in place updates via GetOperationStatus
  HIVE_CLI_SERVICE_PROTOCOL_V10,

  // V11 adds timestamp with local time zone type
  HIVE_CLI_SERVICE_PROTOCOL_V11;
}

enum OperationType {
  EXECUTE_STATEMENT,
  GET_INFO,
  UNKNOWN;
}


public class OperationHandle {
    HandleIdentifier identifier;
}
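
Because the GatewayService keeps a registry of Sessions, handles are natural lookup keys. The following is a minimal sketch, under the assumption that handles are compared by value; the constructors, `create()`, and the `equals`/`hashCode` implementations are illustrative additions and are not specified by the classes above.

```java
import java.util.Objects;
import java.util.UUID;

final class HandleIdentifier {
    private final UUID publicId;
    private final UUID secretId;

    HandleIdentifier(UUID publicId, UUID secretId) {
        this.publicId = Objects.requireNonNull(publicId);
        this.secretId = Objects.requireNonNull(secretId);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof HandleIdentifier)) {
            return false;
        }
        HandleIdentifier that = (HandleIdentifier) o;
        return publicId.equals(that.publicId) && secretId.equals(that.secretId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(publicId, secretId);
    }
}

final class SessionHandle {
    private final HandleIdentifier identifier;

    SessionHandle(HandleIdentifier identifier) {
        this.identifier = Objects.requireNonNull(identifier);
    }

    /** Creates a handle with fresh random ids (an assumption about how ids are generated). */
    static SessionHandle create() {
        return new SessionHandle(new HandleIdentifier(UUID.randomUUID(), UUID.randomUUID()));
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof SessionHandle && identifier.equals(((SessionHandle) o).identifier);
    }

    @Override
    public int hashCode() {
        return identifier.hashCode();
    }
}
```

Value-based equality lets a handle that an endpoint rebuilds from a client request find the same registry entry as the handle originally returned by `openSession`.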

SQLGatewayService API

Code Block
languagejava
interface SQLGatewayService {

    // -------------------------------------------------------------------------------------------
    // Session Management
    // -------------------------------------------------------------------------------------------

    SessionHandle openSession(SessionEnvironment environment) throws SqlGatewayException;

    void closeSession(SessionHandle sessionHandle) throws SqlGatewayException;

    Map<String, String> getSessionConfig(SessionHandle sessionHandle) throws SqlGatewayException;

    // -------------------------------------------------------------------------------------------
    // Operation Management
    // -------------------------------------------------------------------------------------------

    /**
     * Get operation info to describe the Operation.
     */
    OperationInfo getOperationInfo(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException;

    /** Get the result schema for the specified Operation. */
    ResolvedSchema getOperationResultSchema(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException;

    void cancelOperation(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException;

    void closeOperation(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException;

    // -------------------------------------------------------------------------------------------
    // Statements
    // -------------------------------------------------------------------------------------------

    /**
     * Using the statement to initialize the Session. It's only allowed to
     * execute SET/RESET/CREATE/DROP/USE/ALTER/LOAD MODULE/UNLOAD MODULE/ADD JAR/REMOVE JAR.
     */
    void configureSession(SessionHandle sessionHandle, String statement, long executionTimeoutMs) throws SqlGatewayException;

    /** Execute the statement with the specified Session. It allows executing with Operation-level configuration. */
    OperationHandle executeStatement(
        SessionHandle sessionHandle,
        String statement,
        long executionTimeoutMs,
        Configuration executionConfig) throws SqlGatewayException;

    /**
     * Fetch the results with token id.
     */
    ResultSet fetchResults(SessionHandle sessionHandle, OperationHandle operationHandle, int token, int maxRows) throws SqlGatewayException;

    /**
     * Fetch the Operation-level log from the GatewayService. For some endpoints, it allows fetching the log at the operation level.
     */
    ResultSet fetchLog(SessionHandle sessionHandle, OperationHandle operationHandle, FetchOrientation orientation, int maxRows) throws SqlGatewayException;

    /**
     * Only supports fetching results in FORWARD/BACKWARD orientation.
     * - Users can only go BACKWARD from the current offset once.
     * - The Gateway doesn't materialize the changelog.
     */
    ResultSet fetchResult(SessionHandle sessionHandle, OperationHandle operationHandle, FetchOrientation orientation, int maxRows) throws SqlGatewayException;

    /**
     * For the same functionality, every endpoint has its result schema. Therefore,
     * the endpoint submits the callable executor to the OperationManager that manages
     * the lifecycle of the Operation. The callable executor organizes the results
     * as the Endpoint requires.
     */
    OperationHandle submitOperation(OperationType type, Callable<ResultSet> executor, ResolvedSchema resultSchema) throws SqlGatewayException;

    // -------------------------------------------------------------------------------------------
    // Utils
    // -------------------------------------------------------------------------------------------

    /**
     * Describe the cluster info.
     */
    Map<String, String> getGatewayInfo();

    void heartbeat(SessionHandle sessionHandle) throws SqlGatewayException;

    /**
     * Endpoint is stateless. All the session configs are kept on the GatewayService side.
     */
    EndpointVersion getSessionEndpointVersion(SessionHandle sessionHandle) throws SqlGatewayException;

    /** Returns a list of completion hints for the given statement at the given position. */
    List<String> completeStatement(String sessionId, String statement, int position) throws SqlGatewayException;

    // -------------------------------------------------------------------------------------------
    // Catalog API
    // -------------------------------------------------------------------------------------------

    String getCurrentCatalog(SessionHandle sessionHandle) throws SqlGatewayException;

    String getCurrentDatabase(SessionHandle sessionHandle) throws SqlGatewayException;

    List<String> listCatalogs() throws SqlGatewayException;

    List<String> listDatabases(String catalogName) throws SqlGatewayException;

    List<TableInfo> listTables(String catalogName, String databaseName, TableKind tableKind) throws SqlGatewayException;

    List<UserDefinedFunctionInfo> listUserDefinedFunctions(String catalogName, String databaseName) throws SqlGatewayException;

    ContextResolvedTable getTable(ObjectIdentifier tableIdentifier) throws SqlGatewayException;

    ContextResolvedFunction getFunction(ObjectIdentifier functionIdentifier) throws SqlGatewayException;

}

class TableInfo {
    boolean isTemporary;
    ObjectIdentifier identifier;
    TableKind tableKind;
}

class UserDefinedFunctionInfo {
    boolean isTemporary;
    ObjectIdentifier identifier;
    FunctionKind kind;
    String className;
    String description;
}

enum TableKind {
    ALL,
    VIEW,
    TABLE
}

class SessionEnvironment {
    private String sessionName;
    private EndpointVersion sessionEndpointVersion;
    private List<URL> libs;
    private List<URL> jars;
    private Map<String, String> sessionConfig;
}

public class OperationInfo {
    OperationStatus status;
    OperationType type;
    boolean hasResult;
}

public class ResultSet {

    int nextToken;

    ResultType resultType;

    ResolvedSchema resultSchema;

    List<RowData> results;

    Exception exception;
}

public enum ResultType {
    PAYLOAD,
    EMPTY,
    EOS,
    ERROR
}
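
The asynchronous execution model and token-based fetching can be sketched roughly as follows. This is a simplified illustration and not the proposed implementation: `AsyncOperationSketch` and `Page` are hypothetical names, rows are plain strings, the token is assumed to be a row offset, a plain `UUID` stands in for `OperationHandle`, and `fetchResults` simply blocks until the operation has finished instead of reporting a `ResultType`.

```java
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class AsyncOperationSketch implements AutoCloseable {

    /** One page of rows plus the token to pass on the next call (-1 once exhausted). */
    static final class Page {
        final List<String> rows;
        final int nextToken;

        Page(List<String> rows, int nextToken) {
            this.rows = rows;
            this.nextToken = nextToken;
        }
    }

    private final ExecutorService workers = Executors.newFixedThreadPool(2);
    private final Map<UUID, Future<List<String>>> operations = new ConcurrentHashMap<>();

    /** Hands the work to a worker thread and returns a handle without waiting for it. */
    UUID submitOperation(Callable<List<String>> executor) {
        UUID handle = UUID.randomUUID();
        operations.put(handle, workers.submit(executor));
        return handle;
    }

    boolean isFinished(UUID handle) {
        return lookup(handle).isDone();
    }

    /** Waits for the operation, then returns rows [token, token + maxRows); assumes maxRows > 0. */
    Page fetchResults(UUID handle, int token, int maxRows) {
        List<String> all;
        try {
            all = lookup(handle).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the operation", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Operation failed", e.getCause());
        }
        int from = Math.min(Math.max(token, 0), all.size());
        int to = Math.min(from + maxRows, all.size());
        return new Page(all.subList(from, to), to < all.size() ? to : -1);
    }

    private Future<List<String>> lookup(UUID handle) {
        Future<List<String>> future = operations.get(handle);
        if (future == null) {
            throw new IllegalArgumentException("Unknown operation " + handle);
        }
        return future;
    }

    @Override
    public void close() {
        workers.shutdownNow();
    }
}
```

A client would call `submitOperation`, keep the returned handle, and then call `fetchResults` repeatedly with the previous `nextToken` until it receives -1.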

Options

Please use the following options to configure the gateway.

Option | Required | Default value | Description
sql-gateway.session.idle-timeout | No | 5 min | Session will be closed when it's not accessed for this duration, which can be disabled by setting to zero or negative value.
sql-gateway.session.check-interval | No | 1 min | The check interval for session timeout, which can be disabled by setting to zero or negative value.
sql-gateway.session.max-num | No | 1000000 | The maximum number of active sessions.
sql-gateway.worker.threads.max | No | 500 | Maximum number of worker threads for the gateway workers.
sql-gateway.worker.threads.min | No | 5 | Minimum number of worker threads for the gateway workers.
sql-gateway.worker.keepalive-time | No | 5 min | Keepalive time for an idle worker thread. When the number of workers exceeds the min workers, excessive threads are killed after this time interval.
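
The interplay of the session options can be sketched as follows. This is a minimal illustration, assuming sessions are tracked by their last access time and that timestamps are passed in explicitly; `SessionExpirySketch` and its methods are hypothetical names, and a real implementation would presumably call `closeIdleSessions` from a scheduler every `sql-gateway.session.check-interval`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class SessionExpirySketch {

    private final long idleTimeoutMs; // corresponds to sql-gateway.session.idle-timeout
    private final int maxSessions;    // corresponds to sql-gateway.session.max-num
    private final Map<String, Long> lastAccess = new ConcurrentHashMap<>();

    SessionExpirySketch(long idleTimeoutMs, int maxSessions) {
        this.idleTimeoutMs = idleTimeoutMs;
        this.maxSessions = maxSessions;
    }

    /** Registers a new session unless the limit of active sessions is reached. */
    void open(String sessionId, long nowMs) {
        if (lastAccess.size() >= maxSessions) {
            throw new IllegalStateException("Too many active sessions: " + lastAccess.size());
        }
        lastAccess.put(sessionId, nowMs);
    }

    /** Records an access so the idle timer of the session starts over. */
    void touch(String sessionId, long nowMs) {
        lastAccess.replace(sessionId, nowMs);
    }

    /** Closes sessions idle for at least the timeout; a zero or negative timeout disables it. */
    List<String> closeIdleSessions(long nowMs) {
        List<String> closed = new ArrayList<>();
        if (idleTimeoutMs <= 0) {
            return closed;
        }
        Iterator<Map.Entry<String, Long>> it = lastAccess.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMs - entry.getValue() >= idleTimeoutMs) {
                closed.add(entry.getKey());
                it.remove();
            }
        }
        return closed;
    }
}
```

Passing the current time as a parameter is a choice made for the sketch so that the expiry logic can be exercised without waiting for real time to pass.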



SQL Client API

When launching the SQL Client in gateway mode, users should specify the address and port of the Gateway they want to communicate with. Therefore, we should expose the following startup options.

...