Status

Discussion threadhttps://lists.apache.org/thread/66g79w5zlod2ylyv8k065j57pjjmv1jo
Vote threadhttps://lists.apache.org/thread/qfmf3v6zwd7ttcnn4nz48xz489w7cn4c
JIRA

Unable to render Jira issues macro, execution error.

Release1.18

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, with FLIP-123, FLIP-152, we have supported  Hive dialect.  But it's much dependent on Flink planner. Also the interfaces involved is more like to be internal, which is not convenient and available to implement other dialects.

So, this FLIP is to Introduce pluggable dialect with some public interfaces so that make it convenient to support other dialects.  Also, at the same time, it's intend to solve the legacy problem brought by supporting Hive dialect that the Hive connector is coupled to Flink planner, which brings much complexity and maintenance burden.

Public Interfaces

We propose the following interfaces:

  • Parser
  • ParserFactory
  • CatalogRegistry
  • OperationTreeBuilder

Parser

/** Provides methods for parsing SQL objects from a SQL string. */
@PublicEvolving
public interface Parser {
	// the interface has existed in current codebase, here we just expose it.
}


ParserFactory

/**
 * Factory that creates {@link Parser}.
 *
 * <p>The {@link #factoryIdentifier()} is identified by matching it against {@link
 * TableConfigOptions#TABLE_SQL_DIALECT}.
 */
@PublicEvolving
public interface ParserFactory extends Factory {

    /** Creates a new parser. */
    Parser create(Context context);

    /** Context provided when a parser is created. */
    @PublicEvolving
    interface Context {
        CatalogRegistry getCatalogRegistry(); // interfaces provided dealing with get catalog, qulify identifier, etc.

        OperationTreeBuilder getOperationTreeBuilder(); // interfaces provided to build Operation.
    }
}

CatalogRegistry

/**
 * A catalog registry for dealing with catalogs
 */
@PublicEvolving
public interface CatalogRegistry {
    String getCurrentDatabase();

    String getCurrentCatalog();

    ObjectIdentifier qualifyIdentifier(UnresolvedIdentifier identifier);

    Optional<Catalog> getCatalog(String catalogName);  

    Optional<ResolvedCatalogBaseTable<?>> getResolvedCatalogBaseTable(
            ObjectIdentifier objectIdentifier);
    
   	boolean isTemporaryTable(ObjectIdentifier objectIdentifier); 

    Optional<CatalogPartition> getPartition(
            ObjectIdentifier tableIdentifier, CatalogPartitionSpec partitionSpec);
}

OperationTreeBuilder

/**
 * A builder for building {@link org.apache.flink.table.operations.Operation}
 */
@PublicEvolving
public interface OperationTreeBuilder {
     
    public QueryOperation project(List<Expression> projectList, QueryOperation child);

    public QueryOperation project(
            List<Expression> projectList, QueryOperation child, boolean explicitAlias);

    public QueryOperation project(
            List<Expression> projectList, QueryOperation child, List<OverWindow> overWindows)

    // omit other public interaces has implemented in OperationTreeBuilder
}

Implement detail

These public interfaces refered above will be added to module flink-table-api-java. And then when we want to support other dialects,  we need to include this module and implement DalectFactory to create a Parser and ExtendedOperationExecutor(optional) for the specific dialect.

Example for supporting other dialect

In here, I would like to give an example for support MySQL dialect.

MySQLParserFactory

public MySQLDialectFactory extends DialectFactory {
  
    @Override
    public String factoryIdentifier() {
        return "mysql";
    }
   
    Parser create(Context context) {
       return new MySqlParser(context);
    }
}


MySQLParser

public class MySQLParser implements Parser {
    private OperationTreeBuilder operationTreeBuilder;

    public MySQLParser(Context context) {
        this.operationTreeBuilder = context.getOperationTreeBuilder;
    }
    
