Status

Current state: "Under Discussion"

Discussion thread

JIRA

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In the batch world, Hive is still the standard. Since Flink has continuously improved its compatibility with Hive in recent years, we propose to expose a HiveServer2 Endpoint on the Flink SQL Gateway. The Endpoint will implement the Thrift interface exposed by HiveServer2, so that users' BI, CLI, and other tools built on HiveServer2 can be migrated seamlessly to the Flink SQL Gateway.

...

    • translate to the Operation that manipulates the Hive MetaStore
  • Users can look up the Operation status and manage the submitted Operation.

...


Overall Design

HiveServer2 has an architecture similar to the one described in FLIP-91. A user must register a Session in the SessionManager before submitting a statement. After that, the user uses the SessionHandle to look up the resources registered in the Session and to execute statements. Every statement is translated to an Operation, and the OperationHandle is used to fetch its results or logs. Therefore, the HiveServer2 endpoint guarantees:

  1. The SessionHandle and OperationHandle in HiveServer2 can be converted to the SessionHandle and OperationHandle in FLIP-91;
  2. The interfaces exposed by HiveServer2 can be converted into the corresponding calls to the Flink GatewayService.

Handle

The structure of the Handle in the HiveServer2 is as follows.

...

Session API

  • OpenSession
    • Usage: Register a Session in the SessionManager.
    • Return: SessionHandle
    • HiveServer2 Endpoint needs to do:
      • Determine the protocol version used between the HiveServer2 Endpoint and the client;
      • Invoke GatewayService#openSession to register the Session;
      • Configure the Hive Environment
        • Create the Hive Catalog
        • Switch to the Hive Dialect
        • Load the Hive Module
  • CloseSession
    • Usage: Clear the related resources;
    • HiveServer2 Endpoint needs to do:
      • Invoke the GatewayService#closeSession
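
The Hive environment setup listed under OpenSession could, for illustration, be expressed as a short sequence of Flink SQL statements issued right after the Session is registered. This is only a sketch under assumptions: the class name `HiveSessionInitializer`, the catalog name parameter, and the idea of driving the setup through SQL statements instead of direct API calls are hypothetical and not part of this proposal. The dialect switch is placed last so that the preceding statements are parsed with the default dialect, which is a choice of the sketch.

```java
import java.util.List;

/** Hypothetical helper: builds the statements an endpoint might run after openSession. */
final class HiveSessionInitializer {

    private HiveSessionInitializer() {}

    /**
     * @param catalogName name for the Hive catalog (assumed to be chosen by the endpoint)
     * @param hiveConfDir directory that contains hive-site.xml
     */
    static List<String> initStatements(String catalogName, String hiveConfDir) {
        return List.of(
                // Create the Hive Catalog and make it the current catalog.
                String.format(
                        "CREATE CATALOG `%s` WITH ('type' = 'hive', 'hive-conf-dir' = '%s')",
                        catalogName, hiveConfDir),
                String.format("USE CATALOG `%s`", catalogName),
                // Load the Hive Module so that Hive built-in functions can be resolved.
                "LOAD MODULE hive",
                // Switch to the Hive Dialect for all following statements.
                "SET 'table.sql-dialect' = 'hive'");
    }
}
```

An endpoint following this approach would submit each returned statement through the Session it has just opened, and fail the OpenSession call if any of them fails.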

Operation API

  • GetOperationStatus
    • Usage: Get the Operation status.
    • HiveServer2 Endpoint needs to do:
      • Invoke the GatewayService#getOperationStatus
  • CancelOperation
    • Usage: Cancel the Operation.
    • HiveServer2 Endpoint needs to do:
      • Invoke the GatewayService#cancelOperation
  • CloseOperation
    • Usage: Close the Operation
    • HiveServer2 Endpoint needs to do:
      • Invoke the GatewayService#closeOperation
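
The status lookup is also useful inside the endpoint itself. The Statement API section notes that the GatewayService only submits asynchronously, so a synchronous ExecuteStatement requires the endpoint to watch the Operation status. A minimal sketch of such a wait loop follows. The `OperationStatus` enum here is a simplified stand-in, and `SyncExecutionHelper` with its parameters is hypothetical; the status lookup is passed in as a `Supplier` so the sketch does not assume any particular GatewayService signature.

```java
import java.time.Duration;
import java.util.function.Supplier;

/** Simplified stand-in for the Operation status; the real set of states may differ. */
enum OperationStatus {
    RUNNING, FINISHED, CANCELED, ERROR;

    boolean isTerminal() {
        return this != RUNNING;
    }
}

/** Hypothetical helper that blocks until an asynchronously submitted Operation terminates. */
final class SyncExecutionHelper {

    private SyncExecutionHelper() {}

    static OperationStatus awaitTermination(
            Supplier<OperationStatus> statusLookup, Duration pollInterval, Duration timeout) {
        long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (true) {
            // In an endpoint this would call something like GatewayService#getOperationStatus.
            OperationStatus status = statusLookup.get();
            if (status.isTerminal()) {
                return status;
            }
            if (System.nanoTime() >= deadlineNanos) {
                throw new IllegalStateException("Operation did not terminate within " + timeout);
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and stop waiting.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for the Operation", e);
            }
        }
    }
}
```

Polling keeps the sketch independent of any callback mechanism; an implementation with access to completion notifications could avoid the sleep entirely.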

Statement API

  • ExecuteStatement
    • Usage: Submit a SQL statement to the GatewayService for synchronous or asynchronous execution.
    • Return: OperationHandle
    • HiveServer2 Endpoint needs to do:
      • Invoke the GatewayService#executeStatement
      • HiveServer2 also supports executing a statement in synchronous mode.
        • This corresponds to ExecuteStatement with table.dml-sync set to true.
        • Currently the GatewayService only supports submission in asynchronous mode, so the HiveServer2 Endpoint has to monitor the Operation status itself.
  • FetchResults
    • Usage: Fetch the results or logs of an Operation for the client, honoring the requested fetch orientation.
    • HiveServer2 Endpoint needs to do:
      • Invoke the GatewayService#fetchResults or GatewayService#fetchLog
  • GetResultSetMetadata
    • Usage: Return the result schema.
    • HiveServer2 Endpoint needs to do:
      • Invoke the GatewayService#getResultSchema
  • GetInfo
    • Get the cluster info.
      • Only CLI_SERVER_NAME (FLINK) is supported for now. Other values can be added in the future if needed.
  • GetTypeInfo
    • Get the ODBC type info.
  • GetCatalogs
    • Return the registered Catalogs.
    • Do as follows:

Code Block
languagejava
/**
 * The schema for the Operation is
 * <pre>
 * +----------------+--------------+--------------+
 * | column name    | column type  | comments     |
 * +----------------+--------------+--------------+
 * | TABLE_CAT      | STRING       | catalog name |
 * +----------------+--------------+--------------+
 * </pre>
 */
gateway.submitOperation(
    HiveServer2OperationType.GET_CATALOGS,
    () -> convertToGetCatalogs(gateway.listCatalogs()),
    resultSchema);

  • GetSchemas
    • Return the database info. Currently HiveServer2 supports using a regex to filter out the databases that do not match.
    • Do as follows:
Code Block
languagejava
/**
 * The schema for the Operation is
 * <pre>
 * +----------------+--------------+--------------+
 * | column name    | column type  | comments     |
 * +----------------+--------------+--------------+
 * | TABLE_SCHEMA   | STRING       | schema name  |
 * +----------------+--------------+--------------+
 * | TABLE_CAT      | STRING       | catalog name |
 * +----------------+--------------+--------------+
 * </pre>
 */
gateway.submitOperation(
    HiveServer2OperationType.GET_SCHEMAS,
    () -> {
        List<String> databases = filter(gateway.listDatabases(sessionHandle), databasePattern);
        return convertToGetDatabasesResultSet(databases);
    },
    resultSchema);

  • GetTables
    • Get the tables in the specified Catalog and Database. HiveServer2 allows using Catalog/Database/Table patterns to filter out the tables that do not match.
    • Do as follows:
Code Block
languagejava
/**
 * The schema for the Operation is
 * <pre>
 * +----------------+--------------+--------------------------------+
 * | column name    | column type  | comments                       |
 * +----------------+--------------+--------------------------------+
 * | TABLE_CAT      | STRING       | catalog name                   |
 * +----------------+--------------+--------------------------------+
 * | TABLE_SCHEMA   | STRING       | schema name                    |
 * +----------------+--------------+--------------------------------+
 * | TABLE_NAME     | STRING       | table name                     |
 * +----------------+--------------+--------------------------------+
 * | TABLE_TYPE     | STRING       | table type, e.g. TABLE, VIEW   |
 * +----------------+--------------+--------------------------------+
 * | REMARKS        | STRING       | table desc                     |
 * +----------------+--------------+--------------------------------+
 * </pre>
 */
gateway.submitOperation(
    HiveServer2OperationType.GET_TABLES,
    () -> {
        List<CatalogTable> results = new ArrayList<>();
        List<String> catalogs = filter(gateway.listCatalogs(sessionHandle), catalogPattern);
        for (String catalog : catalogs) {
            List<String> databases = filter(gateway.listDatabases(sessionHandle, catalog), databasePattern);
            for (String database : databases) {
                List<String> tables = filter(gateway.listTables(sessionHandle, catalog, database), tablePattern);
                for (String table : tables) {
                    results.add(gateway.getTable(catalog, database, table, ALL));
                }
            }
        }
        return convertToGetTablesResultSet(results);
    },
    resultSchema);
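
The `filter` helper used in the GetSchemas and GetTables snippets is not spelled out in this document. A minimal sketch of one way to write it follows, assuming the patterns are JDBC-style search patterns in which `%` matches any sequence of characters and `_` matches exactly one character. The class name `NamePatternFilter`, the treatment of a null or empty pattern as "match everything", and the absence of escape-character handling are assumptions of the sketch.

```java
import java.util.Collection;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/** Hypothetical helper that filters catalog, database, or table names by a search pattern. */
final class NamePatternFilter {

    private NamePatternFilter() {}

    /** Translates a JDBC-style search pattern into a regular expression. */
    static Pattern toRegex(String searchPattern) {
        if (searchPattern == null || searchPattern.isEmpty()) {
            return Pattern.compile(".*"); // assumption: no pattern means "match everything"
        }
        StringBuilder regex = new StringBuilder();
        for (char c : searchPattern.toCharArray()) {
            if (c == '%') {
                regex.append(".*"); // any sequence of characters
            } else if (c == '_') {
                regex.append('.'); // exactly one character
            } else {
                regex.append(Pattern.quote(String.valueOf(c))); // literal character
            }
        }
        return Pattern.compile(regex.toString());
    }

    /** Keeps only the names that match the whole pattern, preserving the input order. */
    static List<String> filter(Collection<String> names, String searchPattern) {
        Pattern regex = toRegex(searchPattern);
        return names.stream()
                .filter(name -> regex.matcher(name).matches())
                .collect(Collectors.toList());
    }
}
```

For example, filtering the names `default`, `sales_2021`, `sales_2022`, and `hr` with the pattern `sales%` keeps the two `sales_` databases, and the pattern `h_` keeps only `hr`.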

...

Flink currently does not support getting the delegation token from the YARN side.

...

Public Interfaces

GatewayService API Change

Code Block
languagejava
public interface GatewayService {
    
    /**
     * Fetch the Operation-level log from the GatewayService.
     */
    ResultSet fetchLog(
        SessionHandle sessionHandle, 
        OperationHandle operationHandle, 
        FetchOrientation orientation, 
        int maxRows);
    
    /**
     * Only supports FORWARD/BACKWARD.
     * - Users can only fetch BACKWARD once from the current offset.
     * - The Gateway does not materialize the changelog.
     */
    ResultSet fetchResult(
        SessionHandle sessionHandle, 
        OperationHandle operationHandle, 
        FetchOrientation orientation, 
        int maxRows
    );
}

enum FetchOrientation {
    FORWARD,
    BACKWARD
}
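
The single-step BACKWARD constraint can be illustrated with a small buffer that retains only the most recently returned batch. This is a sketch of one possible reading of the rule, not the proposed implementation: the class `SingleStepBackFetcher` is hypothetical, BACKWARD is interpreted as re-delivering the last batch, and `FetchOrientation` is redeclared only so that the example compiles on its own.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

/** Redeclared here only to keep the sketch self-contained. */
enum FetchOrientation {
    FORWARD,
    BACKWARD
}

/** Hypothetical fetcher that retains one batch, so nothing else needs to be materialized. */
final class SingleStepBackFetcher<T> {

    private final Iterator<T> source;
    private List<T> lastBatch; // the only rows kept in memory
    private boolean steppedBack;

    SingleStepBackFetcher(Iterator<T> source) {
        this.source = source;
    }

    List<T> fetch(FetchOrientation orientation, int maxRows) {
        if (orientation == FetchOrientation.BACKWARD) {
            if (lastBatch == null || steppedBack) {
                throw new IllegalStateException(
                        "BACKWARD is only allowed once from the current offset");
            }
            steppedBack = true;
            return lastBatch; // re-deliver the retained batch
        }
        // FORWARD: pull up to maxRows new rows and replace the retained batch.
        List<T> batch = new ArrayList<>();
        while (batch.size() < maxRows && source.hasNext()) {
            batch.add(source.next());
        }
        lastBatch = Collections.unmodifiableList(batch);
        steppedBack = false;
        return lastBatch;
    }
}
```

Because only one batch is retained, memory use is bounded by `maxRows`, which is consistent with the Gateway not materializing the changelog.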
 


Options

We use the same style as the HiveServer2 options.

...