
Status

...

Page properties

Discussion thread: ...
Vote thread: https://lists.apache.org/thread/9r1j7ho2m8zbqy3tl7vvj9gnocggwr6x
JIRA: FLINK-28149
Release: 1.16

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Motivation

In the batch world, Hive is still the standard. Considering that Flink has continuously improved its compatibility with Hive in recent years, we propose to expose a HiveServer2 endpoint in the Flink SQL Gateway. The endpoint will implement the thrift interface exposed by HiveServer2, and users' BI, CLI, and other tools built on HiveServer2 can be seamlessly migrated to the Flink SQL Gateway.

...

    • translate to the Operation that manipulates the Hive MetaStore
  • Users can look up the Operation status and manage the submitted Operations.

Overall Design

After this FLIP is finished, users can do all of the functionality above in the Flink SQL Gateway with the HiveServer2 endpoint.

[Figure: Overall Design]

HiveServer2 has a similar architecture to the one we describe in FLIP-91. Users should register a Session in the SessionManager before submitting statements. After that, the user uses the SessionHandle to find the registered resources in the Session and execute statements. All statements are translated into Operations, and the OperationHandle is used to fetch the results or logs. Therefore, the HiveServer2 endpoint guarantees:

  1. The SessionHandle and OperationHandle in HiveServer2 can be converted to the SessionHandle and OperationHandle in FLIP-91;
  2. The interfaces exposed by HiveServer2 can be converted into the related calls in the Flink GatewayService.

Handle Mapping

The structure of the Handle in HiveServer2 is as follows.

The transformation is as follows.

  • The HiveServer2 SessionHandle has the same structure as the Gateway SessionHandle.
  • The HiveServer2 OperationHandle has more information than the OperationHandle in the Gateway. The HiveServer2 endpoint should use the result of GatewayService#getOperationInfo to build the OperationHandle in HiveServer2 (a conversion sketch follows this list).
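For illustration, a minimal sketch of the SessionHandle mapping, assuming the Gateway SessionHandle wraps a UUID (as proposed in FLIP-91); TSessionHandle and THandleIdentifier are the Hive thrift types, while the class and method names here are only illustrative:

Code Block
languagejava
import java.nio.ByteBuffer;
import java.util.UUID;

import org.apache.hive.service.rpc.thrift.THandleIdentifier;
import org.apache.hive.service.rpc.thrift.TSessionHandle;

/** A sketch of the handle conversion; the Gateway handle is assumed to wrap a UUID. */
public final class HandleConversions {

    /** Wrap the Gateway-side UUID into the thrift TSessionHandle. */
    public static TSessionHandle toTSessionHandle(UUID sessionId) {
        return new TSessionHandle(toTHandleIdentifier(sessionId, UUID.randomUUID()));
    }

    /** Recover the Gateway-side UUID from the thrift handle. */
    public static UUID toSessionId(TSessionHandle tHandle) {
        ByteBuffer bb = ByteBuffer.wrap(tHandle.getSessionId().getGuid());
        return new UUID(bb.getLong(), bb.getLong());
    }

    private static THandleIdentifier toTHandleIdentifier(UUID publicId, UUID secretId) {
        // a UUID is 16 bytes: most significant long followed by least significant long
        ByteBuffer guid = ByteBuffer.allocate(16);
        ByteBuffer secret = ByteBuffer.allocate(16);
        guid.putLong(publicId.getMostSignificantBits());
        guid.putLong(publicId.getLeastSignificantBits());
        secret.putLong(secretId.getMostSignificantBits());
        secret.putLong(secretId.getLeastSignificantBits());
        guid.flip();
        secret.flip();
        return new THandleIdentifier(guid, secret);
    }
}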

HiveServer2 Endpoint API

Refer to the file for the HiveServer2 API.

In this FLIP, most of the functionality supported by HiveServer2 is implemented by the SQL Gateway with the HiveServer2 endpoint. For specific implementation details, please refer to the Appendix. Here, we only focus on the differences between HiveServer2 and the SQL Gateway with the HiveServer2 endpoint.

Use SQL Gateway with HiveServer2 Endpoint as HiveServer2

HiveServer2 consists of the Hive metastore and the execution engine that submits jobs to Hadoop [1]. When users connect to the Flink SQL Gateway, it has a built-in Hive Catalog and allows submitting SQL in the Hive dialect. The Hive Catalog works like the Hive Metastore, and users can look up the metadata stored in the Hive Metastore. Therefore, the endpoint will register the HiveCatalog and switch to the Hive dialect and batch mode in the openSession API.

Differences with HiveServer2

  • GetCrossReference API

Flink doesn't have the concept of cross references, so the Flink SQL Gateway with the HiveServer2 endpoint doesn't support this.

  • FetchResults

Currently, the SqlGatewayService only supports fetching forward and cannot fetch backward multiple times, but HiveServer2 doesn't have this limitation. So the Flink SQL Gateway with the HiveServer2 endpoint will throw an exception if the client fetches backward multiple times.

  • Information loss during the conversion

Currently, the Gateway uses the HiveCatalog to get metadata from the metastore. This means we need to convert the results in the Hive dialect to the Flink dialect in the Catalog. When the endpoint sends the results to the users, it needs to translate the results in the Flink dialect back to the Hive dialect. In some cases, this may lose some info. For example, Hive has five kinds of TableType: MANAGED_TABLE, EXTERNAL_TABLE, VIRTUAL_VIEW, INDEX_TABLE, and MATERIALIZED_VIEW, but Flink only supports TABLE/VIEW. This means users can't get the specific table type through the HiveServer2 endpoint.

  • Delegation Token

In this FLIP, we mainly focus on the basic functionality. The delegation token is used as the ticket to identify users, and we leave this work for the future. So the Flink SQL Gateway with the HiveServer2 endpoint doesn't support this now.

HiveServer2 Compatibility

Compatibility with Client

HiveServer2 uses the protocol version to solve compatibility between the client and the server, which means HiveServer2 can communicate with a client of any version. The logic behind this is:

  • During OpenSession, the client and server determine the protocol version.
  • The client promises:
    • it should not send a request that is only exposed in a higher version;
    • it should not send a request that contains a parameter that is only exposed in a higher version;
    • it should send requests with the parameters that are specified in the determined protocol version.
  • The server promises:
    • it returns the result with the schema specified in the determined protocol version.

If users want to use a higher HiveServer2 version, they can just upgrade the HiveServer2 endpoint's dependency version and don't need to implement another HiveServer2 endpoint for the expected version. If users want to use a lower HiveServer2 version, the current endpoint should already satisfy the requirements.
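For illustration, a minimal sketch of the handshake, assuming the server simply picks the lower of the two versions; TProtocolVersion is the Hive thrift enum, the surrounding OpenSession request handling is omitted:

Code Block
languagejava
import org.apache.hive.service.rpc.thrift.TProtocolVersion;

/** A sketch of the OpenSession version negotiation. */
public final class VersionNegotiation {

    /**
     * Communicate with the lower (older) of the two versions so that both sides
     * only use requests and fields that exist in that version.
     */
    public static TProtocolVersion negotiate(TProtocolVersion client, TProtocolVersion server) {
        return client.getValue() <= server.getValue() ? client : server;
    }
}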

Compatibility with Hive metastore

During setup, HiveServer2 tries to load the config in hive-site.xml to initialize the Hive metastore client. In Flink, we use the Catalog interface to connect to the Hive Metastore. The HiveServer2 endpoint requires the user to specify the path of the hive-site.xml as an endpoint parameter, which will be used to create the default HiveCatalog. Considering that HiveServer2 binds to a Hive metastore of the same version, our HiveServer2 endpoint also binds to specific versions of the Hive Catalog. But users can switch to a different Hive metastore instance using JDBC parameters or by using DDL to register a new HiveCatalog.
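A hedged sketch of building the default catalog from the configured path; the HiveCatalog constructor is Flink's existing API, while the surrounding wiring and the literal values are assumptions of this sketch:

Code Block
languagejava
import org.apache.flink.table.catalog.hive.HiveCatalog;

// Build the default HiveCatalog from the endpoint options (see the Options
// table below); the literal values here are placeholders only.
HiveCatalog defaultCatalog =
        new HiveCatalog(
                "hive",                    // sql-gateway.endpoint.hiveserver2.catalog.name
                null,                      // sql-gateway.endpoint.hiveserver2.catalog.default-database
                "/path/to/hive-conf-dir"); // sql-gateway.endpoint.hiveserver2.catalog.hive-conf-dir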

Public Interfaces

Options

We use the same style as the HiveServer2 options.

  • sql-gateway.endpoint.hiveserver2.catalog.hive-conf-dir
    • Default: (none) (required)
    • URI to your Hive conf dir containing hive-site.xml. The URI needs to be supported by the Hadoop FileSystem. If the URI is relative, i.e. without a scheme, the local file system is assumed. If the option is not specified, hive-site.xml is searched for in the classpath.
  • sql-gateway.endpoint.hiveserver2.catalog.name
    • Default: hive (optional)
    • Hive catalog name.
  • sql-gateway.endpoint.hiveserver2.catalog.default-database
    • Default: (none) (optional)
    • The default database to use when the catalog is set as the current catalog.
  • sql-gateway.endpoint.hiveserver2.thrift.port
    • Default: 8084 (optional)
    • The port of the HiveServer2 endpoint.
  • sql-gateway.endpoint.hiveserver2.thrift.worker.threads.min
    • Default: 5 (optional)
    • Minimum number of worker threads. HiveServer2 uses TThreadPoolServer, which uses a ThreadPool inside.
  • sql-gateway.endpoint.hiveserver2.thrift.worker.threads.max
    • Default: 512 (optional)
    • Maximum number of worker threads.
  • sql-gateway.endpoint.hiveserver2.thrift.worker.keepalive-time
    • Default: 60 s (optional)
    • Keep-alive time of idle worker threads.
  • sql-gateway.endpoint.hiveserver2.transport.mode
    • Default: binary (optional)
    • The transport mode (binary/http). Currently only binary mode is supported.

Merge HiveServer2 Endpoint into Hive Connector Module

In this way, every flink-sql-connector-hive-xxx package will contain the HiveServer2 endpoint for that specific version.

Usage

Switch to the Streaming Mode

The SQL Gateway with the HiveServer2 endpoint uses BATCH mode with the Hive dialect by default. It doesn't prevent users from switching to the default dialect or to streaming mode. But please be careful when using the Hive dialect in streaming mode: we don't yet promise that the Hive dialect works well in streaming mode, and corner cases may cause problems.

Please also change 'table.dml-sync' to false when using the HiveServer2 endpoint with the default dialect in streaming mode, because in streaming mode the pipeline never ends.

How to use

Users can just add the configuration in the flink-conf.yaml. 

Code Block
languageyml
sql-gateway.endpoint.type: hiveserver2
sql-gateway.endpoint.hiveserver2.thrift.port: 9002
sql-gateway.endpoint.hiveserver2.catalog.hive-conf-dir: /path/to/catalog

Then use the following command to start the SQL Gateway.

Code Block
languagebash
./sql-gateway.sh start

Please make sure the endpoint package is in the lib directory or the classpath.

We don't suggest using the HiveServer2 endpoint with a different version of the Hive Catalog.

Beeline users can use the following command to connect to the SQL Gateway.

Code Block
languagebash
beeline> !connect jdbc:hive2://<host>:<port>/<db>;auth=noSasl hiveuser pass 

Ecosystem

Hive JDBC Driver

Many tools use the HiveJdbcDriver to connect to HiveServer2. Considering that the driver uses the thrift client to communicate and the current HiveServer2 endpoint exposes the same API, in most cases the Hive JDBC driver can directly connect to the HiveServer2 endpoint. However, the Hive JDBC driver supports using the following URL to open the Session in HiveServer2.

Code Block
jdbc:hive2://<host1>:<port1>/dbName;sess_var_list?hive_conf_list#hive_var_list
  • sess_var_list: the properties of the Session, e.g. the file to initialize the Session, JDBC connection parameters, or authentication.
  • hive_conf_list: set the Hive properties into HiveConf, which works like Configuration in Flink and influences the plan or execution.
  • hive_var_list: set the Hive properties into HiveConf, but with variable substitution.

For the difference between the hive_conf and hive_var please refer to the link.

In some cases, the user can also modify the Hive Metastore URLs by using the JDBC URL, e.g.

Code Block
jdbc:hive2://<host1>:<port1>/dbName;#hive.metastore.uris=thrift://localhost:9083

The config will be used to init the Session Configuration and HiveCatalog.

Future Work

  • Support authentication on the Gateway side, including Kerberos, SSL, and Delegation Token.
  • More transport modes, e.g. HTTP/HTTPS.


Implementation

HiveServer2 uses the Apache Thrift framework. The server is composed of 3 parts.

[Figure: the three parts of a Thrift server]

  • Transport determines how the client communicates with the server. Now, HiveServer2 supports HTTP and binary mode.
  • Protocol is responsible for serialization and deserialization. Currently, HiveServer2 uses the TBinaryProtocol.
  • Processor is the application logic to handle requests. We should rewrite the Processor with the Flink logic.
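A minimal sketch of the three parts wired together. TServerSocket, TThreadPoolServer, TBinaryProtocol, and TCLIService.Processor are the real thrift/Hive APIs; EndpointHandler is a hypothetical implementation of TCLIService.Iface that would carry the Flink logic:

Code Block
languagejava
import org.apache.hive.service.rpc.thrift.TCLIService;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransportException;

/** A sketch only; EndpointHandler (a TCLIService.Iface) is assumed. */
public final class HiveServer2EndpointServer {
    public static void main(String[] args) throws TTransportException {
        // Transport: binary mode on the configured port
        TServerSocket transport = new TServerSocket(8084);
        TThreadPoolServer server = new TThreadPoolServer(
                new TThreadPoolServer.Args(transport)
                        // Protocol: serialization via TBinaryProtocol
                        .protocolFactory(new TBinaryProtocol.Factory())
                        // Processor: dispatches requests to the Flink-backed handler
                        .processor(new TCLIService.Processor<>(new EndpointHandler())));
        server.serve();
    }
}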

Appendix

HiveServer2 API Implementation Details

Session API

  • OpenSession
    • Usage: register a Session in the SessionManager (a sketch of this flow follows this list)
    • Return: SessionHandle
    • HiveServer2 Endpoint needs to do:
      • Determine the communication version between HiveServer2 and Client;
      • Invoke GatewayService#openSession to register the Session;
      • Configure the Hive Environment
        • Create the Hive Catalog
        • Switch to the Hive Dialect
        • Load the Hive Module
  • CloseSession
    • Usage: Clear the related resources;
    • HiveServer2 Endpoint needs to do:
      • Invoke the GatewayService#closeSession
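A hedged sketch of the OpenSession flow. The two option keys are existing Flink options; gateway.openSession follows FLIP-91, and SessionEnvironment is assumed to carry the session configuration:

Code Block
languagejava
// Start from the configuration the client sent in the OpenSession request.
Map<String, String> sessionConf = new HashMap<>(clientConf);
sessionConf.put("table.sql-dialect", "hive");        // switch to the Hive dialect
sessionConf.put("execution.runtime-mode", "batch");  // HiveServer2-style batch execution
SessionHandle sessionHandle = gateway.openSession(new SessionEnvironment(sessionConf));
// then create/register the HiveCatalog from hive-site.xml and load the HiveModule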

Operation API

  • GetOperationStatus
    • Usage: Get the Operation status.
    • HiveServer2 Endpoint needs to do:
      • Invoke the GatewayService#getOperationStatus
  • CancelOperation
    • Usage: Cancel the Operation.
    • HiveServer2 Endpoint needs to do:
      • Invoke the GatewayService#cancelOperation
  • CloseOperation
    • Usage: Close the Operation
    • HiveServer2 Endpoint needs to do:
      • Invoke the GatewayService#closeOperation

Statement API

  • ExecuteStatement
    • Usage: Execute the SQL in the GatewayService synchronously or asynchronously.
    • Return: OperationHandle
    • HiveServer2 Endpoint needs to do:
      • Set table.dml-sync to true because the Hive Operation's lifecycle contains the job execution;
      • Invoke the GatewayService#executeStatement
      • HiveServer2 supports executing statements in synchronous mode.
        • Currently the GatewayService only supports submission in asynchronous mode. It requires the HiveServer2 Endpoint to monitor the Operation status (a sketch of this polling loop follows at the end of this list).
  • FetchResults
    • Usage: Supports fetching the results or logs with fetch orientation to the client.
    • We only support fetching backward once because the GatewayService releases the fetched rows.
    • HiveServer2 Endpoint needs to do:
      • Invoke the GatewayService#fetchResults or GatewayService#fetchLog
  • GetResultSetMetadata
    • Usage: Return the result schema.
    • HiveServer2 Endpoint needs to do:
      • Invoke the GatewayService#getResultSchema
  • GetInfo
    • Get cluster info.
      • Only supports CLI_SERVER_NAME (FLINK) now. Extend with other values if needed in the future.
  • GetTypeInfo
    • Get the ODBC's type info.
  • GetCatalogs
    • Return the registered Catalogs.
    • Do as follows:
Code Block
languagejava
/**
 * The schema for the Operation is
 * <pre>
 * +----------------+--------------+--------------+
 * | column name    | column type  | comments     |
 * +----------------+--------------+--------------+
 * | TABLE_CAT      | STRING       | catalog name |
 * +----------------+--------------+--------------+
 * </pre>
 */
gateway.submitOperation(
    HiveServer2OperationType.GET_CATALOGS,
    () -> convertToGetCatalogs(gateway.listCatalogs()),
    resultSchema);


  • GetSchemas
    • Return the database info. Currently HiveServer2 supports using a regex to filter out unmatched databases.
    • Do as follows:
Code Block
languagejava
/**
 * The schema for the Operation is
 * <pre>
 * +----------------+--------------+--------------+
 * | column name    | column type  | comments     |
 * +----------------+--------------+--------------+
 * | TABLE_SCHEMA   | STRING       | schema name  |
 * +----------------+--------------+--------------+
 * | TABLE_CAT      | STRING       | catalog name |
 * +----------------+--------------+--------------+
 * </pre>
 */
gateway.submitOperation(
    HiveServer2OperationType.GET_SCHEMAS,
    () -> {
        List<String> databases = filter(gateway.listDatabases(sessionHandle), databasePattern);
        return convertToGetDatabasesResultSet(databases);
    },
    resultSchema);
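The filter helper used in these sketches is not spelled out in the FLIP. A minimal interpretation, assuming HiveServer2-style SQL LIKE patterns ('%' for any substring, '_' for a single character):

Code Block
languagejava
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Keep only the names matching the pattern; a naive sketch that translates the
// LIKE wildcards to a regex and does not escape other regex metacharacters.
static List<String> filter(List<String> names, String pattern) {
    Pattern regex = Pattern.compile(pattern.replace("%", ".*").replace("_", "."));
    return names.stream()
            .filter(name -> regex.matcher(name).matches())
            .collect(Collectors.toList());
}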


  • GetTables
    • Get the tables in the specified Catalog and Database. HiveServer2 allows using the Catalog/Database/Table pattern to filter out unmatched tables.
    • Do as follows:
Code Block
languagejava
/**
 * The schema for the Operation is
 * <pre>
 * +----------------+--------------+--------------------------------+
 * | column name    | column type  | comments                       |
 * +----------------+--------------+--------------------------------+
 * | TABLE_CAT      | STRING       | catalog name                   |
 * +----------------+--------------+--------------------------------+
 * | TABLE_SCHEMA   | STRING       | schema name                    |
 * +----------------+--------------+--------------------------------+
 * | TABLE_NAME     | STRING       | table name                     |
 * +----------------+--------------+--------------------------------+
 * | TABLE_TYPE     | STRING       | table type, e.g. TABLE, VIEW   |
 * +----------------+--------------+--------------------------------+
 * | REMARKS        | STRING       | table desc                     |
 * +----------------+--------------+--------------------------------+
 * </pre>
 */
gateway.submitOperation(
    HiveServer2OperationType.GET_TABLES,
    () -> {
        List<CatalogTable> results = new ArrayList<>();
        List<String> catalogs = filter(gateway.listCatalogs(sessionHandle), catalogPattern);
        for (String catalog : catalogs) {
            List<String> databases =
                filter(gateway.listDatabases(sessionHandle, catalog), databasePattern);
            for (String database : databases) {
                List<String> tables =
                    filter(gateway.listTables(sessionHandle, catalog, database), tablePattern);
                for (String table : tables) {
                    results.add(gateway.getTable(catalog, database, table, ALL));
                }
            }
        }
        return convertToGetTablesResultSet(results);
    },
    resultSchema);
  • GetTableTypes
    • Return the table types in the current catalog and current database.
    • Do as follows:
Code Block
languagejava
/**
 * The schema for the Operation is
 * <pre>
 * +----------------+--------------+--------------------------------+
 * | column name    | column type  | comments                       |
 * +----------------+--------------+--------------------------------+
 * | TABLE_TYPE     | STRING       | table type, e.g. TABLE, VIEW   |
 * +----------------+--------------+--------------------------------+
 * | REMARKS        | STRING       | table desc                     |
 * +----------------+--------------+--------------------------------+
 * </pre>
 */
gateway.submitOperation(
    HiveServer2OperationType.GET_TABLE_TYPES,
    () -> {
        String catalog = gateway.getCurrentCatalog();
        String database = gateway.getCurrentDatabase();
        List<TableDescriptor> tables = gateway.listTables(catalog, database, ALL);
        return convertToGetTablesResultSet(tables);
    });
  • GetColumns
    • Return the column info for the specified tables.
    • Should be similar to GetTables:
Code Block
languagejava
/**
 * The schema for the Operation is
 * <pre>
 * +-------------------------------------+-------------------------------------+-------------------------------------+
 * |                         column name |                         column type |                            comments |
 * +-------------------------------------+-------------------------------------+-------------------------------------+
 * |                           TABLE_CAT |                              STRING |                        Catalog name |
 * +-------------------------------------+-------------------------------------+-------------------------------------+
 * |                        TABLE_SCHEMA |                              STRING |                         Schema name |
 * +-------------------------------------+-------------------------------------+-------------------------------------+
 * |                          TABLE_NAME |                              STRING |                          Table name |
 * +-------------------------------------+-------------------------------------+-------------------------------------+
 * |                         COLUMN_NAME |                              STRING |                         Column name |
 * +-------------------------------------+-------------------------------------+-------------------------------------+
 * |                           DATA_TYPE |                                 INT |        SQL type from java.sql.Types |
 * +-------------------------------------+-------------------------------------+-------------------------------------+
 * |                           TYPE_NAME |                              STRING |     Data source dependent type name |
 * +-------------------------------------+-------------------------------------+-------------------------------------+
 * |                         COLUMN_SIZE |                                 INT |                         Column size |
 * +-------------------------------------+-------------------------------------+-------------------------------------+
 * |                       BUFFER_LENGTH |                             TINYINT |                              Unused |
 * +-------------------------------------+-------------------------------------+-------------------------------------+
 * |                      DECIMAL_DIGITS |                                 INT |     The number of fractional digits |
 * +-------------------------------------+-------------------------------------+-------------------------------------+
 * |                      NUM_PREC_RADIX |                                 INT |    Radix (typically either 10 or 2) |
 * +-------------------------------------+-------------------------------------+-------------------------------------+
 * |                            NULLABLE |                                 INT |                     Is NULL allowed |
 * +-------------------------------------+-------------------------------------+-------------------------------------+
 * |                             REMARKS |                              STRING |           Comment describing column |
 * +-------------------------------------+-------------------------------------+-------------------------------------+
 * |                          COLUMN_DEF |                              STRING |         Default value (may be null) |
 * +-------------------------------------+-------------------------------------+-------------------------------------+
 * |                       SQL_DATA_TYPE |                                 INT |                              Unused |
 * +-------------------------------------+-------------------------------------+-------------------------------------+
 * |                    SQL_DATETIME_SUB |                                 INT |                              Unused |
 * +-------------------------------------+-------------------------------------+-------------------------------------+
 * |                   CHAR_OCTET_LENGTH |                                 INT |                              Length |
 * +-------------------------------------+-------------------------------------+-------------------------------------+
 * |                    ORDINAL_POSITION |                                 INT |            Index of column in table |
 * +-------------------------------------+-------------------------------------+-------------------------------------+
 * |                         IS_NULLABLE |                              STRING |                           NO or YES |
 * +-------------------------------------+-------------------------------------+-------------------------------------+
 * |                       SCOPE_CATALOG |                              STRING |           null because no reference |
 * +-------------------------------------+-------------------------------------+-------------------------------------+
 * |                        SCOPE_SCHEMA |                              STRING |           null because no reference |
 * +-------------------------------------+-------------------------------------+-------------------------------------+
 * |                         SCOPE_TABLE |                              STRING |           null because no reference |
 * +-------------------------------------+-------------------------------------+-------------------------------------+
 * |                    SOURCE_DATA_TYPE |                            SMALLINT |                                     |
 * +-------------------------------------+-------------------------------------+-------------------------------------+
 * |                   IS_AUTO_INCREMENT |                              STRING |                                     |
 * +-------------------------------------+-------------------------------------+-------------------------------------+
 * </pre>
 */
  • GetPrimaryKeys
    • Return the PK info.
    • Should be similar to GetTables:
Code Block
languagejava
/**
 * The schema for the Operation is
 * <pre>
 * +-------------------------------------+-------------------------------------+-------------------------------------+
 * |                         column name |                         column type |                            comments |
 * +-------------------------------------+-------------------------------------+-------------------------------------+
 * |                           TABLE_CAT |                              STRING |                        Catalog name |
 * +-------------------------------------+-------------------------------------+-------------------------------------+
 * |                        TABLE_SCHEMA |                              STRING |                         Schema name |
 * +-------------------------------------+-------------------------------------+-------------------------------------+
 * |                          TABLE_NAME |                              STRING |                          Table name |
 * +-------------------------------------+-------------------------------------+-------------------------------------+
 * |                             KEY_SEQ |                                 INT |  Sequence number within primary key |
 * +-------------------------------------+-------------------------------------+-------------------------------------+
 * |                             PK_NAME |                              STRING |      Primary key name (may be null) |
 * +-------------------------------------+-------------------------------------+-------------------------------------+
 * </pre>
 */
  • GetFunctions
    • Return the registered function info.
    • Should be similar to GetTables:
Code Block
languagejava
/**
 * The schema for the Operation is
 * <pre>
 * +-------------------------------------+-------------------------------------+-------------------------------------+
 * |                         column name |                         column type |                            comments |
 * +-------------------------------------+-------------------------------------+-------------------------------------+
 * |                        FUNCTION_CAT |                              STRING |                        Catalog name |
 * +-------------------------------------+-------------------------------------+-------------------------------------+
 * |                     FUNCTION_SCHEMA |                              STRING |                         Schema name |
 * +-------------------------------------+-------------------------------------+-------------------------------------+
 * |                       FUNCTION_NAME |                              STRING |                       Function name |
 * +-------------------------------------+-------------------------------------+-------------------------------------+
 * |                             REMARKS |                              STRING | Explanatory comment on the function |
 * +-------------------------------------+-------------------------------------+-------------------------------------+
 * |                       FUNCTION_TYPE |                                 INT |                    Kind of function |
 * +-------------------------------------+-------------------------------------+-------------------------------------+
 * |                       SPECIFIC_NAME |                              STRING |   The name identifies this function |
 * +-------------------------------------+-------------------------------------+-------------------------------------+
 * </pre>
 */
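As referenced in the ExecuteStatement item above, a sketch of how the endpoint could emulate HiveServer2's synchronous execution on top of the asynchronous GatewayService; OperationStatus and its isTerminal flag are assumptions of this sketch:

Code Block
languagejava
// Submit asynchronously, then block until the Operation reaches a terminal state.
OperationHandle handle = gateway.executeStatement(sessionHandle, statement, executionConfig);
OperationStatus status;
do {
    Thread.sleep(100); // simple polling; a real implementation would use listeners or backoff
    status = gateway.getOperationStatus(sessionHandle, handle);
} while (!status.isTerminal()); // terminal: FINISHED, CANCELED, or ERROR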

Unsupported API

  • GetCrossReference

Flink doesn't have the concept of cross references.

  • GetDelegationToken, CancelDelegationToken, RenewDelegationToken

Flink doesn't support getting the delegation token from the YARN side now.

Public Interfaces

GatewayService API Change

Code Block
languagejava
public interface GatewayService {

    /**
     * Fetch the Operation-level log from the GatewayService.
     */
    ResultSet fetchLog(
        SessionHandle sessionHandle,
        OperationHandle operationHandle,
        FetchOrientation orientation,
        int maxRows);

    /**
     * Only supports FORWARD/BACKWARD.
     * - Users can only BACKWARD from the current offset once.
     * - The Gateway does not materialize the changelog.
     */
    ResultSet fetchResult(
        SessionHandle sessionHandle,
        OperationHandle operationHandle,
        FetchOrientation orientation,
        int maxRows);
}

enum FetchOrientation {
    FORWARD,
    BACKWARD
}
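A usage sketch of the fetch contract above: the client can step BACKWARD once to re-read the current batch, but a second consecutive BACKWARD fails because the Gateway has already released those rows. The variable names here are illustrative:

Code Block
languagejava
// Fetch a batch, re-read it once, then continue forward.
ResultSet batch = service.fetchResult(sessionHandle, operationHandle, FetchOrientation.FORWARD, 1000);
ResultSet sameBatch = service.fetchResult(sessionHandle, operationHandle, FetchOrientation.BACKWARD, 1000);
ResultSet nextBatch = service.fetchResult(sessionHandle, operationHandle, FetchOrientation.FORWARD, 1000);
// a second consecutive BACKWARD here would fail: the previous rows are no longer retained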


