You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 4 Next »

Status

Current stateunder discussion

Discussion thread:

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-152-Hive-Query-Syntax-Compatibility-td46928.html

Motivation

FLIP-123 has implemented HiveQL-compatible DDLs so that users can process metadata in HiveQL. This FLIP aims to provide syntax compatibility for queries. Similar as FLIP-123, this FLIP will improve interoperability with Hive and reduce migration efforts. Besides, this FLIP also makes it possible to extend HiveQL to support streaming features. And with this FLIP, the following typical use cases can be supported:

  1. Users can migrate their batch Hive jobs to Flink, without needing to modify the SQL scripts.
  2. Users can write HiveQL to integrate streaming features with Hive tables, e.g. streaming data from Kafka to Hive.
  3. Users can write HiveQL to process non-Hive tables, either in batch or in streaming jobs.

For migrating users, we believe it's desirable for them to be able to continue write Hive syntax. It not only makes the migration easier, but also helps them leverage Flink for new scenarios more quickly, and thus provides unified batch-streaming experience.

Proposed Changes

The Idea

One thing that Flink and Hive have in common is that they both generate a Calcite plan (in form of a RelNode) during SQL query processing. While Flink uses this RelNode as its logical plan, Hive only uses it for optimization and later converts it to its own form of logical plan. The inputs to generate the plan are also different. For Flink, the input is a SqlNode parsed by Calcite, while for Hive, the input is an ASTNode parsed by Antlr.

The overall idea to support Hive query syntax, is to reuse Hive’s Antlr parser to generate the ASTNode, and then adapt Hive’s RelNode-generation process to generate a RelNode that can be used in Flink. Once we have the RelNode, we can simply follow the current processing path in Flink to optimize and finally execute the plan.

Introduce HiveParser

In order to process a SQL statement differently, we introduce HiveParser as a sub-class of ParserImpl and overrides the parse method. This is the entry point of this feature.

We will leverage Hive’s Antlr parser to generate the ASTNode, and Hive’s semantic analyzer to collect semantic info of the SQL statement. Based on these information, we can tell which kind the statement is and process it accordingly.

DDL

We have two options to process DDLs.

  1. Just delegate DDLs to super class and reuse FLIP-123 to process them.
  2. Process DDLs using the semantics we get from Hive.

In order to provide consistent user experience, we choose option #2 to handle DDLs.

DQL

Use HiveParserCalcitePlanner to generate the RelNode and create a PlannerQueryOperation with it.

DML

Since we don’t support Hive ACID tables, we’ll only consider INSERT here.

Use HiveParserCalcitePlanner to generate the RelNode for the query and create a CatalogSinkModifyOperation. To do that, HiveParser needs to:

  1. Figure out static partition specs for partitioned table, and add static partition values to Project in the RelNode
  2. Figure out dest schema, if any, and adjust the Project fields in the RelNode
  3. Figure out whether to overwrite existing data

Hive Dialect & Pluggable Parser

FLIP-123 has made SQL dialect as the switch to turn on/off Hive compatibility. In this FLIP, we’ll continue to use it for queries.

Since we have multiple implementations of org.apache.flink.table.delegation.Parser now, we need to make it pluggable so that the planner knows which one to use. We’ll introduce ParserFactory to create Parser instances and use SPI to find the factory to use according to the current dialect.

ParserFactory definition:

public interface ParserFactory extends ComponentFactory {

	Parser create(CatalogManager catalogManager, PlannerContext plannerContext);
}

The factory to create ParserImpl:

public class DefaultParserFactory implements ParserFactory {

	@Override
	public Parser create(CatalogManager catalogManager, PlannerContext plannerContext) {
		SqlExprToRexConverterFactory sqlExprToRexConverterFactory = plannerContext::createSqlExprToRexConverter;
		return new ParserImpl(
				catalogManager,
				() -> plannerContext.createFlinkPlanner(catalogManager.getCurrentCatalog(), catalogManager.getCurrentDatabase()),
				plannerContext::createCalciteParser,
				tableSchema -> sqlExprToRexConverterFactory.create(plannerContext.getTypeFactory().buildRelNodeRowType(tableSchema)));
	}

	@Override
	public Map<String, String> optionalContext() {
		DescriptorProperties properties = new DescriptorProperties();
		return properties.asMap();
	}

	@Override
	public Map<String, String> requiredContext() {
		DescriptorProperties properties = new DescriptorProperties();
		properties.putString(TableConfigOptions.TABLE_SQL_DIALECT.key(), SqlDialect.DEFAULT.name().toLowerCase());
		return properties.asMap();
	}

	@Override
	public List<String> supportedProperties() {
		return Collections.singletonList(TableConfigOptions.TABLE_SQL_DIALECT.key());
	}
}

The factory to create HiveParser:

