
Page properties

Discussion thread
Vote thread
JIRA: FLINK-21529
Release: 1.13

Status

Current state: under discussion

Discussion thread:

...


Motivation

FLIP-123 has implemented HiveQL-compatible DDLs so that users can process metadata in HiveQL. This FLIP aims to provide syntax compatibility for queries. Similar to FLIP-123, this FLIP will improve interoperability with Hive and reduce migration efforts. Besides, this FLIP also makes it possible to extend HiveQL to support streaming features. With this FLIP, the following typical use cases can be supported:

...

  1. Just delegate DDLs to the super class and reuse FLIP-123 to process them.
  2. Process DDLs using the semantics we get from Hive.

In order to provide a consistent user experience, we choose option #2 to handle DDLs.
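
A hedged sketch of option #2 (all helper and analyzer names below are hypothetical, for illustration only):

Code Block
languagejava
// Hypothetical sketch: DDLs are converted to Operations using the semantics
// we get from Hive, instead of being delegated to the super class.
public List<Operation> parse(String statement) {
	HiveParserASTNode ast = parseWithHiveParser(statement); // assumed helper
	if (isDdl(ast)) {                                       // assumed helper
		// analyze the AST with Hive semantics and map it to a Flink Operation
		return Collections.singletonList(ddlAnalyzer.convertToOperation(ast));
	}
	return parseQuery(ast);                                 // the DQL path, see below
}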

DQL

Use HiveParserCalcitePlanner to generate the RelNode and create a PlannerQueryOperation with it.
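
For illustration, the query path could look like the following minimal sketch (genLogicalPlan is an assumed method name on HiveParserCalcitePlanner):

Code Block
languagejava
// Hypothetical sketch of the DQL path: HiveParserCalcitePlanner produces a
// RelNode, which is wrapped into a PlannerQueryOperation for the rest of the stack.
RelNode relNode = hiveParserCalcitePlanner.genLogicalPlan(ast); // assumed method
return Collections.singletonList(new PlannerQueryOperation(relNode));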

...

Since we now have multiple implementations of org.apache.flink.table.delegation.Parser, we need to make it pluggable so that the planner knows which one to use. We’ll introduce ParserFactory to create Parser instances, and use SPI to find the factory to use according to the current dialect.

ParserFactory definition:

Code Block
languagejava
/** Factory that creates a {@link Parser} for a specific SQL dialect. */
public interface ParserFactory extends ComponentFactory {

	Parser create(CatalogManager catalogManager, PlannerContext plannerContext);
}

...

Code Block
languagejava
public class DefaultParserFactory implements ParserFactory {

	@Override
	public Parser create(CatalogManager catalogManager, PlannerContext plannerContext) {
		SqlExprToRexConverterFactory sqlExprToRexConverterFactory = plannerContext::createSqlExprToRexConverter;
		return new ParserImpl(
				catalogManager,
				() -> plannerContext.createFlinkPlanner(catalogManager.getCurrentCatalog(), catalogManager.getCurrentDatabase()),
				plannerContext::createCalciteParser,
				tableSchema -> sqlExprToRexConverterFactory.create(plannerContext.getTypeFactory().buildRelNodeRowType(tableSchema)));
	}

	@Override
	public Map<String, String> optionalContext() {
		DescriptorProperties properties = new DescriptorProperties();
		return properties.asMap();
	}

	@Override
	public Map<String, String> requiredContext() {
		DescriptorProperties properties = new DescriptorProperties();
		properties.putString(TableConfigOptions.TABLE_SQL_DIALECT.key(), SqlDialect.DEFAULT.name().toLowerCase());
		return properties.asMap();
	}

	@Override
	public List<String> supportedProperties() {
		return Collections.singletonList(TableConfigOptions.TABLE_SQL_DIALECT.key());
	}
}

...

Code Block
languagejava
public class HiveParserFactory implements ParserFactory {

	@Override
	public Parser create(CatalogManager catalogManager, PlannerContext plannerContext) {
		SqlExprToRexConverterFactory sqlExprToRexConverterFactory = plannerContext::createSqlExprToRexConverter;
		return new HiveParser(
				catalogManager,
				() -> plannerContext.createFlinkPlanner(catalogManager.getCurrentCatalog(), catalogManager.getCurrentDatabase()),
				plannerContext::createCalciteParser,
				tableSchema -> sqlExprToRexConverterFactory.create(plannerContext.getTypeFactory().buildRelNodeRowType(tableSchema)),
				plannerContext);
	}

	@Override
	public Map<String, String> optionalContext() {
		DescriptorProperties properties = new DescriptorProperties();
		return properties.asMap();
	}

	@Override
	public Map<String, String> requiredContext() {
		DescriptorProperties properties = new DescriptorProperties();
		properties.putString(TableConfigOptions.TABLE_SQL_DIALECT.key(), SqlDialect.HIVE.name().toLowerCase());
		return properties.asMap();
	}

	@Override
	public List<String> supportedProperties() {
		return Collections.singletonList(TableConfigOptions.TABLE_SQL_DIALECT.key());
	}
}
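
For the SPI lookup to find these factories, they need to be registered as services in the jar. A sketch of the registration file, assuming ComponentFactory implementations are discovered through the TableFactory service interface (the exact file path and package names are assumptions):

Code Block
languagetext
# META-INF/services/org.apache.flink.table.factories.TableFactory
org.apache.flink.table.planner.delegation.DefaultParserFactory
org.apache.flink.table.planner.delegation.HiveParserFactory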

We need to support switching dialects on a per-statement basis. Therefore the blink planner cannot hold a constant Parser instance, but has to create a new instance in getParser if the dialect has changed. The updated getParser method:

Code Block
languagescala
  override def getParser: Parser = {
    if (getTableConfig.getSqlDialect != currentDialect) {
      val parserProps = Map(TableConfigOptions.TABLE_SQL_DIALECT.key() ->
        getTableConfig.getSqlDialect.name().toLowerCase)
      parser = ComponentFactoryService.find(classOf[ParserFactory], parserProps)
        .create(catalogManager, plannerContext)
      currentDialect = getTableConfig.getSqlDialect
    }
    parser
  }
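
From a user's perspective, switching dialects then takes effect per statement, e.g. (table names are illustrative):

Code Block
languagejava
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());

// parsed by the default ParserImpl
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
tableEnv.executeSql("SELECT * FROM my_table");

// parsed by HiveParser
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.executeSql("SELECT * FROM my_hive_table");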

HiveParserCalcitePlanner

This class encapsulates all the logic to generate the RelNode plan for a query. We follow Hive’s CalcitePlanner to do it, but adapt it to our own needs. For example, Hive has its own RelNode implementations like HiveProject and HiveJoin, which we’ll change to LogicalProject and LogicalJoin respectively. Moreover, Hive’s CalcitePlanner doesn’t support all Hive features, e.g. SORT BY or LATERAL VIEW. We can extend the logic in HiveParserCalcitePlanner to support them.
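
For illustration, the node mapping could look like the following sketch (assuming a Calcite version whose LogicalProject.create accepts hints; input, projectExprs and fieldNames stand for values computed during semantic analysis):

Code Block
languagejava
// Illustrative: where Hive's CalcitePlanner would create a HiveProject,
// HiveParserCalcitePlanner creates Calcite's built-in LogicalProject instead.
RelNode project = LogicalProject.create(
		input,                    // child RelNode
		Collections.emptyList(),  // no hints
		projectExprs,             // projection RexNodes
		fieldNames);              // output field names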

...

  1. HiveQL syntax is in general backward compatible, so we can use a newer version to support older ones.
  2. The process to generate the RelNode plan is tightly coupled with ASTNode and semantic analysis. While it’s theoretically possible to make HiveParserCalcitePlanner support different versions, that would make the logic much more complicated and error-prone.
  3. The copied code gives us more flexibility to support new features in the future. For example, we can adapt the code to support writing HiveQL for generic tables, or support querying tables across multiple catalogs.

Since most Hive users are still using Hive 2.x or 1.x, we'll copy Hive code from 2.x, which reduces the effort required to cover these versions. For new features in Hive 3.x, e.g. table constraints, we'll extend the copied code to support them.

Go Beyond Hive

In order to support the use cases mentioned in the "Motivation" section, we should ultimately make this feature a pure SQL dialect, decoupled from Hive tables or functions. To achieve that, we need to:

  1. Extend Hive syntax, so that it supports identifiers like "catalog.db.table", and streaming features like Group Windows (see the examples after this list).
  2. Leverage Catalog to resolve tables or views.
  3. Leverage SqlOperatorTable to resolve functions.
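
For illustration, the extended dialect would eventually accept statements like these (the exact extended HiveQL forms are assumptions at this point; embedded in Java for context):

Code Block
languagejava
// Hypothetical extended HiveQL: a cross-catalog identifier and a group window.
tableEnv.executeSql("SELECT * FROM myCatalog.myDb.myTable");
tableEnv.executeSql(
		"SELECT user_name, COUNT(url) FROM clicks " +
				"GROUP BY user_name, TUMBLE(click_time, INTERVAL '1' HOUR)");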

These efforts are within the scope of this FLIP, but don't need to be implemented in an MVP release.

New or Changed Public Interfaces

...