Status
Current state: accepted
Discussion thread:
JIRA:
Motivation
FLIP-123 implemented HiveQL-compatible DDLs so that users can process metadata in HiveQL. This FLIP aims to provide syntax compatibility for queries. Similar to FLIP-123, this FLIP will improve interoperability with Hive and reduce migration efforts. Besides, this FLIP also makes it possible to extend HiveQL to support streaming features. With this FLIP, the following typical use cases can be supported:
- Users can migrate their batch Hive jobs to Flink, without needing to modify the SQL scripts.
- Users can write HiveQL to integrate streaming features with Hive tables, e.g. streaming data from Kafka to Hive.
- Users can write HiveQL to process non-Hive tables, either in batch or in streaming jobs.
For migrating users, we believe it's desirable for them to be able to continue writing Hive syntax. It not only makes the migration easier, but also helps them leverage Flink for new scenarios more quickly, and thus provides a unified batch-streaming experience.
Proposed Changes
The Idea
One thing that Flink and Hive have in common is that they both generate a Calcite plan (in the form of a RelNode) during SQL query processing. While Flink uses this RelNode as its logical plan, Hive only uses it for optimization and later converts it to its own form of logical plan. The inputs used to generate the plan are also different: for Flink, the input is a SqlNode parsed by Calcite, while for Hive, the input is an ASTNode parsed by Antlr.
The overall idea to support Hive query syntax is to reuse Hive's Antlr parser to generate the ASTNode, and then adapt Hive's RelNode-generation process to produce a RelNode that can be used in Flink. Once we have the RelNode, we can simply follow the current processing path in Flink to optimize and finally execute the plan.
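At a high level, the intended flow looks roughly like the following conceptual sketch. Note that hiveAntlrParser and hiveCalcitePlanner are placeholder names for illustration, not actual classes:

// Conceptual sketch only; hiveAntlrParser and hiveCalcitePlanner are placeholders.
ASTNode ast = hiveAntlrParser.parse(statement);        // Hive's Antlr-based parser
RelNode plan = hiveCalcitePlanner.genLogicalPlan(ast); // adapted Hive RelNode generation
// From here on, the RelNode enters Flink's existing optimization and execution path.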
Introduce HiveParser
In order to process a SQL statement differently, we introduce HiveParser as a sub-class of ParserImpl and override its parse method. This is the entry point of this feature.
We will leverage Hive's Antlr parser to generate the ASTNode, and Hive's semantic analyzer to collect semantic information about the SQL statement. Based on this information, we can tell what kind of statement it is and process it accordingly.
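A rough skeleton of the class could look as follows. The constructor parameters are inferred from the factories shown later in this document, and the body of parse is only indicative:

public class HiveParser extends ParserImpl {

    private final PlannerContext plannerContext;

    HiveParser(
            CatalogManager catalogManager,
            Supplier<FlinkPlannerImpl> validatorSupplier,
            Supplier<CalciteParser> calciteParserSupplier,
            Function<TableSchema, SqlExprToRexConverter> sqlExprToRexConverterCreator,
            PlannerContext plannerContext) {
        super(catalogManager, validatorSupplier, calciteParserSupplier, sqlExprToRexConverterCreator);
        this.plannerContext = plannerContext;
    }

    @Override
    public List<Operation> parse(String statement) {
        // 1. Generate the ASTNode with Hive's Antlr parser.
        // 2. Run Hive's semantic analyzer to determine the statement kind.
        // 3. Dispatch to DDL / DQL / DML handling as described in the following sections.
        throw new UnsupportedOperationException("sketch only");
    }
}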
DDL
We have two options to process DDLs.
- Just delegate DDLs to the super class and reuse FLIP-123 to process them.
- Process DDLs using the semantics we get from Hive.
In order to provide a consistent user experience, we choose option #2 to handle DDLs.
DQL
Use HiveParserCalcitePlanner to generate the RelNode and create a PlannerQueryOperation with it.
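In code, the DQL path boils down to something like the following sketch (the hiveCalcitePlanner and ast names are placeholders):

// DQL: translate the AST into a RelNode and wrap it in a PlannerQueryOperation.
RelNode queryPlan = hiveCalcitePlanner.genLogicalPlan(ast);
return Collections.singletonList(new PlannerQueryOperation(queryPlan));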
DML
Since we don’t support Hive ACID tables, we’ll only consider INSERT here.
Use HiveParserCalcitePlanner to generate the RelNode for the query and create a CatalogSinkModifyOperation. To do that, HiveParser needs to:
- Figure out the static partition specs for partitioned tables, and add the static partition values to the Project in the RelNode
- Figure out the destination schema, if any, and adjust the Project fields in the RelNode accordingly
- Figure out whether to overwrite existing data
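Putting these steps together, the INSERT path could look roughly like the sketch below. Variable names are illustrative and the CatalogSinkModifyOperation constructor arguments are indicative only:

// DML (INSERT): translate the query part into a RelNode, then describe the sink.
RelNode queryPlan = hiveCalcitePlanner.genLogicalPlan(ast);  // Project already adjusted for dest schema
QueryOperation query = new PlannerQueryOperation(queryPlan);
return Collections.singletonList(new CatalogSinkModifyOperation(
        targetTableIdentifier,    // ObjectIdentifier of the destination table
        query,
        staticPartitionSpec,      // e.g. {dt -> "2020-11-01"} extracted from the INSERT statement
        isOverwrite,              // true for INSERT OVERWRITE
        Collections.emptyMap())); // no dynamic table options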
Hive Dialect & Pluggable Parser
FLIP-123 made the SQL dialect the switch to turn Hive compatibility on or off. In this FLIP, we'll continue to use it for queries.
Since we now have multiple implementations of org.apache.flink.table.delegation.Parser, we need to make it pluggable so that the planner knows which one to use. We'll introduce ParserFactory to create Parser instances and use SPI to find the factory matching the current dialect.
ParserFactory definition:
public interface ParserFactory extends ComponentFactory {
    Parser create(CatalogManager catalogManager, PlannerContext plannerContext);
}
The factory to create ParserImpl:
public class DefaultParserFactory implements ParserFactory {

    @Override
    public Parser create(CatalogManager catalogManager, PlannerContext plannerContext) {
        SqlExprToRexConverterFactory sqlExprToRexConverterFactory =
                plannerContext::createSqlExprToRexConverter;
        return new ParserImpl(
                catalogManager,
                () -> plannerContext.createFlinkPlanner(
                        catalogManager.getCurrentCatalog(), catalogManager.getCurrentDatabase()),
                plannerContext::createCalciteParser,
                tableSchema -> sqlExprToRexConverterFactory.create(
                        plannerContext.getTypeFactory().buildRelNodeRowType(tableSchema)));
    }

    @Override
    public Map<String, String> optionalContext() {
        DescriptorProperties properties = new DescriptorProperties();
        return properties.asMap();
    }

    @Override
    public Map<String, String> requiredContext() {
        DescriptorProperties properties = new DescriptorProperties();
        properties.putString(
                TableConfigOptions.TABLE_SQL_DIALECT.key(), SqlDialect.DEFAULT.name().toLowerCase());
        return properties.asMap();
    }

    @Override
    public List<String> supportedProperties() {
        return Collections.singletonList(TableConfigOptions.TABLE_SQL_DIALECT.key());
    }
}
The factory to create HiveParser:
public class HiveParserFactory implements ParserFactory {

    @Override
    public Parser create(CatalogManager catalogManager, PlannerContext plannerContext) {
        SqlExprToRexConverterFactory sqlExprToRexConverterFactory =
                plannerContext::createSqlExprToRexConverter;
        return new HiveParser(
                catalogManager,
                () -> plannerContext.createFlinkPlanner(
                        catalogManager.getCurrentCatalog(), catalogManager.getCurrentDatabase()),
                plannerContext::createCalciteParser,
                tableSchema -> sqlExprToRexConverterFactory.create(
                        plannerContext.getTypeFactory().buildRelNodeRowType(tableSchema)),
                plannerContext);
    }

    @Override
    public Map<String, String> optionalContext() {
        DescriptorProperties properties = new DescriptorProperties();
        return properties.asMap();
    }

    @Override
    public Map<String, String> requiredContext() {
        DescriptorProperties properties = new DescriptorProperties();
        properties.putString(
                TableConfigOptions.TABLE_SQL_DIALECT.key(), SqlDialect.HIVE.name().toLowerCase());
        return properties.asMap();
    }

    @Override
    public List<String> supportedProperties() {
        return Collections.singletonList(TableConfigOptions.TABLE_SQL_DIALECT.key());
    }
}
We need to support switching the dialect on a per-statement basis. Therefore the blink planner cannot hold a constant Parser instance, but has to create a new one whenever the dialect changes. The updated getParser method:
override def getParser: Parser = {
  if (getTableConfig.getSqlDialect != currentDialect) {
    val parserProps = Map(
      TableConfigOptions.TABLE_SQL_DIALECT.key() ->
        getTableConfig.getSqlDialect.name().toLowerCase)
    parser = ComponentFactoryService.find(classOf[ParserFactory], parserProps)
      .create(catalogManager, plannerContext)
    currentDialect = getTableConfig.getSqlDialect
  }
  parser
}
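From a user's perspective, switching parsers then simply means changing the dialect between statements, for example (the src table is just an example):

// Statements issued under the Hive dialect go through HiveParser ...
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.executeSql("SELECT key, count(*) FROM src GROUP BY key");

// ... while statements issued under the default dialect go through ParserImpl.
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
tableEnv.executeSql("SELECT `key`, COUNT(*) FROM src GROUP BY `key`");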
HiveParserCalcitePlanner
This class encapsulates all the logic to generate the RelNode plan for a query. We follow Hive's CalcitePlanner to do it, but adapt it to our own needs. For example, Hive has its own RelNode implementations like HiveProject and HiveJoin, which we'll change to LogicalProject and LogicalJoin respectively. Moreover, Hive's CalcitePlanner doesn't support all Hive features, e.g. SORT BY or LATERAL VIEW. We can extend the logic in HiveParserCalcitePlanner to support them.
Support Multiple Hive Versions
In order to support multiple Hive versions, we keep our own copy of the Hive code that generates the ASTNode and does semantic analysis. This means that a SQL statement is processed with the same code no matter which Hive version is in use. The rationale behind this decision is:
- HiveQL syntax is in general backward compatible, so we can use a newer version to support older versions.
- The process to generate the RelNode plan is tightly coupled with the ASTNode and semantic analysis. While it's theoretically possible to make HiveParserCalcitePlanner support different versions, that would make the logic much more complicated and error-prone.
- The copied code gives us more flexibility to support new features in the future. For example, we can adapt the code to support writing HiveQL for generic tables, or support querying tables across multiple catalogs.
Since most Hive users are still on Hive 2.x or 1.x, we'll copy the Hive code from 2.x, which reduces the effort required to cover these versions. For new features in Hive 3.x, e.g. table constraints, we'll extend the copied code to support them.
Go Beyond Hive
In order to support the use cases mentioned in the "Motivation" section, we should ultimately make this feature a pure SQL dialect, decoupled from Hive tables or functions. To achieve that, we need to:
- Extend Hive syntax so that it supports identifiers like "catalog.db.table".
- Leverage Catalog to resolve tables or views.
- Leverage SqlOperatorTable to resolve functions.
- Extend Hive syntax to support streaming features such as Group Windows.
These efforts are within the scope of this FLIP but don't need to be implemented in an MVP release.
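For illustration, once these extensions are in place a user could run something like the following. The catalog, database, and table names are made up, and the example assumes the corresponding catalogs have been registered:

TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
// HiveQL statement referencing non-Hive tables via fully qualified "catalog.db.table" identifiers.
tableEnv.executeSql(
        "INSERT INTO hive_catalog.warehouse.page_views " +
        "SELECT user_id, url FROM kafka_catalog.stream_db.clicks");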
New or Changed Public Interfaces
No public interface has to be added or changed.
Compatibility, Deprecation, and Migration Plan
N/A
Test Plan
The best way to verify Hive query compatibility is to run Hive’s QFile test with Flink. But in order to limit our code base size and test time, this should not be included in this FLIP, at least not as a regular UT/IT case.
Rejected Alternatives
Let Hive generate the RelNode and then convert it to meet Flink's requirements. This would inevitably lead to jar conflicts, because Hive depends on a quite old Calcite version while Flink typically upgrades its Calcite dependency in each major release. In addition, it would be more difficult to support new features.
Limitations
The following limitations apply when using this feature.
- HiveModule should be used in order to use Hive built-in functions (see the example below).
- Some features are not supported because the underlying functionality is missing. For example, Hive's UNION type is not supported.
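For the first limitation, loading the module could look like this (the Hive version string is just an example):

TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
// Load HiveModule so that Hive built-in functions can be resolved in HiveQL queries.
tableEnv.loadModule("hive", new HiveModule("2.3.4"));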
Appendix
The following diagram shows how HiveParser parses a SQL statement.