
Page properties

Discussion thread
Vote thread
JIRA: FLINK-21529 (ASF JIRA)

Release: 1.13

Status

Current state: under discussion

...


Motivation

FLIP-123 has implemented HiveQL-compatible DDLs so that users can process metadata in HiveQL. This FLIP aims to provide syntax compatibility for queries. Similar to FLIP-123, this FLIP will improve interoperability with Hive and reduce migration efforts. With this FLIP, the following typical use cases can be supported:

  1. Users can migrate their batch Hive jobs to Flink, without needing to modify the SQL scripts.
  2. Users can write HiveQL to integrate streaming features with Hive tables, e.g. streaming data from Kafka to Hive.
  3. Users can write HiveQL to process non-Hive tables, either in batch or in streaming jobs.

For migrating users, we believe it's desirable for them to be able to continue writing Hive syntax. It not only makes the migration easier, but also helps them leverage Flink for new scenarios more quickly, thus providing a unified batch-streaming experience.

Proposed Changes

The Idea

...

  1. Just delegate DDLs to the super class and reuse FLIP-123 to process them.
  2. Process DDLs using the semantics we get from Hive.

In order to provide a consistent user experience, we choose option #2 to handle DDLs.
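
For illustration, the snippet below sketches the kind of DDL this covers, assuming a TableEnvironment whose current catalog is a HiveCatalog and whose dialect has been switched to Hive; the table definition itself is made up for the example.

Code Block
languagejava
// Sketch only: assumes `tableEnv` already uses a HiveCatalog as its current catalog.
// With option #2, a DDL like this is parsed and validated with Hive's semantics.
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.executeSql(
		"CREATE TABLE page_view (userid STRING, cnt INT) " +
		"PARTITIONED BY (dt STRING) STORED AS ORC");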

DQL

Use HiveParserCalcitePlanner to generate the RelNode and create a PlannerQueryOperation with it.
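
A minimal sketch of that flow is shown below. HiveParserCalcitePlanner and its genLogicalPlan method are names assumed for this FLIP rather than an existing API; the fixed part is that the resulting RelNode is wrapped into a PlannerQueryOperation.

Code Block
languagejava
// Sketch only: HiveParserCalcitePlanner and genLogicalPlan are assumed names from this FLIP,
// not an existing API. `ast` is the ASTNode produced by Hive's parser for the query.
HiveParserCalcitePlanner calcitePlanner =
		new HiveParserCalcitePlanner(/* semantic analyzer context, planner context, ... */);
RelNode relNode = calcitePlanner.genLogicalPlan(ast);
// Wrap the plan so the rest of the stack treats it like any other query operation.
return new PlannerQueryOperation(relNode);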

...

Since we now have multiple implementations of org.apache.flink.table.delegation.Parser, we need to make it pluggable so that the planner knows which one to use. We'll introduce ParserFactory to create Parser instances and use SPI to find the factory matching the current dialect.

ParserFactory definition:

Code Block
languagejava
public interface ParserFactory extends ComponentFactory {

	Parser create(CatalogManager catalogManager, PlannerContext plannerContext);
}

...

Code Block
languagejava
public class DefaultParserFactory implements ParserFactory {

	@Override
	public Parser create(CatalogManager catalogManager, PlannerContext plannerContext) {
		SqlExprToRexConverterFactory sqlExprToRexConverterFactory = plannerContext::createSqlExprToRexConverter;
		return new ParserImpl(
				catalogManager,
				() -> plannerContext.createFlinkPlanner(catalogManager.getCurrentCatalog(), catalogManager.getCurrentDatabase()),
				plannerContext::createCalciteParser,
				tableSchema -> sqlExprToRexConverterFactory.create(plannerContext.getTypeFactory().buildRelNodeRowType(tableSchema)));
	}

	@Override
	public Map<String, String> optionalContext() {
		DescriptorProperties properties = new DescriptorProperties();
		return properties.asMap();
	}

	@Override
	public Map<String, String> requiredContext() {
		DescriptorProperties properties = new DescriptorProperties();
		properties.putString(TableConfigOptions.TABLE_SQL_DIALECT.key(), SqlDialect.DEFAULT.name().toLowerCase());
		return properties.asMap();
	}

	@Override
	public List<String> supportedProperties() {
		return Collections.singletonList(TableConfigOptions.TABLE_SQL_DIALECT.key());
	}
}

...

Code Block
languagejava
public class HiveParserFactory implements ParserFactory {

	@Override
	public Parser create(CatalogManager catalogManager, PlannerContext plannerContext) {
		SqlExprToRexConverterFactory sqlExprToRexConverterFactory = plannerContext::createSqlExprToRexConverter;
		return new HiveParser(
				catalogManager,
				() -> plannerContext.createFlinkPlanner(catalogManager.getCurrentCatalog(), catalogManager.getCurrentDatabase()),
				plannerContext::createCalciteParser,
				tableSchema -> sqlExprToRexConverterFactory.create(plannerContext.getTypeFactory().buildRelNodeRowType(tableSchema)),
				plannerContext);
	}

	@Override
	public Map<String, String> optionalContext() {
		DescriptorProperties properties = new DescriptorProperties();
		return properties.asMap();
	}

	@Override
	public Map<String, String> requiredContext() {
		DescriptorProperties properties = new DescriptorProperties();
		properties.putString(TableConfigOptions.TABLE_SQL_DIALECT.key(), SqlDialect.HIVE.name().toLowerCase());
		return properties.asMap();
	}

	@Override
	public List<String> supportedProperties() {
		return Collections.singletonList(TableConfigOptions.TABLE_SQL_DIALECT.key());
	}
}

We need to support switching dialects on a per-statement basis. Therefore the blink planner cannot hold a constant Parser instance, but has to create a new instance in getParser whenever the dialect has changed. The updated getParser method:

Code Block
languagescala
  override def getParser: Parser = {
    if (getTableConfig.getSqlDialect != currentDialect) {
      val parserProps = Map(TableConfigOptions.TABLE_SQL_DIALECT.key() ->
        getTableConfig.getSqlDialect.name().toLowerCase)
      parser = ComponentFactoryService.find(classOf[ParserFactory], parserProps)
        .create(catalogManager, plannerContext)
      currentDialect = getTableConfig.getSqlDialect
    }
    parser
  }
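
From the user's perspective this means the dialect can be changed between statements in the same session, for example (the table names below are placeholders):

Code Block
languagejava
// Illustrative only: the tables referenced here are placeholders.
// Each statement is parsed with whatever dialect is set when executeSql is called.
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
tableEnv.executeSql("INSERT INTO flink_sink SELECT id, name FROM kafka_source");

tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.executeSql(
		"INSERT OVERWRITE TABLE hive_dest PARTITION (dt='2021-03-01') " +
		"SELECT id, name FROM src");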

HiveParserCalcitePlanner

...

  1. HiveQL syntax is in general backward compatible. So we can use a newer version to support older versions.
  2. The process to generate RelNode plan is tightly coupled with ASTNode and semantic analysis. While it’s theoretically possible to make HiveParserCalcitePlanner support different versions, that’ll make the logic much more complicated and error-prone.
  3. The copied code gives us more flexibility to support new features in the future. For example, we can adapt the code to support writing HiveQL for generic tables, or support querying tables across multiple catalogs.

Since most Hive users are still using Hive 2.x or 1.x, we'll copy the code from Hive 2.x, which reduces the effort required to cover these versions. For new features in Hive 3.x, e.g. table constraints, we'll extend the copied code to support them.

Go Beyond Hive

In order to support the use cases mentioned in the "Motivation" section, we should ultimately make this feature a pure SQL dialect, decoupled from Hive tables or functions. To achieve that, we need to:

  1. Extend Hive syntax, so that it supports identifiers like "catalog.db.table".
  2. Leverage Catalog to resolve tables or views.
  3. Leverage SqlOperatorTable to resolve functions.
  4. Extend Hive syntax to support streaming features such as Group Windows.

These efforts are within the scope of this FLIP but don't need to be implemented in an MVP release.
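
As a hypothetical illustration of that end state, the snippet below shows a HiveQL statement reading a non-Hive table through catalog-qualified identifiers; neither the identifiers nor this combination is supported today, they are assumptions about what the extended dialect could allow.

Code Block
languagejava
// Hypothetical future usage: catalog-qualified identifiers and non-Hive tables,
// resolved through Catalog and SqlOperatorTable, while staying in the Hive dialect.
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.executeSql(
		"INSERT INTO myhive.warehouse.page_views " +
		"SELECT userid, count(1) FROM default_catalog.default_database.kafka_clicks GROUP BY userid");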

New or Changed Public Interfaces

...

The following limitations apply when using this feature.

...


  1. HiveModule should be used in order to use Hive built-in functions (a loading sketch is shown after this list).
  2. Some features are not supported because the underlying functionality is missing. For example, Hive's UNION type is not supported.
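
A minimal sketch of loading HiveModule so that Hive built-in functions become resolvable; the module name and Hive version string are example values only:

Code Block
languagejava
// Example values only: "hive" is an arbitrary module name, "2.3.4" an example Hive version.
// Once loaded, Hive built-in functions are resolved through this module.
tableEnv.loadModule("hive", new HiveModule("2.3.4"));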

Appendix

...