...
- Just delegate DDLs to the super class and reuse FLIP-123 to process them.
- Process DDLs using the semantics we get from Hive.

In order to provide a consistent user experience, we choose option #2 to handle DDLs.
DQL

Use HiveParserCalcitePlanner to generate the RelNode and create a PlannerQueryOperation with it.
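The hand-off can be pictured with a minimal sketch. The `RelNode`, `PlannerQueryOperation`, and `genLogicalPlan` below are simplified stand-ins for the real Calcite/Flink classes and the planner's actual entry point, not their real signatures:

```java
// Minimal stand-ins for Calcite's RelNode and Flink's PlannerQueryOperation;
// the real classes live in org.apache.calcite.rel and org.apache.flink.table.operations.
interface RelNode {
    String explain();
}

class PlannerQueryOperation {
    private final RelNode calciteTree;

    PlannerQueryOperation(RelNode calciteTree) {
        this.calciteTree = calciteTree;
    }

    RelNode getCalciteTree() {
        return calciteTree;
    }
}

public class DqlFlowSketch {
    // Hypothetical analog of HiveParserCalcitePlanner producing a plan
    // from a HiveQL query string.
    static RelNode genLogicalPlan(String hiveQl) {
        return () -> "LogicalProject(plan for: " + hiveQl + ")";
    }

    public static void main(String[] args) {
        RelNode plan = genLogicalPlan("SELECT x FROM t");
        // The parser wraps the RelNode so the rest of the stack
        // sees a regular QueryOperation.
        PlannerQueryOperation op = new PlannerQueryOperation(plan);
        System.out.println(op.getCalciteTree().explain());
    }
}
```

Wrapping the plan in a `PlannerQueryOperation` is what lets Hive-generated plans flow through the existing Table API machinery unchanged.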
...
Since we have multiple implementations of org.apache.flink.table.delegation.Parser now, we need to make it pluggable so that the planner knows which one to use. We'll introduce ParserFactory to create Parser instances, and use SPI to find the factory to use according to the current dialect.

ParserFactory definition:
```java
public interface ParserFactory extends ComponentFactory {

    Parser create(CatalogManager catalogManager, PlannerContext plannerContext);
}
```
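How a factory gets picked can be illustrated with a self-contained sketch. The real code discovers candidates via Java SPI and selects one with ComponentFactoryService.find; the MiniParserFactory interface, the hard-coded candidate list, and the find method below are simplified stand-ins for that mechanism:

```java
import java.util.List;
import java.util.Map;

// Simplified stand-in for a ComponentFactory: each factory declares the
// properties (here, the dialect) under which it should be chosen.
interface MiniParserFactory {
    Map<String, String> requiredContext();

    String create(); // real factories return a Parser; a String is enough here
}

public class FactoryDiscoverySketch {
    static final String DIALECT_KEY = "table.sql-dialect";

    // Stand-in for ComponentFactoryService.find: in Flink the candidate list
    // comes from java.util.ServiceLoader (META-INF/services), not a hard-coded list.
    static MiniParserFactory find(List<MiniParserFactory> candidates, Map<String, String> props) {
        return candidates.stream()
                .filter(f -> props.entrySet().containsAll(f.requiredContext().entrySet()))
                .findFirst()
                .orElseThrow(() -> new IllegalStateException("no factory for " + props));
    }

    public static void main(String[] args) {
        MiniParserFactory defaultFactory = new MiniParserFactory() {
            public Map<String, String> requiredContext() { return Map.of(DIALECT_KEY, "default"); }
            public String create() { return "ParserImpl"; }
        };
        MiniParserFactory hiveFactory = new MiniParserFactory() {
            public Map<String, String> requiredContext() { return Map.of(DIALECT_KEY, "hive"); }
            public String create() { return "HiveParser"; }
        };
        List<MiniParserFactory> candidates = List.of(defaultFactory, hiveFactory);

        // The planner builds the property map from the current dialect setting.
        System.out.println(find(candidates, Map.of(DIALECT_KEY, "hive")).create());
        System.out.println(find(candidates, Map.of(DIALECT_KEY, "default")).create());
    }
}
```

Matching on `requiredContext` is what lets new dialects plug in later without touching the planner: a new factory only has to declare its own dialect value.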
...
```java
public class DefaultParserFactory implements ParserFactory {

    @Override
    public Parser create(CatalogManager catalogManager, PlannerContext plannerContext) {
        SqlExprToRexConverterFactory sqlExprToRexConverterFactory =
                plannerContext::createSqlExprToRexConverter;
        return new ParserImpl(
                catalogManager,
                () -> plannerContext.createFlinkPlanner(
                        catalogManager.getCurrentCatalog(),
                        catalogManager.getCurrentDatabase()),
                plannerContext::createCalciteParser,
                tableSchema -> sqlExprToRexConverterFactory.create(
                        plannerContext.getTypeFactory().buildRelNodeRowType(tableSchema)));
    }

    @Override
    public Map<String, String> optionalContext() {
        DescriptorProperties properties = new DescriptorProperties();
        return properties.asMap();
    }

    @Override
    public Map<String, String> requiredContext() {
        DescriptorProperties properties = new DescriptorProperties();
        properties.putString(
                TableConfigOptions.TABLE_SQL_DIALECT.key(),
                SqlDialect.DEFAULT.name().toLowerCase());
        return properties.asMap();
    }

    @Override
    public List<String> supportedProperties() {
        return Collections.singletonList(TableConfigOptions.TABLE_SQL_DIALECT.key());
    }
}
```
...
```java
public class HiveParserFactory implements ParserFactory {

    @Override
    public Parser create(CatalogManager catalogManager, PlannerContext plannerContext) {
        SqlExprToRexConverterFactory sqlExprToRexConverterFactory =
                plannerContext::createSqlExprToRexConverter;
        return new HiveParser(
                catalogManager,
                () -> plannerContext.createFlinkPlanner(
                        catalogManager.getCurrentCatalog(),
                        catalogManager.getCurrentDatabase()),
                plannerContext::createCalciteParser,
                tableSchema -> sqlExprToRexConverterFactory.create(
                        plannerContext.getTypeFactory().buildRelNodeRowType(tableSchema)),
                plannerContext);
    }

    @Override
    public Map<String, String> optionalContext() {
        DescriptorProperties properties = new DescriptorProperties();
        return properties.asMap();
    }

    @Override
    public Map<String, String> requiredContext() {
        DescriptorProperties properties = new DescriptorProperties();
        properties.putString(
                TableConfigOptions.TABLE_SQL_DIALECT.key(),
                SqlDialect.HIVE.name().toLowerCase());
        return properties.asMap();
    }

    @Override
    public List<String> supportedProperties() {
        return Collections.singletonList(TableConfigOptions.TABLE_SQL_DIALECT.key());
    }
}
```
We need to support switching dialects on a per-statement basis. Therefore the Blink planner cannot hold a constant Parser instance, but has to create a new instance if the dialect has changed. The updated getParser method:
```scala
override def getParser: Parser = {
  if (getTableConfig.getSqlDialect != currentDialect) {
    val parserProps = Map(
      TableConfigOptions.TABLE_SQL_DIALECT.key() ->
        getTableConfig.getSqlDialect.name().toLowerCase)
    parser = ComponentFactoryService.find(classOf[ParserFactory], parserProps)
      .create(catalogManager, plannerContext)
    currentDialect = getTableConfig.getSqlDialect
  }
  parser
}
```
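The caching behaviour can be modeled with a small self-contained sketch; MiniPlanner, MiniParser, and the string-typed dialect are hypothetical stand-ins for the Blink planner, the Parser, and SqlDialect:

```java
// Self-contained model of the "recreate the parser only when the dialect changed" logic.
public class DialectSwitchSketch {

    static class MiniParser {
        final String dialect;
        MiniParser(String dialect) { this.dialect = dialect; }
    }

    static class MiniPlanner {
        private String currentDialect;                 // dialect the cached parser was built for
        private MiniParser parser;
        private String configuredDialect = "default";  // stands in for TableConfig.getSqlDialect

        void setDialect(String dialect) {
            configuredDialect = dialect;
        }

        MiniParser getParser() {
            if (!configuredDialect.equals(currentDialect)) {
                // In the real code this goes through ComponentFactoryService/ParserFactory.
                parser = new MiniParser(configuredDialect);
                currentDialect = configuredDialect;
            }
            return parser;
        }
    }

    public static void main(String[] args) {
        MiniPlanner planner = new MiniPlanner();
        MiniParser p1 = planner.getParser();
        MiniParser p2 = planner.getParser();
        System.out.println("same instance while dialect unchanged: " + (p1 == p2));

        planner.setDialect("hive");
        MiniParser p3 = planner.getParser();
        System.out.println("new instance after switching dialect: " + (p1 != p3) + ", " + p3.dialect);
    }
}
```

From the user's side the switch is just a configuration change, e.g. calling `tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE)` before submitting a Hive-syntax statement.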
HiveParserCalcitePlanner

This class encapsulates all the logic to generate the RelNode plan for a query. We follow Hive's CalcitePlanner to do it, but adapt it to our own needs. For example, Hive has its own RelNode implementations like HiveProject and HiveJoin, which we'll change to LogicalProject and LogicalJoin respectively. Moreover, Hive's CalcitePlanner doesn't support all Hive features, e.g. SORT BY or LATERAL VIEW. We can extend the logic in HiveParserCalcitePlanner to support them.
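The first of those adaptations amounts to a class-for-class substitution while the plan is generated. The table-driven rewrite below is an illustrative stand-in, not the actual HiveParserCalcitePlanner code, and the toy one-operator-per-line plan format plus the HiveFilter/HiveAggregate entries are assumptions added by analogy with the two nodes named above:

```java
import java.util.Map;

// Illustrative substitution table: when porting Hive's CalcitePlanner logic,
// each Hive-specific RelNode is created as the corresponding Calcite logical node.
public class NodeMappingSketch {

    static final Map<String, String> HIVE_TO_LOGICAL = Map.of(
            "HiveProject", "LogicalProject",
            "HiveJoin", "LogicalJoin",
            "HiveFilter", "LogicalFilter",       // assumed by analogy
            "HiveAggregate", "LogicalAggregate"); // assumed by analogy

    // Rewrite a plan rendered as one operator name per line (a toy plan format).
    static String rewrite(String plan) {
        StringBuilder out = new StringBuilder();
        for (String line : plan.split("\n")) {
            out.append(HIVE_TO_LOGICAL.getOrDefault(line.trim(), line.trim())).append('\n');
        }
        return out.toString();
    }

    public static void main(String[] args) {
        String hivePlan = "HiveProject\nHiveJoin\nTableScan";
        System.out.print(rewrite(hivePlan));
    }
}
```

Emitting standard Calcite logical nodes is what lets the rest of the Blink planner optimize and execute the plan without knowing it came from HiveQL.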
...
- HiveQL syntax is in general backward compatible, so we can use a newer version to support older versions.
- The process to generate the RelNode plan is tightly coupled with ASTNode and semantic analysis. While it's theoretically possible to make HiveParserCalcitePlanner support different versions, that would make the logic much more complicated and error-prone.
- The copied code gives us more flexibility to support new features in the future. For example, we can adapt the code to support writing HiveQL for generic tables, or support querying tables across multiple catalogs.
Since most Hive users are still on Hive 2.x or 1.x, we'll copy the Hive code from 2.x, which reduces the effort required to cover those versions. For features new in Hive 3.x, e.g. table constraints, we'll extend the copied code to support them.
Go Beyond Hive
In order to support the use cases mentioned in the "Motivation" section, we should ultimately make this feature a pure SQL dialect, decoupled from Hive tables or functions. To achieve that, we need to:
- Extend Hive syntax so that it supports identifiers like "catalog.db.table".
- Leverage Catalog to resolve tables or views.
- Leverage SqlOperatorTable to resolve functions.
- Extend Hive syntax to support streaming features such as Group Windows.
These efforts are within the scope of this FLIP but don't need to be implemented in an MVP release. Ultimately this feature will be a pure SQL dialect, orthogonal to the tables being queried.
New or Changed Public Interfaces
...