Status

...

Discussion thread: https://lists.apache.org/thread/qfmf3v6zwd7ttcnn4nz48xz489w7cn4c

JIRA: FLINK-26603 - [Umbrella] Pluggable dialect and decouple Hive connector

Release: 1.18


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, with FLIP-123 and FLIP-152, we have supported the Hive dialect. But the support depends heavily on the Flink planner, and the interfaces involved are mostly internal, which makes them inconvenient and insufficient for implementing other dialects.

So, this FLIP introduces a pluggable dialect mechanism with a set of public interfaces, so that it becomes convenient to support other dialects. At the same time, it intends to solve the legacy problem brought by supporting the Hive dialect: the Hive connector is coupled to the Flink planner, which brings much complexity and maintenance burden.

Public Interfaces

We propose the following interfaces:

  • Parser
  • ParserFactory
  • CatalogRegistry
  • OperationTreeBuilder

Parser

Code Block
languagejava
/** Provides methods for parsing SQL objects from a SQL string. */
@PublicEvolving
public interface Parser {
    // this interface already exists in the current codebase; here we just expose it.
}
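
For reference, a sketch of the main methods this existing interface declares; the MySQL example below implements exactly these methods:

Code Block
languagejava
/** Provides methods for parsing SQL objects from a SQL string. */
@PublicEvolving
public interface Parser {

    /** Parses a SQL string into a list of {@link Operation}s. */
    List<Operation> parse(String statement);

    /** Parses an identifier such as `db.t` into an {@link UnresolvedIdentifier}. */
    UnresolvedIdentifier parseIdentifier(String identifier);

    /** Parses a SQL expression into a {@link ResolvedExpression}. */
    ResolvedExpression parseSqlExpression(
            String sqlExpression, RowType inputRowType, @Nullable LogicalType outputType);

    /** Returns completion hints for the given statement at the given cursor position. */
    String[] getCompletionHints(String statement, int position);
}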


ParserFactory

Code Block
languagejava
/**
 * Factory that creates {@link Parser}.
 *
 * <p>The {@link #factoryIdentifier()} is identified by matching it against {@link
 * TableConfigOptions#TABLE_SQL_DIALECT}.
 */
@PublicEvolving
public interface ParserFactory extends Factory {

    /** Creates a new parser. */
    Parser create(Context context);

    /** Context provided when a parser is created. */
    @PublicEvolving
    interface Context {
        CatalogRegistry getCatalogRegistry(); // interfaces provided for getting catalogs, qualifying identifiers, etc.

        OperationTreeBuilder getOperationTreeBuilder(); // interfaces provided to build Operations.
    }
}
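
For context, a minimal sketch of how the planner side could discover the factory; FactoryUtil.discoverFactory and TableConfigOptions.TABLE_SQL_DIALECT already exist today, while the surrounding wiring here is illustrative only:

Code Block
languagejava
// Illustrative: match table.sql-dialect against factoryIdentifier() via the SPI.
String dialect = tableConfig.get(TableConfigOptions.TABLE_SQL_DIALECT);
ParserFactory parserFactory =
        FactoryUtil.discoverFactory(
                Thread.currentThread().getContextClassLoader(), ParserFactory.class, dialect);
Parser parser = parserFactory.create(parserContext);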

CatalogRegistry

Code Block
languagejava
/**
 * A catalog registry for dealing with catalogs
 */
@PublicEvolving
public interface CatalogRegistry {
    String getCurrentDatabase();

    String getCurrentCatalog();

    ObjectIdentifier qualifyIdentifier(UnresolvedIdentifier identifier);

    Optional<Catalog> getCatalog(String catalogName);

    Optional<ResolvedCatalogBaseTable<?>> getResolvedCatalogBaseTable(
            ObjectIdentifier objectIdentifier);

    boolean isTemporaryTable(ObjectIdentifier objectIdentifier);

    Optional<CatalogPartition> getPartition(
            ObjectIdentifier tableIdentifier, CatalogPartitionSpec partitionSpec);
}
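
As an illustration, a dialect implementation could use the registry to resolve a table reference; this is a hypothetical snippet, and the error handling is an assumption:

Code Block
languagejava
// Illustrative: qualify `db.t` against the current catalog and look up the table.
ObjectIdentifier identifier =
        catalogRegistry.qualifyIdentifier(UnresolvedIdentifier.of("db", "t"));
ResolvedCatalogBaseTable<?> table =
        catalogRegistry
                .getResolvedCatalogBaseTable(identifier)
                .orElseThrow(() -> new ValidationException("Table not found: " + identifier));
boolean isTemporary = catalogRegistry.isTemporaryTable(identifier);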

OperationTreeBuilder

Code Block
languagejava
/**
 * A builder for building {@link org.apache.flink.table.operations.Operation}
 */
@PublicEvolving
public interface OperationTreeBuilder {
     
    public QueryOperation scan(String path);

    public QueryOperation project(List<Expression> projectList, QueryOperation child);

    public QueryOperation project(
            List<Expression> projectList, QueryOperation child, boolean explicitAlias);

    public QueryOperation project(
            List<Expression> projectList, QueryOperation child, List<OverWindow> overWindows);

    // omit other public interfaces already implemented in OperationTreeBuilder
}
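
For intuition, building the operation tree for a query like `SELECT f0 FROM t1` might look as follows (a sketch using only the methods above):

Code Block
languagejava
// Illustrative: scan table `t1`, then project column `f0` on top of it.
QueryOperation scan = operationTreeBuilder.scan("t1");
QueryOperation projection =
        operationTreeBuilder.project(Collections.singletonList(Expressions.$("f0")), scan);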

Implementation details

The public interfaces referred to above will be added to the module flink-table-api-java. Then, when we want to support another dialect, we only need to depend on this module and implement a ParserFactory that creates a Parser for the specific dialect.

Example for supporting other dialect

Here, I would like to give an example of supporting a MySQL dialect.

MySQLParserFactory

Code Block
languagejava
public class MySQLParserFactory implements ParserFactory {

    @Override
    public String factoryIdentifier() {
        return "mysql";
    }

    @Override
    public Parser create(Context context) {
        return new MySQLParser(context);
    }
}


MySQLParser

Code Block
languagejava
public class MySQLParser implements Parser {

    private final OperationTreeBuilder operationTreeBuilder;

    public MySQLParser(ParserFactory.Context context) {
        this.operationTreeBuilder = context.getOperationTreeBuilder();
    }

    @Override
    public List<Operation> parse(String statement) {
        // parse the statement into an AST (Abstract Syntax Tree)
        MySQLAST mysqlAST = parseStatement(statement);
        // convert the AST to Flink's operation tree
        return convertToOperation(mysqlAST);
    }

    private List<Operation> convertToOperation(MySQLAST mysqlAST) {
        // may look like the following
        Operation operation =
                operationTreeBuilder.project(
                        Arrays.asList(Expressions.$("f0")), operationTreeBuilder.scan("t1"));
        return Collections.singletonList(operation);
    }

    @Override
    public UnresolvedIdentifier parseIdentifier(String identifier) {
        // may need to split the identifier `db.t` into an array of [db, t]
        String[] names = parseMySQLIdentifier(identifier);
        return UnresolvedIdentifier.of(names);
    }

    @Override
    public ResolvedExpression parseSqlExpression(
            String sqlExpression, RowType inputRowType, @Nullable LogicalType outputType) {
        // parse sqlExpression to a ResolvedExpression
    }

    @Override
    public String[] getCompletionHints(String statement, int position) {
        // just for example, return an empty string array directly
        return new String[0];
    }
}

Then, specify the full class name of MySQLParserFactory in the resource file META-INF/services/org.apache.flink.table.factories.Factory so that it can be discovered by the Java SPI mechanism.
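
For example, assuming the factory lives in a hypothetical package com.example.dialect, the service file would contain:

Code Block
languagetext
# content of META-INF/services/org.apache.flink.table.factories.Factory
com.example.dialect.MySQLParserFactory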

After that, you can switch to the MySQL dialect by setting table.sql-dialect to mysql while executing the SQL.
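
For instance, with a TableEnvironment (a sketch; the query itself is just a placeholder):

Code Block
languagejava
// Illustrative: switch the dialect, then execute a statement with it.
tableEnv.getConfig().set(TableConfigOptions.TABLE_SQL_DIALECT, "mysql");
tableEnv.executeSql("SELECT f0 FROM t1");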

Implementation for migrating Hive dialect

To support the Hive dialect, we should also follow this style: parse the statement to an AST and convert it to Flink's operation tree via the OperationTreeBuilder. But the Hive dialect has already been implemented, and the current implementation converts the SQL to Calcite's RelNode, which is consistent with Hive's own implementation when using CBO in Hive. Migrating it would take much effort, for we would need to rewrite the Hive dialect codebase almost entirely, since the approach is totally different from the current implementation. It is hard to migrate to Flink's operation tree in one shot.

So the temporary way is to introduce a slim module called flink-table-calcite-bridge. This module contains the Calcite dependencies for writing planner plugins (e.g. SQL dialects) that interact with Calcite APIs. More exactly, it currently intends to provide the ability to create RelNode, which involves accessing the RelOptCluster, RelBuilder, etc., provided by PlannerContext.

But it is internal and designed to be used only by the Hive connector. In the end, the Calcite dependency should be removed, the Hive dialect should be migrated to Flink's operation tree, and the module can be dropped. To do this, we propose to introduce the following internal interfaces/classes in the new module flink-table-calcite-bridge:

CalciteContext

Code Block
languagejava
// Context for creating RelNode
@Internal
public interface CalciteContext extends ParserFactory.Context {

    CalciteCatalogReader createCatalogReader(
            boolean lenientCaseSensitivity, String currentCatalog, String currentDatabase);

    RelOptCluster getCluster();

    FrameworkConfig createFrameworkConfig();

    RelDataTypeFactory getTypeFactory();

    RelBuilder createRelBuilder(String currentCatalog, String currentDatabase);
}

These interfaces have already been implemented in PlannerContext; we only need to expose them so that the Hive connector can use them.

CalciteQueryOperation

Code Block
languagejava
/** Wrapper for Calcite RelNode tree. */
@Internal
public class CalciteQueryOperation implements QueryOperation {

    private final RelNode calciteTree;
    private final ResolvedSchema resolvedSchema;

    public CalciteQueryOperation(RelNode calciteTree, ResolvedSchema resolvedSchema) {
        this.calciteTree = calciteTree;
        this.resolvedSchema = resolvedSchema;
    }

    @Override
    public ResolvedSchema getResolvedSchema() {
        return resolvedSchema;
    }

    // other QueryOperation methods omitted
}
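
To make the intended usage concrete, a Hive dialect parser could build a RelNode through the exposed context and return it wrapped in CalciteQueryOperation; in this sketch, the catalog/database names, the table, and resolvedSchema are assumptions:

Code Block
languagejava
// Illustrative: build a RelNode via CalciteContext and wrap it as a QueryOperation.
RelBuilder relBuilder = calciteContext.createRelBuilder("myCatalog", "myDatabase");
RelNode relNode =
        relBuilder
                .scan("t1")                       // resolved via the catalog reader
                .project(relBuilder.field("f0"))  // keep only column f0
                .build();
QueryOperation operation = new CalciteQueryOperation(relNode, resolvedSchema);
return Collections.singletonList(operation);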

Proposed Changes

  1. Introduce some public interfaces for supporting other dialects in the flink-table-api-java module.
  2. Introduce a slim module called flink-table-calcite-bridge providing the Calcite dependency as a part of migrating the Hive dialect. It is just a temporary way to migrate the Hive dialect; the Calcite dependency and the related logic should be dropped in the end.

Compatibility, Deprecation, and Migration Plan

For the Hive dialect, first replace the dependency on flink-table-planner with flink-table-calcite-bridge, and then migrate it to Flink's operation tree along with dropping flink-table-calcite-bridge.

Test Plan

This is mostly refactoring work, which can be covered by the existing tests.
