
Status

Current state: under discussion

Discussion thread:

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-152-Hive-Query-Syntax-Compatibility-td46928.html

Motivation

FLIP-123 implemented HiveQL-compatible DDLs so that users can process metadata in HiveQL. This FLIP aims to provide syntax compatibility for queries. Similar to FLIP-123, this FLIP will improve interoperability with Hive and reduce migration effort. It also makes it possible to extend HiveQL to support streaming features, so that users can not only write HiveQL for batch jobs but also run streaming jobs in a HiveQL fashion, which provides a better batch-streaming unified experience for migrating users.

Proposed Changes

The Idea

One thing that Flink and Hive have in common is that they both generate a Calcite plan (in the form of a RelNode) during SQL query processing. While Flink uses this RelNode as its logical plan, Hive only uses it for optimization and later converts it to its own form of logical plan. The inputs used to generate the plan are also different: for Flink, the input is a SqlNode parsed by Calcite, while for Hive, it is an ASTNode parsed by Antlr.

The overall idea to support Hive query syntax is to reuse Hive’s Antlr parser to generate the ASTNode, and then adapt Hive’s RelNode-generation process to produce a RelNode that can be used in Flink. Once we have the RelNode, we can simply follow the current processing path in Flink to optimize and finally execute the plan.

Introduce HiveParser

In order to process a SQL statement differently, we introduce HiveParser as a subclass of ParserImpl that overrides the parse method. This is the entry point of this feature.

We will leverage Hive’s Antlr parser to generate the ASTNode, and Hive’s semantic analyzer to collect semantic information about the SQL statement. Based on this information, we can tell what kind of statement it is and process it accordingly.
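
To illustrate the intended flow, here is a rough, hypothetical skeleton of the dispatch logic. The real class extends ParserImpl (its constructor wiring is shown in HiveParserFactory further below); parseWithHiveAntlr, analyze and HiveSemanticInfo are illustrative placeholders for Hive’s Antlr parser and semantic analyzer, not actual APIs:

import java.util.List;

import org.apache.flink.table.operations.Operation;

public class HiveParserSketch {

	// Placeholder for the semantic information collected by Hive's semantic analyzer.
	interface HiveSemanticInfo {
		boolean isDdl();
		boolean isQuery();
	}

	public List<Operation> parse(String statement) {
		Object ast = parseWithHiveAntlr(statement);   // Hive's Antlr parser -> ASTNode
		HiveSemanticInfo info = analyze(ast);         // Hive's semantic analysis

		if (info.isDdl()) {
			// DDL: delegate to ParserImpl and reuse FLIP-123 (see the DDL section).
			return delegateToParserImpl(statement);
		} else if (info.isQuery()) {
			// DQL: plan with HiveParserCalcitePlanner and wrap in a PlannerQueryOperation.
			return planQuery(ast);
		} else {
			// DML (INSERT): plan the query and wrap it in a CatalogSinkModifyOperation.
			return planInsert(ast, info);
		}
	}

	// The methods below are placeholders for the logic described in the following sections.
	private Object parseWithHiveAntlr(String statement) { throw new UnsupportedOperationException(); }
	private HiveSemanticInfo analyze(Object ast) { throw new UnsupportedOperationException(); }
	private List<Operation> delegateToParserImpl(String statement) { throw new UnsupportedOperationException(); }
	private List<Operation> planQuery(Object ast) { throw new UnsupportedOperationException(); }
	private List<Operation> planInsert(Object ast, HiveSemanticInfo info) { throw new UnsupportedOperationException(); }
}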

DDL

We have two options to process DDLs.

  1. Just delegate DDLs to the super class and reuse FLIP-123 to process them.
  2. Process DDLs using the semantics we get from Hive.

For simplicity, we choose option #1.

DQL

Use HiveParserCalcitePlanner to generate the RelNode and create a PlannerQueryOperation with it.
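
The resulting operation is the same PlannerQueryOperation that the default parser produces, so the rest of the planner pipeline is unchanged. A minimal sketch, assuming the RelNode has already been produced by HiveParserCalcitePlanner:

import org.apache.calcite.rel.RelNode;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.planner.operations.PlannerQueryOperation;

public class HiveDqlSketch {

	// Wrap the RelNode generated by HiveParserCalcitePlanner into the operation
	// that the blink planner already knows how to optimize and execute.
	QueryOperation toQueryOperation(RelNode queryPlan) {
		return new PlannerQueryOperation(queryPlan);
	}
}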

DML

Since we don’t support Hive ACID tables, we’ll only consider INSERT here.

Use HiveParserCalcitePlanner to generate the RelNode for the query and create a CatalogSinkModifyOperation with it (a rough sketch follows the list). To do that, HiveParser needs to:

  1. Figure out the static partition spec for a partitioned table, and add the static partition values to the Project in the RelNode
  2. Figure out the destination schema, if any, and adjust the Project fields in the RelNode accordingly
  3. Figure out whether to overwrite existing data
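
A rough sketch of how these pieces could fit together, assuming the RelNode (with the Project already adjusted), the resolved target table, the static partition spec, and the overwrite flag have been derived as described in the steps above:

import java.util.Collections;
import java.util.Map;

import org.apache.calcite.rel.RelNode;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.operations.CatalogSinkModifyOperation;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.planner.operations.PlannerQueryOperation;

public class HiveDmlSketch {

	ModifyOperation toSinkOperation(
			RelNode queryPlan,
			ObjectIdentifier targetTable,
			Map<String, String> staticPartitions,
			boolean overwrite) {
		return new CatalogSinkModifyOperation(
				targetTable,
				new PlannerQueryOperation(queryPlan),  // query with adjusted Project fields (steps 1 and 2)
				staticPartitions,                      // static partition values (step 1)
				overwrite,                             // INSERT OVERWRITE vs INSERT INTO (step 3)
				Collections.emptyMap());               // no dynamic table options
	}
}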

Hive Dialect & Pluggable Parser

FLIP-123 made the SQL dialect the switch to turn Hive compatibility on and off. In this FLIP, we’ll continue to use it for queries.
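
For users, the switch is the existing table.sql-dialect option / TableConfig#setSqlDialect. A minimal usage sketch (registering a HiveCatalog is omitted, and the table names dest and src are assumptions for illustration):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;

public class HiveDialectExample {

	public static void main(String[] args) {
		TableEnvironment tableEnv = TableEnvironment.create(
				EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

		// A HiveCatalog must be registered and set as the current catalog (see Limitations).

		// Switch to the Hive dialect; subsequent statements are handled by HiveParser.
		tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
		tableEnv.executeSql("INSERT OVERWRITE TABLE dest SELECT key, count(1) FROM src GROUP BY key");

		// Switch back to the default dialect for Flink SQL statements.
		tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
	}
}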

Since we now have multiple implementations of org.apache.flink.table.delegation.Parser, we need to make the parser pluggable so that the planner knows which one to use. We’ll introduce BlinkParserFactory to create Parser instances and use SPI to find the factory to use according to the current dialect.

BlinkParserFactory definition:

public interface BlinkParserFactory extends ComponentFactory {

	Parser create(CatalogManager catalogManager, PlannerContext plannerContext);
}

The factory to create ParserImpl:

public class ParserImplFactory implements BlinkParserFactory {

	@Override
	public Parser create(CatalogManager catalogManager, PlannerContext plannerContext) {
		SqlExprToRexConverterFactory sqlExprToRexConverterFactory = plannerContext::createSqlExprToRexConverter;
		return new ParserImpl(
				catalogManager,
				() -> plannerContext.createFlinkPlanner(catalogManager.getCurrentCatalog(), catalogManager.getCurrentDatabase()),
				plannerContext::createCalciteParser,
				tableSchema -> sqlExprToRexConverterFactory.create(plannerContext.getTypeFactory().buildRelNodeRowType(tableSchema)));
	}

	@Override
	public Map<String, String> optionalContext() {
		DescriptorProperties properties = new DescriptorProperties();
		return properties.asMap();
	}

	@Override
	public Map<String, String> requiredContext() {
		DescriptorProperties properties = new DescriptorProperties();
		properties.putString(TableConfigOptions.TABLE_SQL_DIALECT.key(), SqlDialect.DEFAULT.name().toLowerCase());
		return properties.asMap();
	}

	@Override
	public List<String> supportedProperties() {
		return Collections.singletonList(TableConfigOptions.TABLE_SQL_DIALECT.key());
	}
}

The factory to create HiveParser:

public class HiveParserFactory implements BlinkParserFactory {

	@Override
	public Parser create(CatalogManager catalogManager, PlannerContext plannerContext) {
		SqlExprToRexConverterFactory sqlExprToRexConverterFactory = plannerContext::createSqlExprToRexConverter;
		return new HiveParser(
				catalogManager,
				() -> plannerContext.createFlinkPlanner(catalogManager.getCurrentCatalog(), catalogManager.getCurrentDatabase()),
				plannerContext::createCalciteParser,
				tableSchema -> sqlExprToRexConverterFactory.create(plannerContext.getTypeFactory().buildRelNodeRowType(tableSchema)),
				plannerContext);
	}

	@Override
	public Map<String, String> optionalContext() {
		DescriptorProperties properties = new DescriptorProperties();
		return properties.asMap();
	}

	@Override
	public Map<String, String> requiredContext() {
		DescriptorProperties properties = new DescriptorProperties();
		properties.putString(TableConfigOptions.TABLE_SQL_DIALECT.key(), SqlDialect.HIVE.name().toLowerCase());
		return properties.asMap();
	}

	@Override
	public List<String> supportedProperties() {
		return Collections.singletonList(TableConfigOptions.TABLE_SQL_DIALECT.key());
	}
}

We need to support switching dialects on a per-statement basis. Therefore the blink planner cannot hold a constant Parser instance, but has to create a new instance each time getParser is called:

  override def getParser: Parser = {
    val parserProps = Map(TableConfigOptions.TABLE_SQL_DIALECT.key() ->
      getTableConfig.getSqlDialect.name().toLowerCase)
    ComponentFactoryService.find(classOf[BlinkParserFactory], parserProps)
      .create(catalogManager, plannerContext)
  }

HiveParserCalcitePlanner

This class encapsulates all the logic to generate the RelNode plan for a query. We follow Hive’s CalcitePlanner but adapt it to our own needs. For example, Hive has its own RelNode implementations like HiveProject and HiveJoin, which we’ll change to LogicalProject and LogicalJoin respectively. Moreover, Hive’s CalcitePlanner doesn’t support all Hive features, e.g. SORT BY or LATERAL VIEW. We can extend the logic in HiveParserCalcitePlanner to support them.
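
For illustration, where Hive’s planner would build its own nodes such as HiveProject, the adapted code would build the standard Calcite logical nodes, e.g. via Calcite’s RelBuilder (the table and field names below are hypothetical):

import org.apache.calcite.rel.RelNode;
import org.apache.calcite.tools.RelBuilder;

public class LogicalNodeSketch {

	// The default RelBuilder produces LogicalTableScan / LogicalProject,
	// which is what HiveParserCalcitePlanner emits instead of Hive's own RelNode subclasses.
	RelNode simpleProjection(RelBuilder builder) {
		return builder
				.scan("src")                       // hypothetical table
				.project(builder.field("value"))   // LogicalProject instead of HiveProject
				.build();
	}
}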

Support Multiple Hive Versions

In order to support multiple Hive versions, we keep our own copy of Hive code to generate the ASTNode and do semantic analysis. This means that a SQL statement is processed with the same code no matter which Hive version is in use. The rationale behind this decision is:

  1. HiveQL syntax is in general backward compatible. So we can use a newer version to support older versions.
  2. The process to generate RelNode plan is tightly coupled with ASTNode and semantic analysis. While it’s theoretically possible to make HiveParserCalcitePlanner support different versions, that’ll make the logic much more complicated and error-prone.
  3. The copied code gives us more flexibility to support new features in the future. For example, we can adapt the code to support writing HiveQL for generic tables, or support querying tables across multiple catalogs.

New or Changed Public Interfaces

No public interface has to be added or changed.

Compatibility, Deprecation, and Migration Plan

N/A

Test Plan

The best way to verify Hive query compatibility is to run Hive’s QFile test with Flink. But in order to limit our code base size and test time, this should not be included in this FLIP, at least not as a regular UT/IT case.

Rejected Alternatives

Let Hive generate the RelNode and then convert it to meet Flink’s requirements. This would inevitably lead to jar conflicts because Hive depends on a quite old Calcite version, while Flink upgrades its Calcite dependency in almost every major release. In addition, it’s more difficult to support new features this way.

Limitations

The following limitations apply when using this feature.

  1. It only works with Hive tables, and the current catalog needs to be a HiveCatalog.
  2. Queries cannot involve tables/views from multiple catalogs.
  3. HiveModule should be used in order to use Hive built-in functions (a loading sketch follows the list).
  4. Some features are not supported because the underlying functionality is missing. For example, Hive’s UNION type is not supported.
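
A minimal sketch of loading HiveModule so that Hive built-in functions resolve (the Hive version string is an assumption; use the version on your classpath):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.module.hive.HiveModule;

public class HiveModuleExample {

	public static void main(String[] args) {
		TableEnvironment tableEnv = TableEnvironment.create(
				EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

		// Load Hive's built-in functions; "2.3.6" is just an example version.
		tableEnv.loadModule("hive", new HiveModule("2.3.6"));
	}
}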

Appendix

The following diagram shows how HiveParser parses a SQL statement.
