
Status

...



Discussion thread

...

...

JIRA: [Umbrella] Pluggable dialect and decouple Hive connector

...

https://lists.apache.org/thread/qfmf3v6zwd7ttcnn4nz48xz489w7cn4c
JIRA: FLINK-26603

Release: 1.18


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

  • Parser
  • ParserFactory
  • CatalogRegistry
  • OperationTreeBuilder

Parser

Code Block
languagejava
/** Provides methods for parsing SQL objects from a SQL string. */
@PublicEvolving
public interface Parser {
	// This interface already exists in the codebase; this FLIP only exposes it as a public API.
}

...

Code Block
languagejava
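/**
 * A catalog registry for dealing with catalogs.
 */
@PublicEvolving
public interface CatalogRegistry {
    /** Returns the name of the current database. */
    String getCurrentDatabase();

    /** Returns the name of the current catalog. */
    String getCurrentCatalog();

    /** Fully qualifies an identifier, filling in the current catalog and database if needed. */
    ObjectIdentifier qualifyIdentifier(UnresolvedIdentifier identifier);

    /** Returns the catalog with the given name, if it is registered. */
    Optional<Catalog> getCatalog(String catalogName);

    /** Returns the resolved table or view for the given identifier, if it exists. */
    Optional<ResolvedCatalogBaseTable<?>> getResolvedCatalogBaseTable(
            ObjectIdentifier objectIdentifier);

    /** Returns whether the given identifier refers to a temporary table. */
    boolean isTemporaryTable(ObjectIdentifier objectIdentifier);

    /** Returns the partition of the given table that matches the partition spec, if it exists. */
    Optional<CatalogPartition> getPartition(
            ObjectIdentifier tableIdentifier, CatalogPartitionSpec partitionSpec);
}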
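To illustrate the contract of qualifyIdentifier, here is a minimal, self-contained sketch that models only the name-resolution part. It assumes the same rule Flink's catalog manager applies: a one-part name is resolved against the current catalog and database, and a two-part name is read as database.object within the current catalog. QualifiedName and SimpleCatalogRegistry are hypothetical stand-ins for ObjectIdentifier and a real CatalogRegistry implementation; they are not Flink classes.

```java
import java.util.List;
import java.util.Objects;

/** Hypothetical stand-in for ObjectIdentifier: a fully qualified catalog.database.object path. */
final class QualifiedName {
    final String catalog;
    final String database;
    final String object;

    QualifiedName(String catalog, String database, String object) {
        this.catalog = catalog;
        this.database = database;
        this.object = object;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof QualifiedName)) {
            return false;
        }
        QualifiedName other = (QualifiedName) o;
        return catalog.equals(other.catalog)
                && database.equals(other.database)
                && object.equals(other.object);
    }

    @Override
    public int hashCode() {
        return Objects.hash(catalog, database, object);
    }

    @Override
    public String toString() {
        return catalog + "." + database + "." + object;
    }
}

/** Hypothetical minimal registry that only tracks the current catalog and database. */
final class SimpleCatalogRegistry {
    private final String currentCatalog;
    private final String currentDatabase;

    SimpleCatalogRegistry(String currentCatalog, String currentDatabase) {
        this.currentCatalog = currentCatalog;
        this.currentDatabase = currentDatabase;
    }

    /** Fills in the missing leading parts of a 1- to 3-part name from the current context. */
    QualifiedName qualifyIdentifier(List<String> parts) {
        switch (parts.size()) {
            case 1: // object
                return new QualifiedName(currentCatalog, currentDatabase, parts.get(0));
            case 2: // database.object
                return new QualifiedName(currentCatalog, parts.get(0), parts.get(1));
            case 3: // catalog.database.object
                return new QualifiedName(parts.get(0), parts.get(1), parts.get(2));
            default:
                throw new IllegalArgumentException("Expected 1 to 3 name parts, got " + parts);
        }
    }
}
```

With a current catalog default_catalog and database default_database, qualifyIdentifier(Arrays.asList("t1")) yields default_catalog.default_database.t1, so a dialect parser never has to know which catalog is active.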

OperationTreeBuilder

Code Block
languagejava
/**
 * A builder for building {@link org.apache.flink.table.operations.Operation} trees.
 */
@PublicEvolving
public interface OperationTreeBuilder {

    /** Creates an operation that reads the table at the given path. */
    QueryOperation scan(String path);

    QueryOperation project(List<Expression> projectList, QueryOperation child);

    QueryOperation project(
            List<Expression> projectList, QueryOperation child, boolean explicitAlias);

    QueryOperation project(
            List<Expression> projectList, QueryOperation child, List<OverWindow> overWindows);

    // Other public methods already implemented in OperationTreeBuilder are omitted here.
}
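As a rough illustration of how a dialect parser is expected to use these builder methods, the following self-contained sketch models a statement such as SELECT f0 FROM t1 as a projection over a scan, built bottom-up. QueryNode, ScanNode, ProjectNode and SketchTreeBuilder are hypothetical stand-ins for QueryOperation and OperationTreeBuilder; projected fields are plain strings here rather than Flink Expressions.

```java
import java.util.List;

/** Hypothetical stand-in for QueryOperation: a node in a relational operation tree. */
interface QueryNode {
    /** Renders the subtree rooted at this node, e.g. "Project[f0](Scan[t1])". */
    String describe();
}

/** Leaf node that reads a table by path. */
final class ScanNode implements QueryNode {
    private final String path;

    ScanNode(String path) {
        this.path = path;
    }

    @Override
    public String describe() {
        return "Scan[" + path + "]";
    }
}

/** Node that keeps only the given fields of its child's output. */
final class ProjectNode implements QueryNode {
    private final List<String> fields;
    private final QueryNode child;

    ProjectNode(List<String> fields, QueryNode child) {
        this.fields = fields;
        this.child = child;
    }

    @Override
    public String describe() {
        // List.toString renders as "[f0, f1]"
        return "Project" + fields + "(" + child.describe() + ")";
    }
}

/** Hypothetical builder mirroring the scan/project methods of OperationTreeBuilder. */
final class SketchTreeBuilder {
    QueryNode scan(String path) {
        return new ScanNode(path);
    }

    QueryNode project(List<String> fields, QueryNode child) {
        return new ProjectNode(fields, child);
    }
}
```

Because every builder method takes its child as an argument, a converter can translate the AST recursively: convert the FROM clause first, then wrap the result in a projection, so builder.project(Collections.singletonList("f0"), builder.scan("t1")) describes itself as Project[f0](Scan[t1]).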

...

The public interfaces referred to above will be added to the flink-table-api-java module. To support another dialect, a developer then only needs to depend on this module and implement DialectFactory, which creates a Parser and, optionally, an ExtendedOperationExecutor for that dialect.

...

Code Block
languagejava
public class MySQLDialectFactory implements DialectFactory {

    @Override
    public String factoryIdentifier() {
        return "mysql";
    }

    // Creates the Parser used for statements written in the MySQL dialect.
    @Override
    public Parser create(Context context) {
        return new MySQLParser(context);
    }
}

...

Code Block
languagejava
public class MySQLParser implements Parser {
    private final OperationTreeBuilder operationTreeBuilder;

    public MySQLParser(Context context) {
        this.operationTreeBuilder = context.getOperationTreeBuilder();
    }

    @Override
    public List<Operation> parse(String statement) {
        // Parse the statement into an AST (Abstract Syntax Tree).
        // parseStatement and MySQLAST are dialect-specific and not shown here.
        MySQLAST mysqlAST = parseStatement(statement);
        // Convert the AST into a Flink operation tree.
        return convertToOperation(mysqlAST);
    }

    private List<Operation> convertToOperation(MySQLAST mysqlAST) {
        // For "SELECT f0 FROM t1", the conversion may look like this:
        QueryOperation operation =
                operationTreeBuilder.project(
                        Collections.singletonList(Expressions.$("f0")),
                        operationTreeBuilder.scan("t1"));
        return Collections.singletonList(operation);
    }

    @Override
    public UnresolvedIdentifier parseIdentifier(String identifier) {
        // Split an identifier such as `db.t` into the array [db, t].
        // parseMySQLIdentifier is a dialect-specific helper, not shown here.
        String[] names = parseMySQLIdentifier(identifier);
        return UnresolvedIdentifier.of(names);
    }

    @Override
    public ResolvedExpression parseSqlExpression(
            String sqlExpression, RowType inputRowType, @Nullable LogicalType outputType) {
        // Parse sqlExpression into a ResolvedExpression; the logic is omitted in this example.
        throw new UnsupportedOperationException("Omitted in this example.");
    }

    @Override
    public String[] getCompletionHints(String statement, int position) {
        // Just for example: return an empty array directly.
        return new String[0];
    }
}
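MySQLParser delegates identifier splitting to a parseMySQLIdentifier helper. Below is a minimal sketch of what such a helper could look like, assuming MySQL's backtick quoting rules: a quoted part may contain dots, and a doubled backtick inside quotes stands for a single backtick. It deliberately ignores whitespace, ANSI_QUOTES mode and case sensitivity; MySQLIdentifierSplitter is an illustrative name, not part of Flink.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that splits a MySQL-style identifier into its name parts. */
final class MySQLIdentifierSplitter {

    /**
     * Splits e.g. "db.t" into ["db", "t"]. Parts may be quoted with backticks, in which case
     * dots are kept literally and a doubled backtick ("``") stands for one backtick.
     */
    static String[] split(String identifier) {
        List<String> parts = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean quoted = false;
        for (int i = 0; i < identifier.length(); i++) {
            char c = identifier.charAt(i);
            if (quoted) {
                if (c == '`') {
                    if (i + 1 < identifier.length() && identifier.charAt(i + 1) == '`') {
                        current.append('`'); // escaped backtick inside a quoted part
                        i++;
                    } else {
                        quoted = false; // closing backtick
                    }
                } else {
                    current.append(c); // any character, including '.', is literal here
                }
            } else if (c == '`') {
                quoted = true; // opening backtick
            } else if (c == '.') {
                parts.add(current.toString()); // unquoted dot separates name parts
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        if (quoted) {
            throw new IllegalArgumentException("Unterminated quoted identifier: " + identifier);
        }
        parts.add(current.toString());
        for (String part : parts) {
            if (part.isEmpty()) {
                throw new IllegalArgumentException("Empty name part in identifier: " + identifier);
            }
        }
        return parts.toArray(new String[0]);
    }
}
```

For example, split("db.t") returns [db, t], while split("`my.db`.t") returns [my.db, t]; the resulting array can be passed straight to UnresolvedIdentifier.of.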

Then, register the fully qualified class name of MySQLDialectFactory in the resource file META-INF/services/org.apache.flink.table.factories.Factory so that it can be discovered through the Java SPI mechanism.
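Registering the factory in a services file lets java.util.ServiceLoader find it at runtime; the factory whose identifier matches the requested dialect (for example "mysql") can then be selected. The sketch below shows that lookup under simplifying assumptions: SketchDialectFactory and DialectDiscovery are hypothetical names, the factory contract is reduced to factoryIdentifier(), and Flink's real discovery goes through its own factory utilities rather than this code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.ServiceLoader;

/** Hypothetical, simplified factory contract: only the dialect name is modeled. */
interface SketchDialectFactory {
    /** Unique dialect name, e.g. "mysql". */
    String factoryIdentifier();
}

final class DialectDiscovery {

    /** Loads every implementation listed in META-INF/services/<interface name> on the classpath. */
    static List<SketchDialectFactory> loadAll(ClassLoader classLoader) {
        List<SketchDialectFactory> factories = new ArrayList<>();
        for (SketchDialectFactory factory :
                ServiceLoader.load(SketchDialectFactory.class, classLoader)) {
            factories.add(factory);
        }
        return factories;
    }

    /** Returns the single factory whose identifier equals the requested dialect, if any. */
    static Optional<SketchDialectFactory> find(
            String dialect, Iterable<SketchDialectFactory> candidates) {
        SketchDialectFactory match = null;
        for (SketchDialectFactory factory : candidates) {
            if (factory.factoryIdentifier().equals(dialect)) {
                if (match != null) {
                    // Two factories claiming the same name would make the choice arbitrary.
                    throw new IllegalStateException(
                            "Multiple factories found for dialect " + dialect);
                }
                match = factory;
            }
        }
        return Optional.ofNullable(match);
    }
}
```

The services file itself contains one fully qualified class name per line, for instance a hypothetical com.example.MySQLDialectFactory. Rejecting duplicate identifiers is a choice of this sketch, so that a dialect name always maps to exactly one implementation.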

...

  1. Introduce some public interfaces in the flink-table-api-java module to support other dialects.
  2. Introduce a slim module called flink-table-calcite-bridge that provides the Calcite dependency, as part of migrating the Hive dialect. This is only a temporary measure for the migration; the Calcite-specific dependency and logic should eventually be dropped.

...