Status

...

Discussion thread: https://lists.apache.org/thread/qfmf3v6zwd7ttcnn4nz48xz489w7cn4c

JIRA: FLINK-26603 ([Umbrella] Decouple Hive with Flink planner)

Release: 1.18

...


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, with FLIP-123 and FLIP-152, we have supported the Hive dialect in Flink. But this also brings much maintenance burden and complexity, because the Hive connector has to depend on flink-table-planner, which sometimes slows down development in flink-table-planner. Also, we expect to move the Hive connector out of the Flink repository in release-6.0. So, it is necessary to decouple the Hive connector from the Flink planner while still supporting the Hive dialect in the Hive connector.

Proposed Changes

The idea

However, the current implementation of the Hive dialect depends heavily on the Flink planner. Also, the interfaces involved are essentially internal, which makes them inconvenient and unavailable for implementing other dialects.

So, this FLIP introduces a pluggable dialect mechanism with a set of public interfaces to make it convenient to support other dialects. At the same time, it intends to solve the legacy problem brought by supporting the Hive dialect: the Hive connector is coupled to the Flink planner, which brings much complexity and maintenance burden.

Public Interfaces

We propose the following interfaces:

  • Parser
  • ParserFactory
  • CatalogRegistry
  • OperationTreeBuilder

Parser

Code Block
languagejava
/** Provides methods for parsing SQL objects from a SQL string. */
@PublicEvolving
public interface Parser {
    // the interface already exists in the current codebase; here we just expose it.
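    // For reference, the existing Parser interface provides the following methods
    // (they are also shown in the MySQLParser example below):
    //   List<Operation> parse(String statement);
    //   UnresolvedIdentifier parseIdentifier(String identifier);
    //   ResolvedExpression parseSqlExpression(
    //           String sqlExpression, RowType inputRowType, @Nullable LogicalType outputType);
    //   String[] getCompletionHints(String statement, int position);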
}


ParserFactory

Code Block
languagejava
/**
 * Factory that creates {@link Parser}.
 *
 * <p>The {@link #factoryIdentifier()} is identified by matching it against {@link
 * TableConfigOptions#TABLE_SQL_DIALECT}.
 */
@PublicEvolving
public interface ParserFactory extends Factory {

    /** Creates a new parser. */
    Parser create(Context context);

    /** Context provided when a parser is created. */
    @PublicEvolving
    interface Context {
        CatalogRegistry getCatalogRegistry(); // interface provided for dealing with catalogs, qualifying identifiers, etc.

        OperationTreeBuilder getOperationTreeBuilder(); // interface provided to build Operations.
    }
}

CatalogRegistry

Code Block
languagejava
/** A catalog registry for dealing with catalogs. */
@PublicEvolving
public interface CatalogRegistry {

    String getCurrentDatabase();

    String getCurrentCatalog();

    ObjectIdentifier qualifyIdentifier(UnresolvedIdentifier identifier);

    Optional<Catalog> getCatalog(String catalogName);

    Optional<ResolvedCatalogBaseTable<?>> getResolvedCatalogBaseTable(
            ObjectIdentifier objectIdentifier);

    boolean isTemporaryTable(ObjectIdentifier objectIdentifier);

    Optional<CatalogPartition> getPartition(
            ObjectIdentifier tableIdentifier, CatalogPartitionSpec partitionSpec);
}
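
For illustration only, here is a minimal sketch of how a dialect parser might use CatalogRegistry; the helper name resolveTable and the dotted-name splitting are assumptions, not part of the proposed API:

Code Block
languagejava
// Hypothetical helper inside a dialect parser: qualify `db.t` against the current
// catalog/database and look up the resolved table via CatalogRegistry.
private ResolvedCatalogBaseTable<?> resolveTable(CatalogRegistry catalogRegistry, String name) {
    UnresolvedIdentifier unresolved = UnresolvedIdentifier.of(name.split("\\."));
    // qualifyIdentifier fills in the current catalog/database for any missing parts
    ObjectIdentifier identifier = catalogRegistry.qualifyIdentifier(unresolved);
    return catalogRegistry
            .getResolvedCatalogBaseTable(identifier)
            .orElseThrow(() -> new IllegalArgumentException("Table not found: " + identifier));
}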

OperationTreeBuilder

Code Block
languagejava
/**
 * A builder for building {@link org.apache.flink.table.operations.Operation}.
 */
@PublicEvolving
public interface OperationTreeBuilder {

    QueryOperation project(List<Expression> projectList, QueryOperation child);

    QueryOperation project(
            List<Expression> projectList, QueryOperation child, boolean explicitAlias);

    QueryOperation project(
            List<Expression> projectList, QueryOperation child, List<OverWindow> overWindows);

    // other public methods already implemented in OperationTreeBuilder are omitted
}

Implementation details

The public interfaces referred to above will be added to the module flink-table-api-java. Then, when we want to support another dialect, we need to include this module and implement a DialectFactory to create a Parser and, optionally, an ExtendedOperationExecutor for the specific dialect.

Example for supporting another dialect

Here, I would like to give an example of supporting a MySQL dialect.

MySQLDialectFactory

Code Block
languagejava
public class MySQLDialectFactory implements DialectFactory {

    @Override
    public String factoryIdentifier() {
        return "mysql";
    }

    @Override
    public Parser create(Context context) {
        return new MySQLParser(context);
    }
}


MySQLParser

Code Block
languagejava
public class MySQLParser implements Parser {

    private final OperationTreeBuilder operationTreeBuilder;

    public MySQLParser(Context context) {
        this.operationTreeBuilder = context.getOperationTreeBuilder();
    }

    @Override
    public List<Operation> parse(String statement) {
        // parse the statement into an AST (Abstract Syntax Tree)
        MySQLAST mysqlAST = parseStatement(statement);
        // convert the AST to Flink's operation tree
        return convertToOperation(mysqlAST);
    }

    private List<Operation> convertToOperation(MySQLAST mysqlAST) {
        // may look like:
        Operation operation =
                operationTreeBuilder.project(
                        Arrays.asList(Expressions.$("f0")), new SourceQueryOperation(xx));
        return Collections.singletonList(operation);
    }

    @Override
    public UnresolvedIdentifier parseIdentifier(String identifier) {
        // may need to split the identifier `db.t` into an array of [db, t]
        String[] names = parseMySQLIdentifier(identifier);
        return UnresolvedIdentifier.of(names);
    }

    @Override
    public ResolvedExpression parseSqlExpression(
            String sqlExpression, RowType inputRowType, @Nullable LogicalType outputType) {
        // parse sqlExpression to a ResolvedExpression
    }

    @Override
    public String[] getCompletionHints(String statement, int position) {
        // just for example, return an empty string array directly
        return new String[0];
    }
}

Then, specify the fully qualified class name of MySQLDialectFactory in the resource file META-INF/services/org.apache.flink.table.factories.Factory so that it can be discovered via the Java SPI mechanism.

After that, you can switch to the MySQL dialect by setting table.sql-dialect while executing SQL.
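
As a rough usage sketch (assuming MySQLDialectFactory has been registered via the SPI file above; the option key table.sql-dialect already exists in Flink, while "mysql" is the hypothetical identifier from this example):

Code Block
languagejava
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SwitchDialectExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // switch the dialect; the value must match MySQLDialectFactory#factoryIdentifier()
        tEnv.getConfig().getConfiguration().setString("table.sql-dialect", "mysql");
        // statements executed from now on are parsed by MySQLParser
        tEnv.executeSql("SELECT 1");
    }
}

In the SQL Client, the equivalent is SET 'table.sql-dialect' = 'mysql';.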

Implementation for migrating Hive dialect

To support the Hive dialect, we should also follow this style: parse the SQL into an AST and convert it to Flink's operation tree via OperationTreeBuilder. But we have already implemented the Hive dialect, and as FLIP-152 described, for Hive syntax the current implementation converts the SQL to Calcite's RelNode, which is consistent with Hive's own implementation when using CBO in Hive, and then wraps the RelNode into a PlannerQueryOperation. So what we really need in the Hive connector is just the ability to create RelNode, which involves accessing the RelOptCluster, RelBuilder, etc., provided by PlannerContext. Rewriting the Hive dialect codebase to the operation-tree style would take much effort, since it is totally different from the current implementation, so it is hard to migrate to Flink's operation tree in one shot.

So, as a temporary step, the main idea is to introduce a slim module called flink-table-calcite-bridge that provides the Calcite dependency and exposes a limited interface (e.g. #getCluster, #createRelBuilder) to enable creating RelNode. The Hive connector will then depend only on this slim module.

1. Move the interface ParserFactory from flink-table-planner to flink-table-planner-spi so that the Hive parser can implement ParserFactory

...

2. Introduce a new module, flink-table-calcite-bridge. This module contains the Calcite dependencies for writing planner plugins (e.g. SQL dialects) that interact with Calcite APIs. More exactly, it is currently intended to provide the ability to create RelNode, which involves accessing the RelOptCluster, RelBuilder, etc., provided by PlannerContext.

But this module is internal and designed only for the Hive connector. In the end, the Hive dialect should be migrated to Flink's operation tree and the module can be dropped. To do this, we propose to introduce the following internal interfaces/classes in the new module flink-table-calcite-bridge:

CalciteContext

Code Block
languagejava
// Context for creating RelNode
@Internal
public interface CalciteContext extends ParserFactory.Context {

    CalciteCatalogReader createCatalogReader(
            boolean lenientCaseSensitivity, String currentCatalog, String currentDatabase);

    RelOptCluster getCluster();

    FrameworkConfig createFrameworkConfig();

    RelDataTypeFactory getTypeFactory();

    RelBuilder createRelBuilder(String currentCatalog, String currentDatabase);
}

These methods have already been implemented in PlannerContext, but we need to expose them so that the Hive connector can use them.

Then, the Hive connector can use CalciteContext to create RelNode without depending on flink-table-planner at all.

New or Changed Public Interfaces

 

CalciteQueryOperation

Code Block
languagejava
/** Wrapper for Calcite RelNode tree. */
@Internal
public class CalciteQueryOperation implements QueryOperation {

    private final RelNode calciteTree;
    private final ResolvedSchema resolvedSchema;

    public CalciteQueryOperation(RelNode calciteTree, ResolvedSchema resolvedSchema) {
        this.calciteTree = calciteTree;
        this.resolvedSchema = resolvedSchema;
    }
}
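
To make the intended interplay concrete, below is a rough sketch (not the actual Hive code) of how a dialect parser could use CalciteContext to build a RelNode and hand it back as a CalciteQueryOperation; the table name "src" and the deriveSchema helper are hypothetical:

Code Block
languagejava
// Rough sketch: build a RelNode via CalciteContext and wrap it for the planner.
List<Operation> parseWithCalcite(CalciteContext calciteContext, String statement) {
    CatalogRegistry catalogs = calciteContext.getCatalogRegistry();
    RelBuilder relBuilder =
            calciteContext.createRelBuilder(
                    catalogs.getCurrentCatalog(), catalogs.getCurrentDatabase());
    // a real dialect would translate its own AST of `statement` here; we just scan a table
    RelNode relNode = relBuilder.scan("src").build();
    // wrap the RelNode so the planner can treat it as a regular QueryOperation
    ResolvedSchema schema = deriveSchema(relNode); // hypothetical helper
    return Collections.singletonList(new CalciteQueryOperation(relNode, schema));
}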

Proposed Changes

  1. Introduce some public interfaces for supporting other dialects in the flink-table-api-java module
  2. Introduce a slim module called flink-table-calcite-bridge providing the Calcite dependency as a part of migrating the Hive dialect. It is just a temporary way to migrate the Hive dialect; the specific dependency and logic should be dropped in the end

...


Compatibility, Deprecation, and Migration Plan

For the Hive dialect, first replace the dependency on flink-table-planner with flink-table-calcite-bridge, and then migrate it to Flink's operation tree and drop flink-table-calcite-bridge.

Test Plan

It is mostly refactoring work, which can be verified by the existing tests.

...

Convert the Hive AST to the operation tree directly. Actually, this is more Flink-friendly, since the Table API works in this way. But it would take much effort, because we would need to rewrite the Hive dialect codebase entirely and may need to introduce some new operations. It is a huge amount of work and hard to do in one shot. As we want to move out the Hive connector in 1.16, it is more practical to decouple from the planner first and then migrate to the operation tree step by step. More discussion can be found in the original design doc:
https://docs.google.com/document/d/1LMQ_mWfB_mkYkEBCUa2DgCO2YdtiZV7YRs2mpXyjdP4/edit?usp=sharing