Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Status

...

Page properties


Discussion thread

...

...

JIRA: [Umbrella] Pluggable dialect and decouple Hive connector

...

ASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-26603

Release1.18


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

  • Parser
  • ParserFactory
  • CatalogRegistry
  • OperationContextOperationTreeBuilder

Parser

Code Block
languagejava
/** Provides methods for parsing SQL objects from a SQL string. */
@PublicEvolving
public interface Parser {
	// the interface has existed in current codebase, here we just expose it.
}

...

Code Block
languagejava
/**
 * A catalog registry for dealing with catalogs
 */
@PublicEvolving
public interface CatalogRegistry {
    String getCurrentDatabase();

    String getCurrentCatalog();

    ObjectIdentifier qualifyIdentifier(UnresolvedIdentifier identifier);

    Optional<Catalog> getCatalog(String catalogName);  

    Optional<ResolvedCatalogBaseTable<?>> getResolvedCatalogBaseTable(
            ObjectIdentifier objectIdentifier);
    
   	boolean isTemporaryTable(ObjectIdentifier objectIdentifier); 

    Optional<CatalogPartition> getPartition(
            ObjectIdentifier tableIdentifier, CatalogPartitionSpec partitionSpec);
}

OperationTreeBuilder

Code Block
languagejava
/**
 * A builder for building {@link org.apache.flink.table.operations.Operation}
 */
@PublicEvolving
public interface OperationTreeBuilder {
     
    public QueryOperation scan(String path);

    public QueryOperation project(List<Expression> projectList, QueryOperation child);

    public QueryOperation project(
            List<Expression> projectList, QueryOperation child, boolean explicitAlias);

    public QueryOperation project(
            List<Expression> projectList, QueryOperation child, List<OverWindow> overWindows)

    // omit other public interaces has implemented in OperationTreeBuilder
}

...

These public interfaces refered above will be added to module flink-table-api-java. And then when we want to support other dialects,  we need to include this module and implement ParserFactory DalectFactory to create a Parser and ExtendedOperationExecutor(optional) for the specific dialect.

...

Code Block
languagejava
public interface MySQLParserFactoryMySQLDialectFactory extends ParserFactoryDialectFactory {
  
    @Override
    public String factoryIdentifier() {
        return "mysql";
    }
   
    Parser create(Context context) {
       return new MySqlParser(context);
    }
}

...

Code Block
languagejava
public class MySQLParser implements Parser {
    private OperationTreeBuilder operationTreeBuilder;

    public MySQLParser(Context context) {
        this.operationTreeBuilder = context.getOperationTreeBuilder;
    }
    
    @Override
    List<Operation> parse(String statement) {
      // parse it to AST(Abstract Semantic Tree)
      MySQLAST mysqlAST = parseStament(statement);
      // convert the AST to Flink OperationTree
      List<Operation> operations = convertToOperation(mysqlAST);
   }
   
   private List<Operation> convertToOperation(MySQLAST mysqlAST) {
      // may look like
      Operation operation = operationTreeBuilder.project(Arrays.asList(Expressions.$("f0"), operationContext.scan("t1"
        new SourceQueryOperation(xx));
      return Collections.singletonList(operation)
   }
   
   @Override
   UnresolvedIdentifier parseIdentifier(String identifier) {
    // may need to identifier `db.t` to array of [db, t]
    string[] names = parseMySQLIdentifier(identifier);
    return UnresolvedIdentifier.of(names);
  }

  @Override
  ResolvedExpression parseSqlExpression(
            String sqlExpression, RowType inputRowType, @Nullable LogicalType outputType) {
   // parse sqlExpression to ResolvedExpression
  }

  @Override
  String[] getCompletionHints(String statement, int position) {
   // just for example, return empty string array directly
   return new String[0];
  }
}

Then,  specific the class path of MySQLParserFactory MySQLDialectFactory in the reosurce file org.apache.flink.table.factories.Factory to make it can be discovered by Java SPI mechanism. 

...