...

Please use the following options to configure the REST endpoint.

Option name | Default Value (Required) | Description
sql-gateway.endpoint.type | rest (Yes) | The type of the endpoint. The REST endpoint should use 'rest'.
sql-gateway.endpoint.rest.port | 8083 (No) | The port of the REST endpoint.
sql-gateway.endpoint.rest.address | 127.0.0.1 (No) | The address that the SQL Gateway REST server binds to.
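
For instance (the values below are illustrative and simply restate the defaults), the REST endpoint could be configured in flink-conf.yaml as follows:

Code Block
languageyml
sql-gateway.endpoint.type: rest
sql-gateway.endpoint.rest.address: 127.0.0.1
sql-gateway.endpoint.rest.port: 8083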


Pluggable Endpoint Discovery

...

Code Block
languagejava
/** Interface for Endpoint. */
public interface SqlGatewayEndpoint {

    void start() throws Exception;

    void stop() throws Exception;
}

/**
 * A factory for creating Endpoint from Configuration. This
 * factory is used with Java's Service Provider Interfaces (SPI) for discovery.
 */
public interface SqlGatewayEndpointFactory extends Factory {

    SqlGatewayEndpoint createSqlGatewayEndpoint(Context context);

    interface Context {

        SqlGatewayService getSqlGatewayService();

        MetricGroup getMetricGroup();

        /** Gives read-only access to the configuration of the Endpoint. */
        ReadableConfig getConfiguration();
    }
}
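
To illustrate the discovery mechanism, below is a minimal sketch of a custom endpoint wired through the factory interface above. The names MyEndpoint and MyEndpointFactory, the identifier "my", and the option sql-gateway.endpoint.my.port are hypothetical and only show how an implementation might plug into the proposed interfaces; it also assumes Factory is Flink's table Factory interface.

Code Block
languagejava
// Minimal sketch; assumes the usual Flink imports (ConfigOption, ConfigOptions, Set, Collections).

/** Hypothetical endpoint implementation (illustrative only). */
public class MyEndpoint implements SqlGatewayEndpoint {

    private final SqlGatewayService service;
    private final int port;

    public MyEndpoint(SqlGatewayService service, int port) {
        this.service = service;
        this.port = port;
    }

    @Override
    public void start() throws Exception {
        // Bind a server on the configured port and translate incoming
        // requests into SqlGatewayService calls.
    }

    @Override
    public void stop() throws Exception {
        // Release the server and its resources.
    }
}

/** Hypothetical factory; registered in META-INF/services for SPI discovery. */
public class MyEndpointFactory implements SqlGatewayEndpointFactory {

    // Assumed option key; how the endpoint prefix is resolved is an implementation detail.
    private static final ConfigOption<Integer> PORT =
            ConfigOptions.key("sql-gateway.endpoint.my.port").intType().defaultValue(9999);

    @Override
    public String factoryIdentifier() {
        return "my";
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.singleton(PORT);
    }

    @Override
    public SqlGatewayEndpoint createSqlGatewayEndpoint(Context context) {
        int port = context.getConfiguration().get(PORT);
        return new MyEndpoint(context.getSqlGatewayService(), port);
    }
}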

We also expose the option sql-gateway.endpoint.type to allow users to specify the endpoints. Considering that different endpoints may share the same settings, e.g. port, users should add the endpoint identifier as a prefix to the option, e.g. sql-gateway.endpoint.rest.port. For simplicity, we don't plan to introduce another YAML file for the SQL Gateway; users can specify the gateway options in flink-conf.yaml.

...

For example, users can add the following options in the flink-conf.yaml.

Code Block
languageyml
sql-gateway.endpoint.type: rest, hiveserver2
sql-gateway.endpoint.rest.address: localhost
sql-gateway.endpoint.rest.port: 9001
sql-gateway.endpoint.hiveserver2.address: localhost
sql-gateway.endpoint.hiveserver2.port: 9002

...

Code Block
languagejava
public class HandleIdentifier {
    UUID publicId;
    UUID secretId;
}


public class SessionHandle {
    HandleIdentifier identifier;
}

/**
 * Every Endpoint should expose its version with an enum that implements this interface.
 */
interface EndpointVersion {}

enum RestEndpointVersion implements EndpointVersion {
    V1;
}

/**
 * It is equivalent to HiveServer2's TProtocolVersion. It should belong to the
 * Hive module.
 */
enum HiveServer2EndpointVersion implements EndpointVersion {
  HIVE_CLI_SERVICE_PROTOCOL_V1,

  // V2 adds support for asynchronous execution
  HIVE_CLI_SERVICE_PROTOCOL_V2,

  // V3 adds varchar type and primitive type qualifiers
  HIVE_CLI_SERVICE_PROTOCOL_V3,

  // V4 adds decimal precision/scale and char type
  HIVE_CLI_SERVICE_PROTOCOL_V4,

  // V5 adds error details when GetOperationStatus returns in error state
  HIVE_CLI_SERVICE_PROTOCOL_V5,

  // V6 uses binary type for binary payload (was string) and uses columnar result set
  HIVE_CLI_SERVICE_PROTOCOL_V6,

  // V7 adds support for delegation token based connection
  HIVE_CLI_SERVICE_PROTOCOL_V7,

  // V8 adds support for interval types
  HIVE_CLI_SERVICE_PROTOCOL_V8,

  // V9 adds support for serializing ResultSets in SerDe
  HIVE_CLI_SERVICE_PROTOCOL_V9,

  // V10 adds support for in place updates via GetOperationStatus
  HIVE_CLI_SERVICE_PROTOCOL_V10,

  // V11 adds timestamp with local time zone type
  HIVE_CLI_SERVICE_PROTOCOL_V11
}

enum OperationType { 
  EXECUTE_STATEMENT, 
  GET_INFO,
  UNKNOWN;
}


public class OperationHandle {
    HandleIdentifier identifier;
}
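
As a rough illustration (the constructors below are assumptions for illustration, not part of the proposal), the Gateway might mint a session handle from two random UUIDs:

Code Block
languagejava
// Illustrative only; the HandleIdentifier/SessionHandle constructors are assumed.
HandleIdentifier identifier = new HandleIdentifier(UUID.randomUUID(), UUID.randomUUID());
SessionHandle sessionHandle = new SessionHandle(identifier);

// An endpoint could then resolve later requests back to this session by
// looking the handle up in a registry, e.g. a Map<SessionHandle, Session>.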

...

SqlGatewayService API

Code Block
languagejava
interface SqlGatewayService {
        
    // -------------------------------------------------------------------------------------------
    // Session Management
    // -------------------------------------------------------------------------------------------

    SessionHandle openSession(SessionEnvironment environment) throws SqlGatewayException;
    
    void closeSession(SessionHandle sessionHandle) throws SqlGatewayException;
    
    Map<String, String> getSessionConfig(SessionHandle sessionHandle) throws SqlGatewayException;
    
    // -------------------------------------------------------------------------------------------
    // Operation Management
    // -------------------------------------------------------------------------------------------
        
    /**
     * Get operation info to describe the Operation.
     */
    OperationInfo getOperationInfo(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException;
    
    /** Get the result schema for the specified Operation. */
    ResolvedSchema getOperationResultSchema(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException;
    
    void cancelOperation(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException;
    
    void closeOperation(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException;
    
    // -------------------------------------------------------------------------------------------
    // Statements 
    // -------------------------------------------------------------------------------------------
    
    /**
     * Initialize the Session with the given statement. Only
     * SET/RESET/CREATE/DROP/USE/ALTER/LOAD MODULE/UNLOAD MODULE/ADD JAR/REMOVE JAR statements are allowed.
     */
    void configureSession(SessionHandle sessionHandle, String statement, long executionTimeoutMs) throws SqlGatewayException;
    
    /** Execute the statement in the specified Session, optionally with an Operation-level configuration. */
    OperationHandle executeStatement(
        SessionHandle sessionHandle, 
        String statement, 
        long executionTimeoutMs, 
        Configuration executionConfig) throws SqlGatewayException;
   
    /**
     * Fetch the results identified by the given token.
     */
    ResultSet fetchResults(SessionHandle sessionHandle, OperationHandle operationHandle, int token, int maxRows) throws SqlGatewayException;

    /**
     * Fetch the Operation-level log from the GatewayService. Some endpoints allow fetching logs at the Operation level.
     */
    ResultSet fetchLog(SessionHandle sessionHandle, OperationHandle operationHandle, FetchOrientation orientation, int maxRows) throws SqlGatewayException;
     
    /**
     * Only supports fetching results in FORWARD/BACKWARD orientation.
     * - Users can only go BACKWARD from the current offset once.
     * - The Gateway doesn't materialize the changelog.
     */
    ResultSet fetchResult(SessionHandle sessionHandle, OperationHandle operationHandle, FetchOrientation orientation, int maxRows) throws SqlGatewayException;          

    /**
     * For the same functionality, every endpoint has its own result schema. Therefore,
     * the endpoint submits a callable executor to the OperationManager, which manages
     * the lifecycle of the Operation. The callable executor organizes the results
     * as the Endpoint requires.
     */
    OperationHandle submitOperation(OperationType type, Callable<ResultSet> executor, ResolvedSchema resultSchema) throws SqlGatewayException;
    
    // -------------------------------------------------------------------------------------------
    // Utils 
    // -------------------------------------------------------------------------------------------
    
    /**
     * Describe the cluster info.
     */
    Map<String, String> getGatewayInfo();
    
    void heartbeat(SessionHandle sessionHandle) throws SqlGatewayException;
    
    /**
     * The Endpoint is stateless. All the session configs are kept on the GatewayService side.
     */
    EndpointVersion getSessionEndpointVersion(SessionHandle sessionHandle) throws SqlGatewayException;
    
    /** Returns a list of completion hints for the given statement at the given position. */
    List<String> completeStatement(String sessionId, String statement, int position) throws SqlGatewayException;

   
    // -------------------------------------------------------------------------------------------
    // Catalog API 
    // -------------------------------------------------------------------------------------------
    
    String getCurrentCatalog(SessionHandle sessionHandle) throws SqlGatewayException;

    String getCurrentDatabase(SessionHandle sessionHandle) throws SqlGatewayException;
    
    List<String> listCatalogs(SessionHandle sessionHandle) throws SqlGatewayException;
    
    List<String> listDatabases(SessionHandle sessionHandle, String catalogName) throws SqlGatewayException;   
    
    List<TableInfo> listTables(SessionHandle sessionHandle, String catalogName, String databaseName, TableKind tableKind) throws SqlGatewayException;
    
    List<FunctionInfo> listFunctions(SessionHandle sessionHandle, String catalogName, String databaseName, FunctionScope scope) throws SqlGatewayException;
    
    ContextResolvedTable getTable(SessionHandle sessionHandle, ObjectIdentifier tableIdentifier) throws SqlGatewayException;
    
    ContextResolvedFunction getFunction(SessionHandle sessionHandle, ObjectIdentifier functionIdentifier) throws SqlGatewayException;
    
}

class TableInfo {
    boolean isTemporary;
    ObjectIdentifier identifier;
    TableKind tableKind;
}

class FunctionInfo {
    boolean isTemporary;
    ObjectIdentifier identifier;
    FunctionKind kind;
}

class SessionEnvironment {
    private String sessionName;
    private EndpointVersion sessionEndpointVersion;
    private List<URL> libs;
    private List<URL> jars;
    private Map<String, String> sessionConfig;
}

public class OperationInfo {
    OperationStatus status;
    OperationType type;
    boolean hasResult;
}

public class ResultSet {
    
    int nextToken;
    
    ResultType resultType; 

    ResolvedSchema resultSchema;
    
    List<RowData> results;
    
    Exception exception;
}

public enum ResultType {
    PAYLOAD,
    EMPTY,
    EOS,
    ERROR
}
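
To make the intended call flow concrete, here is a hedged sketch of how an endpoint might drive the service for a single statement. The helper method, the literal statement, the batch size, and the direct field access on ResultSet are assumptions for illustration; error handling is reduced to the minimum.

Code Block
languagejava
// Illustrative sketch only; `service` and `environment` are assumed to be provided by the caller.
void runStatement(SqlGatewayService service, SessionEnvironment environment) throws Exception {
    SessionHandle session = service.openSession(environment);
    try {
        OperationHandle operation = service.executeStatement(
                session, "SELECT 1", /* executionTimeoutMs */ 0L, new Configuration());

        int token = 0;
        while (true) {
            // Fetch the next batch of rows; the token advances through the result stream.
            ResultSet resultSet = service.fetchResults(session, operation, token, 100);
            if (resultSet.resultType == ResultType.ERROR) {
                throw resultSet.exception;
            }
            if (resultSet.resultType == ResultType.PAYLOAD) {
                for (RowData row : resultSet.results) {
                    // Serialize the row in whatever format the endpoint's protocol requires.
                }
            }
            if (resultSet.resultType == ResultType.EOS) {
                break;
            }
            token = resultSet.nextToken;
        }

        service.closeOperation(session, operation);
    } finally {
        service.closeSession(session);
    }
}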

...

We will collect more feedback to determine which features are most important to users.


Rejected Alternatives

TableInfo and FunctionInfo VS CatalogTable and CatalogFunction

The CatalogTable and CatalogFunction are much heavier than the TableInfo and FunctionInfo. The CatalogManager requires reading from the Catalog to get the schema, but listTables only cares about the table name, which is much lighter. Therefore, we propose to use the TableInfo with the required fields.

...

If we support multiple versions in the Gateway (the GatewayService also has a version), it means all the Flink versions run in the same JVM. It requires the Gateway to solve shims, classloaders for different Flink versions, and deployments. It will mess up all the code with reflection. It's better to follow the design of other tools.

...

  1. Reduce the cost, e.g. CI tests, releases, and maintenance cost;
  2. The SQL Client has the ability to submit SQL to the SQL Gateway and we can reuse most of the code;
  3. A Gateway inside the Flink repo can ensure the highest degree of version compatibility;
  4. A Gateway is indispensable for a SQL engine (think of Trino/Presto, Spark, Hive). Otherwise, Flink will always be a processing system. With the Gateway inside the Flink repo, Flink can provide an out-of-the-box experience as a SQL query engine, and users can try out the gateway for the latest version as soon as a new version is released.

Therefore, we prefer to merge the Gateway into the Flink repo.

...