...

So, this FLIP is to introduce a pluggable dialect mechanism with some public interfaces that make it convenient to support other dialects. At the same time, it intends to solve the legacy problem brought by supporting the Hive dialect: the Hive connector is coupled to the Flink planner, which brings much complexity and maintenance burden.

Proposed Changes

The idea

Introduce a slim module, tentatively called flink-table-planner-spi, containing the interface ParserFactory. Then, to support a new dialect, one needs to include this module and implement the ParserFactory. All the existing dialects are to follow this way.

Public Interfaces

We propose the following interfaces:

  • Parser
  • ParserFactory
  • CatalogRegistry
  • OperationContext

Parser

Code Block
languagejava
/** Provides methods for parsing SQL objects from a SQL string. */
@PublicEvolving
public interface Parser {
    // the interface already exists in the current codebase; here we just expose it.
}

...

ParserFactory

Code Block
languagejava
/**
 * Factory that creates {@link Parser}.
 *
 * <p>The {@link #factoryIdentifier()} is identified by matching it against {@link
 * TableConfigOptions#TABLE_SQL_DIALECT}.
 */
@PublicEvolving
public interface ParserFactory extends Factory {

    /** Creates a new parser. */
    Parser create(Context context);

    /** Context provided when a parser is created. */
    @PublicEvolving
    interface Context {
        CatalogRegistry getCatalogRegistry(); // methods provided for getting catalogs, qualifying identifiers, etc.

        OperationContext getOperationContext(); // methods provided to build Operation.
    }
}
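With this factory interface, the planner can discover the dialect's factory by matching the configured dialect against the factory identifier. A minimal sketch of what the lookup could look like, assuming the existing FactoryUtil discovery utility is reused for this purpose:

Code Block
languagejava
// look up the ParserFactory whose factoryIdentifier() matches the configured dialect
ParserFactory parserFactory =
        FactoryUtil.discoverFactory(
                Thread.currentThread().getContextClassLoader(), ParserFactory.class, "mysql");

// create the dialect-specific parser with the context provided by the planner
Parser parser = parserFactory.create(context);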

CatalogRegistry

Code Block
languagejava
/**
 * A catalog registry for dealing with catalogs
 */
@PublicEvolving
public interface CatalogRegistry {
    String getCurrentDatabase();

    String getCurrentCatalog();

    ObjectIdentifier qualifyIdentifier(UnresolvedIdentifier identifier);

    Optional<Catalog> getCatalog(String catalogName);
}
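For illustration, a dialect parser could use the registry to resolve table names (a hypothetical fragment; context is the ParserFactory.Context passed at creation time):

Code Block
languagejava
CatalogRegistry catalogRegistry = context.getCatalogRegistry();

// qualify a partial identifier like "t1" against the current catalog/database
ObjectIdentifier identifier =
        catalogRegistry.qualifyIdentifier(UnresolvedIdentifier.of("t1"));

// look up the catalog when table metadata is needed
Optional<Catalog> catalog = catalogRegistry.getCatalog(identifier.getCatalogName());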

OperationContext

Code Block
languagejava
/**
 * A context for building {@link org.apache.flink.table.operations.Operation}
 */
@PublicEvolving
public interface OperationContext {
     
    public QueryOperation scan(String path);

    public QueryOperation project(List<Expression> projectList, QueryOperation child);

    public QueryOperation project(
            List<Expression> projectList, QueryOperation child, boolean explicitAlias);

    public QueryOperation project(
            List<Expression> projectList, QueryOperation child, List<OverWindow> overWindows);

    // omit other public interfaces already implemented in OperationTreeBuilder
}
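For instance, a dialect implementation could translate a simple query such as SELECT a FROM t1 into an Operation tree through this context (a hypothetical fragment; context is again the ParserFactory.Context):

Code Block
languagejava
OperationContext operationContext = context.getOperationContext();

// SELECT a FROM t1  ->  project(a, scan(t1))
QueryOperation scan = operationContext.scan("t1");
QueryOperation select =
        operationContext.project(Collections.singletonList(Expressions.$("a")), scan);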

Implementation detail

We will introduce a slim module, tentatively called flink-table-planner-spi, which only exposes the public interfaces referred to above. Then, when we want to support another dialect, we need to include this slim module and implement ParserFactory to create a Parser for the specific dialect.

Example for supporting other dialect

Here, I would like to give an example for supporting the MySQL dialect. To support it, you need to provide an implementation of ParserFactory, which may be called MySQLParserFactory; the factory is responsible for creating the corresponding MySQLParser.

MySQLParserFactory

Code Block
languagejava
public class MySQLParserFactory implements ParserFactory {

    @Override
    public String factoryIdentifier() {
        return "mysql";
    }

    @Override
    public Parser create(Context context) {
        return new MySQLParser(context);
    }
}


MySQLParser

The interface Parser already exists in Flink; it looks as follows:

Code Block
languagejava
/** Provides methods for parsing SQL objects from a SQL string. */
public interface Parser {
    List<Operation> parse(String statement);

    // omit other methods
}

For MySQLParser, you should implement the methods that convert a SQL statement to the Operation tree that Flink expects:

Code Block
languagejava
public class MySQLParser implements Parser {

    @Override
    public List<Operation> parse(String statement) {
        // parse the statement into an AST and convert it to Operations
        // via the OperationContext
    }

    @Override
    public UnresolvedIdentifier parseIdentifier(String identifier) {
        // ...
    }

    @Override
    public ResolvedExpression parseSqlExpression(
            String sqlExpression, RowType inputRowType, @Nullable LogicalType outputType) {
        // ...
    }
}

Finally, specify the class path of MySQLParserFactory in the resource file META-INF/services/org.apache.flink.table.factories.Factory so that it can be discovered by the Java SPI mechanism.
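For example, assuming the factory lives in a hypothetical package com.example.dialect, the resource file would contain the single line:

Code Block
com.example.dialect.MySQLParserFactory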

After that, you can switch to the MySQL dialect by setting table.sql-dialect while executing the SQL.
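For example, with a TableEnvironment (a minimal sketch, assuming the factory above is on the classpath):

Code Block
languagejava
TableEnvironment tableEnv =
        TableEnvironment.create(EnvironmentSettings.inStreamingMode());

// switch to the MySQL dialect before executing MySQL-flavored SQL
tableEnv.getConfig().getConfiguration().setString("table.sql-dialect", "mysql");

tableEnv.executeSql("SELECT a FROM t1");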

...

Implementation for migrating Hive dialect

The support for the Hive dialect should also follow this way: parse the statement to an AST and convert it to an Operation tree via OperationContext. However, we have already implemented the Hive dialect, and decoupling the Hive connector is a little complex: the current implementation converts the SQL statement to Calcite's RelNode, which is consistent with Hive's implementation when using CBO in Hive, and this requires flink-table-planner to create the Calcite RelNode. The better way is to convert the Hive AST to an Operation tree, but that will take much effort since it requires totally rewriting the Hive dialect codebase, which is entirely different from the current implementation. It is hard to migrate to the Operation tree in one shot.

So the temporary way is to provide the Calcite dependency in the flink-table-planner-spi module, along with the ability to create RelNode, which involves accessing the RelOptCluster, RelBuilder, etc. provided by PlannerContext. But this part is internal and only used by the Hive connector. In the end, the Calcite dependency should be removed and the Hive dialect should be migrated to the Operation tree.

To provide the ability to create RelNode, we propose to introduce the following internal interfaces/classes:

CalciteContext

Code Block
languagejava
// Context for creating RelNode
@Internal
public interface CalciteContext extends ParserFactory.Context {

    CalciteCatalogReader createCatalogReader(
            boolean lenientCaseSensitivity, String currentCatalog, String currentDatabase);

    RelOptCluster getCluster();

    FrameworkConfig createFrameworkConfig();

    RelDataTypeFactory getTypeFactory();

    RelBuilder createRelBuilder(String currentCatalog, String currentDatabase);
}

These interfaces have already been implemented in PlannerContext, but we need to expose them to enable the Hive connector to use them.

Then, the Hive connector will work without the dependency on flink-table-planner in its pom.

New or Changed Public Interfaces

With the decoupling of the Hive connector, the newly introduced class looks like:

CalciteQueryOperation

Code Block
languagejava
/** Wrapper for a Calcite RelNode tree. */
@Internal
public class CalciteQueryOperation implements QueryOperation {
    private final RelNode calciteTree;
    private final ResolvedSchema resolvedSchema;

    public CalciteQueryOperation(RelNode calciteTree, ResolvedSchema resolvedSchema) {
        this.calciteTree = calciteTree;
        this.resolvedSchema = resolvedSchema;
    }
}
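To sketch how these internal pieces fit together for the Hive dialect (a hypothetical fragment; the catalog and database names and resolvedSchema stand for values derived from the statement being parsed):

Code Block
languagejava
CalciteContext calciteContext = (CalciteContext) context;

// build a RelNode tree directly, as the current Hive dialect implementation does
RelBuilder relBuilder = calciteContext.createRelBuilder("myCatalog", "myDatabase");
RelNode relNode = relBuilder.scan("t1").build();

// wrap the RelNode tree into a QueryOperation the planner can consume
CalciteQueryOperation operation = new CalciteQueryOperation(relNode, resolvedSchema);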

Proposed Changes

  1. Introduce a slim module flink-table-planner-spi exposing interfaces for supporting other dialects.
  2. Add the Calcite dependency for the Hive dialect in the module flink-table-planner-spi. It is just a temporary way for migrating the Hive dialect; the specific dependencies and logic should be dropped in the end.


Compatibility, Deprecation, and Migration Plan

For the Hive dialect, first replace the dependency on flink-table-planner with flink-table-planner-spi, and then drop the Calcite dependencies in flink-table-planner-spi and migrate it to Flink's Operation tree.

Test Plan

It is just refactoring work, which can be verified by the existing tests.

...