Status
Current state: "Under Discussion"
Discussion thread: https://lists.apache.org/thread/9r1j7ho2m8zbqy3tl7vvj9gnocggwr6x
JIRA:
Released: <Flink Version>
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
In the batch world, Hive is still the standard. Considering that Flink has continuously improved its compatibility with Hive in recent years, we propose to expose a HiveServer2 endpoint in the Flink SQL Gateway. The endpoint will implement the Thrift interface exposed by HiveServer2, so users' BI, CLI, and other tools built on HiveServer2 can be seamlessly migrated to the Flink SQL Gateway.
HiveServer2 provides three kinds of functionality:
- manage the metadata in the Hive MetaStore;
- translate HiveSQL into Operations and execute them:
  - translate DML/queries into MapReduce jobs and submit them to Hadoop;
  - translate into Operations that manipulate the Hive MetaStore;
- let users look up the status of submitted Operations and manage them.
After this FLIP is finished, users will be able to do all of the above against the Flink SQL Gateway through the HiveServer2 endpoint.
Overall Design
HiveServer2 has an architecture similar to the one described in FLIP-91. The user registers a Session in the SessionManager before submitting statements. After that, the user uses the SessionHandle to find the registered resources in the Session and execute statements. Every statement is translated into an Operation, and the OperationHandle is used to fetch the results or logs. Therefore, the HiveServer2 endpoint guarantees:
- the SessionHandle and OperationHandle in HiveServer2 can be converted to the SessionHandle and OperationHandle in FLIP-91;
- the interfaces exposed by HiveServer2 can be converted into calls to the GatewayService.
Handle Mapping
The structure of the Handle in HiveServer2 is as follows.
The transformation is as follows.
- The HiveServer2 SessionHandle has the same structure as the Gateway SessionHandle.
- The HiveServer2 OperationHandle carries more information than the OperationHandle in the Gateway. The HiveServer2 endpoint should use the result of GatewayService#getOperationInfo to build the OperationHandle in HiveServer2.
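Since the HiveServer2 Thrift handles carry a binary guid/secret pair (THandleIdentifier) while the Gateway handles wrap UUIDs, the conversion essentially boils down to encoding and decoding 16-byte arrays. A minimal sketch, where the class and method names are hypothetical and not actual endpoint code:

```java
import java.nio.ByteBuffer;
import java.util.UUID;

class HandleConversion {

    /** Decodes the 16-byte binary guid of a THandleIdentifier into a UUID. */
    static UUID bytesToUuid(byte[] guid) {
        ByteBuffer buf = ByteBuffer.wrap(guid);
        return new UUID(buf.getLong(), buf.getLong());
    }

    /** Encodes a Gateway-side UUID back into the 16-byte wire representation. */
    static byte[] uuidToBytes(UUID id) {
        return ByteBuffer.allocate(16)
                .putLong(id.getMostSignificantBits())
                .putLong(id.getLeastSignificantBits())
                .array();
    }
}
```

Because the conversion is a pure round-trip, no extra state needs to be kept on the endpoint side for the session part of the handle.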
HiveServer2 Endpoint API
Refer to the file for the HiveServer2 API.
Session API
- OpenSession
  - Usage: register a Session in the SessionManager.
  - Return: SessionHandle
  - HiveServer2 Endpoint needs to:
    - Determine the communication version between HiveServer2 and the client;
    - Invoke GatewayService#openSession to register the Session;
    - Configure the Hive environment:
      - Create the Hive Catalog
      - Switch to the Hive dialect
      - Load the Hive Module
- CloseSession
  - Usage: clear the related resources.
  - HiveServer2 Endpoint needs to:
    - Invoke GatewayService#closeSession
Operation API
- GetOperationStatus
  - Usage: get the Operation status.
  - HiveServer2 Endpoint needs to:
    - Invoke GatewayService#getOperationStatus
- CancelOperation
  - Usage: cancel the Operation.
  - HiveServer2 Endpoint needs to:
    - Invoke GatewayService#cancelOperation
- CloseOperation
  - Usage: close the Operation.
  - HiveServer2 Endpoint needs to:
    - Invoke GatewayService#closeOperation
Statement API
- ExecuteStatement
  - Usage: execute SQL against the Gateway Service synchronously or asynchronously.
  - Return: OperationHandle
  - HiveServer2 Endpoint needs to:
    - Set table.dml-sync to true, because the lifecycle of a Hive Operation covers the job execution;
    - Invoke GatewayService#executeStatement.
  - HiveServer2 supports executing statements in synchronous mode, but currently the GatewayService only supports submission in asynchronous mode. This requires the HiveServer2 endpoint to monitor the Operation status itself.
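The monitoring the endpoint needs in order to emulate synchronous execution on top of asynchronous submission can be sketched as a polling loop. The enum and names below are illustrative, not the actual Gateway API:

```java
import java.util.function.Supplier;

class SyncExecutionSketch {

    /** A simplified view of the Gateway's Operation states (names are illustrative). */
    enum OperationStatus { INITIALIZED, RUNNING, FINISHED, ERROR, CANCELED }

    /**
     * Polls the status supplier until the Operation reaches a terminal state.
     * A real endpoint would sleep between polls and honor timeouts; that is
     * omitted to keep the sketch short.
     */
    static OperationStatus awaitTermination(Supplier<OperationStatus> statusSupplier) {
        OperationStatus status = statusSupplier.get();
        while (status == OperationStatus.INITIALIZED || status == OperationStatus.RUNNING) {
            status = statusSupplier.get();
        }
        return status;
    }
}
```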
- FetchResults
  - Usage: fetch the results or logs with a fetch orientation for the client.
    - We only support fetching backward once, because the GatewayService releases the fetched rows.
  - HiveServer2 Endpoint needs to:
    - Invoke GatewayService#fetchResults or GatewayService#fetchLog
- GetResultSetMetadata
  - Usage: return the result schema.
  - HiveServer2 Endpoint needs to:
    - Invoke GatewayService#getResultSchema.
- GetInfo
  - Usage: get cluster info.
  - Only getting CLI_SERVER_NAME (FLINK) is supported now; other values can be added if needed in the future.
- GetTypeInfo
  - Usage: get the ODBC type info.
- GetCatalogs
  - Usage: return the registered Catalogs.
  - Do as follows:

```java
/**
 * The schema for the Operation is
 * <pre>
 * +-------------+-------------+--------------+
 * | column name | column type | comments     |
 * +-------------+-------------+--------------+
 * | TABLE_CAT   | STRING      | catalog name |
 * +-------------+-------------+--------------+
 * </pre>
 */
gateway.submitOperation(
        HiveServer2OperationType.GET_CATALOGS,
        () -> convertToGetCatalogs(gateway.listCatalogs()),
        resultSchema);
```
- GetSchemas
  - Usage: return the database info. Currently HiveServer2 supports using a pattern to filter out unmatched databases.
  - Do as follows:

```java
/**
 * The schema for the Operation is
 * <pre>
 * +--------------+-------------+--------------+
 * | column name  | column type | comments     |
 * +--------------+-------------+--------------+
 * | TABLE_SCHEMA | STRING      | schema name  |
 * | TABLE_CAT    | STRING      | catalog name |
 * +--------------+-------------+--------------+
 * </pre>
 */
gateway.submitOperation(
        HiveServer2OperationType.GET_SCHEMAS,
        () -> {
            List<String> databases =
                    filter(gateway.listDatabases(sessionHandle), databasePattern);
            return convertToGetDatabasesResultSet(databases);
        },
        resultSchema);
```
- GetTables
  - Usage: get the tables in the specified Catalog and Database. HiveServer2 allows using Catalog/Database/Table patterns to filter out unmatched tables.
  - Do as follows:

```java
/**
 * The schema for the Operation is
 * <pre>
 * +--------------+-------------+------------------------------+
 * | column name  | column type | comments                     |
 * +--------------+-------------+------------------------------+
 * | TABLE_CAT    | STRING      | catalog name                 |
 * | TABLE_SCHEMA | STRING      | schema name                  |
 * | TABLE_NAME   | STRING      | table name                   |
 * | TABLE_TYPE   | STRING      | table type, e.g. TABLE, VIEW |
 * | REMARKS      | STRING      | table desc                   |
 * +--------------+-------------+------------------------------+
 * </pre>
 */
gateway.submitOperation(
        HiveServer2OperationType.GET_TABLES,
        () -> {
            List<CatalogTable> results = new ArrayList<>();
            List<String> catalogs =
                    filter(gateway.listCatalogs(sessionHandle), catalogPattern);
            for (String catalog : catalogs) {
                List<String> databases =
                        filter(gateway.listDatabases(sessionHandle, catalog), databasePattern);
                for (String database : databases) {
                    List<String> tables =
                            filter(gateway.listTables(sessionHandle, catalog, database), tablePattern);
                    for (String table : tables) {
                        results.add(gateway.getTable(catalog, database, table, ALL));
                    }
                }
            }
            return convertToGetTablesResultSet(results);
        },
        resultSchema);
```
- GetTableTypes
  - Usage: return the table types in the current catalog and current database.
  - Do as follows:

```java
/**
 * The schema for the Operation is
 * <pre>
 * +-------------+-------------+------------------------------+
 * | column name | column type | comments                     |
 * +-------------+-------------+------------------------------+
 * | TABLE_TYPE  | STRING      | table type, e.g. TABLE, VIEW |
 * | REMARKS     | STRING      | table desc                   |
 * +-------------+-------------+------------------------------+
 * </pre>
 */
gateway.submitOperation(
        HiveServer2OperationType.GET_TABLE_TYPES,
        () -> {
            String catalog = gateway.getCurrentCatalog();
            String database = gateway.getCurrentDatabase();
            List<TableDescriptor> tables = gateway.listTables(catalog, database, ALL);
            return convertToGetTableTypesResultSet(tables);
        },
        resultSchema);
```
- GetColumns
  - Usage: return the column info for the specified tables.
  - Should be similar to GetTables.

```java
/**
 * The schema for the Operation is
 * <pre>
 * +-------------------+-------------+----------------------------------+
 * | column name       | column type | comments                         |
 * +-------------------+-------------+----------------------------------+
 * | TABLE_CAT         | STRING      | Catalog name                     |
 * | TABLE_SCHEMA      | STRING      | Schema name                      |
 * | TABLE_NAME        | STRING      | Table name                       |
 * | COLUMN_NAME       | STRING      | Column name                      |
 * | DATA_TYPE         | INT         | SQL type from java.sql.Types     |
 * | TYPE_NAME         | STRING      | Data source dependent type name  |
 * | COLUMN_SIZE       | INT         | Column size                      |
 * | BUFFER_LENGTH     | TINYINT     | Unused                           |
 * | DECIMAL_DIGITS    | INT         | The number of fractional digits  |
 * | NUM_PREC_RADIX    | INT         | Radix (typically either 10 or 2) |
 * | NULLABLE          | INT         | Is NULL allowed                  |
 * | REMARKS           | STRING      | Comment describing column        |
 * | COLUMN_DEF        | STRING      | Default value (may be null)      |
 * | SQL_DATA_TYPE     | INT         | Unused                           |
 * | SQL_DATETIME_SUB  | INT         | Unused                           |
 * | CHAR_OCTET_LENGTH | INT         | Length                           |
 * | ORDINAL_POSITION  | INT         | Index of column in table         |
 * | IS_NULLABLE       | STRING      | NO or YES                        |
 * | SCOPE_CATALOG     | STRING      | null because no reference        |
 * | SCOPE_SCHEMA      | STRING      | null because no reference        |
 * | SCOPE_TABLE       | STRING      | null because no reference        |
 * | SOURCE_DATA_TYPE  | SMALLINT    |                                  |
 * | IS_AUTO_INCREMENT | STRING      |                                  |
 * +-------------------+-------------+----------------------------------+
 * </pre>
 */
```
- GetPrimaryKeys
  - Usage: return the primary key info.
  - Should be similar to GetTables.

```java
/**
 * The schema for the Operation is
 * <pre>
 * +--------------+-------------+------------------------------------+
 * | column name  | column type | comments                           |
 * +--------------+-------------+------------------------------------+
 * | TABLE_CAT    | STRING      | Catalog name                       |
 * | TABLE_SCHEMA | STRING      | Schema name                        |
 * | TABLE_NAME   | STRING      | Table name                         |
 * | KEY_SEQ      | INT         | Sequence number within primary key |
 * | PK_NAME      | STRING      | Primary key name (may be null)     |
 * +--------------+-------------+------------------------------------+
 * </pre>
 */
```
- GetFunctions
  - Usage: return the registered function info.
  - Should be similar to GetTables.

```java
/**
 * The schema for the Operation is
 * <pre>
 * +-----------------+-------------+-------------------------------------+
 * | column name     | column type | comments                            |
 * +-----------------+-------------+-------------------------------------+
 * | FUNCTION_CAT    | STRING      | Catalog name                        |
 * | FUNCTION_SCHEMA | STRING      | Schema name                         |
 * | FUNCTION_NAME   | STRING      | Function name                       |
 * | REMARKS         | STRING      | Explanatory comment on the function |
 * | FUNCTION_TYPE   | STRING      | Kind of function                    |
 * | SPECIFIC_NAME   | STRING      | Name that identifies this function  |
 * +-----------------+-------------+-------------------------------------+
 * </pre>
 */
```
Differences from HiveServer2
- GetCrossReference
Flink doesn't have the concept of cross-references.
- FetchResults
Currently, the SqlGatewayService only supports fetching forward and cannot fetch backward multiple times; HiveServer2 doesn't have this limitation.
- Information loss during the conversion
Currently, the Gateway uses the HiveCatalog to get metadata from the metastore, which means results in the Hive dialect are converted to the Flink dialect inside the Catalog. When the endpoint sends results back to users, it needs to translate them from the Flink dialect to the Hive dialect again, and in some cases information is lost. For example, Hive has five kinds of TableType: MANAGED_TABLE, EXTERNAL_TABLE, VIRTUAL_VIEW, INDEX_TABLE, and MATERIALIZED_VIEW, but Flink only supports TABLE/VIEW. As a result, users can't get the specific table type through the HiveServer2 endpoint.
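The lossy TableType mapping described above can be sketched as a simple collapse of Hive's five types into Flink's two kinds. The class and method names below are illustrative, not actual endpoint code:

```java
class TableTypeMapping {

    /** The only table kinds Flink distinguishes. */
    enum FlinkTableKind { TABLE, VIEW }

    /**
     * Collapses Hive's table types into Flink's two kinds. This is the lossy
     * step: e.g. MANAGED_TABLE and EXTERNAL_TABLE become indistinguishable.
     */
    static FlinkTableKind toFlinkKind(String hiveTableType) {
        switch (hiveTableType) {
            case "VIRTUAL_VIEW":
            case "MATERIALIZED_VIEW":
                return FlinkTableKind.VIEW;
            default: // MANAGED_TABLE, EXTERNAL_TABLE, INDEX_TABLE
                return FlinkTableKind.TABLE;
        }
    }
}
```

The reverse direction cannot be reconstructed, which is exactly the information loss clients observe.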
HiveServer2 Compatibility
Compatibility with Client
HiveServer2 uses a protocol version to handle compatibility between the client and server, which means HiveServer2 can communicate with a client of any version. The logic behind this is:
- During OpenSession, the client and server determine the protocol version.
- The client promises:
  - not to send a request that is only exposed in a higher version;
  - not to send a request containing a parameter that is only exposed in a higher version;
  - to send requests with the parameters specified by the determined protocol version.
- The server promises:
  - to return results with the schema specified by the determined protocol version.
If users want to use HiveServer2 with a higher version, they can simply upgrade the HiveServer2 endpoint's dependency version and don't need to implement another HiveServer2 endpoint for the expected version. If users want to use HiveServer2 with a lower version, the current endpoint already satisfies the requirements.
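The version negotiation during OpenSession boils down to both sides settling on the lower of the two protocol versions. A minimal sketch, using a stand-in enum rather than the actual TProtocolVersion type:

```java
class VersionNegotiation {

    /** Stand-in for HiveServer2's TProtocolVersion; ordinal order mirrors version order. */
    enum ProtocolVersion { V1, V2, V3, V4, V5, V6, V7, V8, V9, V10 }

    /**
     * Both sides agree on the lower of the two protocol versions; afterwards
     * each side restricts itself to the features of that version.
     */
    static ProtocolVersion negotiate(ProtocolVersion client, ProtocolVersion server) {
        return client.ordinal() <= server.ordinal() ? client : server;
    }
}
```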
Compatibility with Hive metastore
During setup, HiveServer2 tries to load the config in hive-site.xml to initialize the Hive metastore client. In Flink, we use the Catalog interface to connect to the Hive Metastore instead. The HiveServer2 endpoint requires the user to specify the path of hive-site.xml as an endpoint parameter, which is used to create the default HiveCatalog. Since different users may need to connect to different metastores, they can use DDL to register a HiveCatalog that satisfies their requirements.
The HiveServer2 endpoint is bound to a specific version of the Hive Catalog. Using the HiveServer2 endpoint with a different version of the Hive Catalog is not recommended.
Public Interfaces
Options
We use the same style as the HiveServer2 options.

| Option name | Default value (Required) | Description |
|---|---|---|
| sql-gateway.endpoint.hiveserver2.catalog.hive-conf-dir | (none) (Yes) | URI to your Hive conf dir containing hive-site.xml. The URI needs to be supported by the Hadoop FileSystem. If the URI is relative, i.e. without a scheme, the local file system is assumed. If the option is not specified, hive-site.xml is searched for in the classpath. |
| sql-gateway.endpoint.hiveserver2.catalog.name | hive (No) | Name of the Hive catalog. |
| sql-gateway.endpoint.hiveserver2.catalog.default-database | (none) (No) | The default database to use when the catalog is set as the current catalog. |
| sql-gateway.endpoint.hiveserver2.thrift.port | 8084 (No) | The port of the HiveServer2 endpoint. |
| sql-gateway.endpoint.hiveserver2.thrift.worker.threads.max | 512 (No) | The maximum number of Thrift worker threads. |
| sql-gateway.endpoint.hiveserver2.thrift.worker.threads.min | 5 (No) | The minimum number of Thrift worker threads. |
| sql-gateway.endpoint.hiveserver2.thrift.worker.keepalive-time | 60 s (No) | Keepalive time for an idle worker thread before it exits. |
| sql-gateway.endpoint.hiveserver2.transport.mode | binary (No) | The transport mode (binary/http). Currently only binary mode is supported. |
How To Use
Users can add the following configuration to flink-conf.yaml.

```yaml
endpoint.protocol: hiveserver2
endpoint.hiveserver2.port: 9002
endpoint.hiveserver2.catalog.hive-conf-dir: /path/to/catalog
```
Then just use the command to start the SQL Gateway.
./sql-gateway.sh
*Please make sure the endpoint package is in the lib directory or in the classpath.*
Ecosystem
Hive JDBC Driver
Many tools use the Hive JDBC driver to connect to HiveServer2. Since the driver uses a Thrift client to communicate with the server, and the HiveServer2 endpoint exposes the same API, in most cases the Hive JDBC driver can connect to the HiveServer2 endpoint directly. However, the Hive JDBC driver supports using the following URL to open a Session in HiveServer2.
jdbc:hive2://<host1>:<port1>/dbName;sess_var_list?hive_conf_list#hive_var_list
- sess_var_list: session properties, e.g. the file used to initialize the Session, JDBC connection parameters, or authentication settings.
- hive_conf_list: Hive properties set into the HiveConf, which works like the Configuration in Flink and influences planning or execution.
- hive_var_list: Hive properties set into the HiveConf that additionally support variable substitution.
For the difference between hive_conf and hive_var, please refer to the link.
In some cases, the user can also modify the Hive Metastore URLs by using the JDBC URL. e.g.
jdbc:hive2://<host1>:<port1>/dbName;#hive.metastore.uris=thrift://localhost:9083
This requires that the HiveServer2 endpoint recognize the Hive Catalog-related options and use these configs to create the expected Hive Catalog. When using the JDBC driver to connect to HiveServer2, users can use the following URL to change the location of the Hive configuration.
jdbc:hive2://<host1>:<port1>/dbName;#hive-conf-dir=/path/to/conf
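The variable section of such a URL can be pulled out with straightforward string handling. A sketch of parsing just the "#hive_var_list" fragment (the class name is hypothetical; sess_var_list and hive_conf_list handling is omitted):

```java
import java.util.HashMap;
import java.util.Map;

class Hive2UrlSketch {

    /**
     * Extracts the "#hive_var_list" section of a hive2 JDBC URL as key/value
     * pairs, e.g. the hive-conf-dir override shown above.
     */
    static Map<String, String> parseVarList(String url) {
        Map<String, String> vars = new HashMap<>();
        int hash = url.indexOf('#');
        if (hash < 0) {
            return vars; // no variable section present
        }
        for (String pair : url.substring(hash + 1).split(";")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                vars.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return vars;
    }
}
```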
Future Work
- Support authentication in the Gateway side, including Kerberos, SSL, and Delegation Token.
- More transport modes, e.g. HTTP/HTTPS.
Implementation
HiveServer2 uses the Apache Thrift framework. The server is composed of three parts.
- Transport determines how the client communicates with the server. HiveServer2 currently supports HTTP and binary mode.
- Protocol is responsible for serialization and deserialization. Currently, HiveServer2 uses TBinaryProtocol.
- Processor is the application logic that handles requests. We should rewrite the Processor with the Flink logic.
Therefore, we rely on the hive-service-rpc module, which contains the HiveServer2 API and the Thrift dependencies. Since the HiveServer2 endpoint is lightweight and needs to work with the Hive Catalog, we merge the HiveServer2 endpoint into the flink-connector-hive module.