public class HiveParserFactory implements ParserFactory {

	@Override
	public Parser create(CatalogManager catalogManager, PlannerContext plannerContext) {
		SqlExprToRexConverterFactory sqlExprToRexConverterFactory = plannerContext::createSqlExprToRexConverter;
		return new HiveParser(
				catalogManager,
				() -> plannerContext.createFlinkPlanner(catalogManager.getCurrentCatalog(), catalogManager.getCurrentDatabase()),
				plannerContext::createCalciteParser,
				tableSchema -> sqlExprToRexConverterFactory.create(plannerContext.getTypeFactory().buildRelNodeRowType(tableSchema)),
				plannerContext);
	}

	@Override
	public Map<String, String> optionalContext() {
		DescriptorProperties properties = new DescriptorProperties();
		return properties.asMap();
	}

	@Override
	public Map<String, String> requiredContext() {
		DescriptorProperties properties = new DescriptorProperties();
		properties.putString(TableConfigOptions.TABLE_SQL_DIALECT.key(), SqlDialect.HIVE.name().toLowerCase());
		return properties.asMap();
	}

	@Override
	public List<String> supportedProperties() {
		return Collections.singletonList(TableConfigOptions.TABLE_SQL_DIALECT.key());
	}
}

We need to support switching dialect on a per-statement basis. Therefore blink planner cannot hold a constant Parser instance, but has to create a new instance if dialect has changed. The updated getParser method:

  override def getParser: Parser = {
    if (getTableConfig.getSqlDialect != currentDialect) {
      val parserProps = Map(TableConfigOptions.TABLE_SQL_DIALECT.key() ->
        getTableConfig.getSqlDialect.name().toLowerCase)
      parser = ComponentFactoryService.find(classOf[ParserFactory], parserProps)
        .create(catalogManager, plannerContext)
      currentDialect = getTableConfig.getSqlDialect
    }
    parser
  }

HiveParserCalcitePlanner

This class encapsulates all the logic to generate the RelNode plan for a query. We follow Hive’s CalcitePlanner to do it but adapt to our own needs. For example, Hive has its own RelNode implementations like HiveProject and HiveJoin, which we’ll change to LogicalProject and LogicalJoin respectively. Moreover, Hive’s CalcitePlanner doesn’t support all Hive features, e.g. SORT BY or LATERAL VIEW. We can extend the logic in HiveParserCalcitePlanner and support them.

Support Multiple Hive Versions

In order to support multiple Hive versions, we keep our own copy of Hive code to generate the ASTNode and do semantic analysis. This means that a SQL statement is processed with the same code no matter which Hive version is in use. The rationale behind this decision is:

  1. HiveQL syntax is in general backward compatible. So we can use a newer version to support older versions.
  2. The process to generate RelNode plan is tightly coupled with ASTNode and semantic analysis. While it’s theoretically possible to make HiveParserCalcitePlanner support different versions, that’ll make the logic much more complicated and error-prone.
  3. The copied code gives us more flexibility to support new features in the future. For example, we can adapt the code to support writing HiveQL for generic tables, or support querying tables across multiple catalogs.

Since most Hive users are still using Hive 2.x or 1.x, we'll copy Hive code from 2.x, which would reduce the required efforts to cover these versions.  For new features in Hive 3.x, e.g. table constraints, we'll extend the copied code to support them.

Go Beyond Hive

In order to support the use cases mentioned in the "Motivation" section, we should ultimately make this feature as a pure SQL dialect and decoupled from Hive tables or functions. To achieve that, we need to:

  1. Extend Hive syntax, so that it supports identifiers like "catalog.db.table".
  2. Leverage Catalog to resolve tables or views.
  3. Leverage SqlOperatorTable to resolve functions.
  4. Extend Hive syntax to support streaming features such as Group Windows.

These efforts are within scope of this FLIP but don't need to be implemented in an MVP release.

New or Changed Public Interfaces

No public interface has to be added or changed.

Compatibility, Deprecation, and Migration Plan

N/A

Test Plan

The best way to verify Hive query compatibility is to run Hive’s QFile test with Flink. But in order to limit our code base size and test time, this should not be included in this FLIP, at least not as a regular UT/IT case.

Rejected Alternatives

Let Hive generate the RelNode and then convert it to meet Flink’s requirements. This will inevitably lead to jar conflicts because Hive depends on a quite old Calcite version while Flink basically upgrades Calcite dependency in each major release. In addition, it’s more difficult to support new features.

Limitations

The following limitations apply when using this feature.

  1. HiveModule should be used in order to use Hive built-in functions.
  2. Some features are not supported due to underlying functionalities are missing. For example, Hive’s UNION type is not supported.

Appendix

The following diagram shows the process how HiveParser parses a SQL statement.

  • No labels