    @Override
    List<Operation> parse(String statement) {
      // parse it to AST(Abstract Semantic Tree)
      MySQLAST mysqlAST = parseStament(statement);
      // convert the AST to Flink OperationTree
      List<Operation> operations = convertToOperation(mysqlAST);
   }
   
   private List<Operation> convertToOperation(MySQLAST mysqlAST) {
      // may look like
      Operation operation = operationTreeBuilder.project(Arrays.asList(Expressions.$("f0"),
        new SourceQueryOperation(xx));
      return Collections.singletonList(operation)
   }
   
   @Override
   UnresolvedIdentifier parseIdentifier(String identifier) {
    // may need to identifier `db.t` to array of [db, t]
    string[] names = parseMySQLIdentifier(identifier);
    return UnresolvedIdentifier.of(names);
  }

  @Override
  ResolvedExpression parseSqlExpression(
            String sqlExpression, RowType inputRowType, @Nullable LogicalType outputType) {
   // parse sqlExpression to ResolvedExpression
  }

  @Override
  String[] getCompletionHints(String statement, int position) {
   // just for example, return empty string array directly
   return new String[0];
  }
}

Then,  specific the class path of MySQLDialectFactory in the reosurce file org.apache.flink.table.factories.Factory to make it can be discovered by Java SPI mechanism. 

After that, you can switch to MySQL dialect by setting table.sql-dialect whiling excuting the sql.

Implementation for migrating Hive dialect

To support for Hive dialect, we should also follow this style: parsing to AST and convert it to Flink's operation tree via OperationTreeBuilder.  But we has implemeted Hive dialect, and the current implementation is convert the sql to Calcite’s RelNode which is consistent to Hive’s implementation when using CBO in Hive. It'll take much efforts for we need to rewrite the codebase about Hive dialect totally for it's totally different from the current implementation.  It's hard to migrate to Flink's operation tree at one shot.

So the tempory way is to introduce a slim module called flink-table-calcite-bridge. This module contains the Calcite dependencies for writing planner plugins (e.g. SQL dialects) that interact with Calcite APIs. More exactly, currently, it is intend to provide the ability to create RelNode, which involves accessing the RelOptCluster, RelBuilder, etc, provided by PlannerContext.

But it's internal and designed for only Hive connector.  At the end,  the Hive dialect should be migrate to Flink's operation tree and the module can be dropped. To do this,  we propose to introduce the following interal interfaces/class in the new moudle flink-table-calcite-bridge:

CalciteContext

// Context for creating RelNode
@Internal
public interface CalciteContext extends ParserFactory.Context {

    CalciteCatalogReader createCatalogReader(
            boolean lenientCaseSensitivity, String currentCatalog, String currentDatabase);

    RelOptCluster getCluster();

    FrameworkConfig createFrameworkConfig();

    RelDataTypeFactory getTypeFactory();

    RelBuilder createRelBuilder(String currentCatalog, String currentDatabase);
}

The interfaces have been implemented in PlannerContext, but we need to expose them to enable Hive connector to use. 

CalciteQueryOperation

/** Wrapper for Calcite RelNode tree. */
@Internal
public class CalciteQueryOperation implements QueryOperation {
    private final RelNode calciteTree;
    private final ResolvedSchema resolvedSchema;
	
    public CalciteQueryOperation(RelNode calciteTree, ResolvedSchema resolvedSchema) {
       this.calciteTree = calciteTree;
       this.resolvedSchema = resolvedSchema;
    }
 }

Proposed Changes

  1. Introduce some public interfaces for supporting other dialects in flink-table-api-jave module
  2. Introduce a slim module called flink-table-calcite-bridge providing calcite dependency as a part of migrating Hive dialect.  It's just a tempory way to migrate Hive dialect, the specific dependency or logic should be dropped at the end.

Compatibility, Deprecation, and Migration Plan

For Hive dialect,  first replace the dependency on flink-table-planner with flink-table-calcite-bridge,  and then  migrate it to Flink's OperationTree along with dropping flink-table-calcite-bridge.

Test Plan

It's just a refactor work, which can be tested by existing tests.