Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Page properties


Discussion threadhttps://lists.apache.org/thread/66g79w5zlod2ylyv8k065j57pjjmv1jo
Vote threadhttps://lists.apache.org/thread/qfmf3v6zwd7ttcnn4nz48xz489w7cn4c
JIRA

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-26603

Release1.1718


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

We propose the following interfaces:

  • Parser
  • ExtendedOperationExecutor
  • ParserFactoryDialectFactory
  • CatalogRegistry
  • OperationContextOperationTreeBuilder

Parser

Code Block
languagejava
/** Provides methods for parsing SQL objects from a SQL string. */
@PublicEvolving
public interface Parser {
	// the interface has existed in current codebase, here we just expose it.
}

...

ExtendedOperationExecutor

Code Block
languagejava
/**
 * An extended operation executor which provides method for executing operation. External pluggable
 * dialect can implement this interface to execute operation in its own way instead of using Flink's
 * own implementation for operation execution.
 */
@PublicEvolving
public interface ExtendedOperationExecutor {

    /**
     * Execute the given operation and return the execution result. This method will delegate
     * Flink's own operation execution.
     *
     * <p>If return Optional.empty(), the operation will then fall to Flink's operation execution.
     */
    Optional<TableResult> executeOperation(Operation operation);
}


DialectFactory

Code Block
languagejava
/**
 * Factory that creates {@link Parser} and {@link ExtendedOperationExecutor}.
 *
 * <p>The {@link #factoryIdentifier()} is identified by matching it against {@link
 * TableConfigOptions#TABLE_SQL_DIALECT}.
 */
@PublicEvolving
public interface ParserFactoryDialectFactory extends Factory {

    /** Creates a new parser. */
    Parser create(Context context);

    /** Context provided when a parser is created. */
    @PublicEvolving
    interface Context {
        CatalogRegistry getCatalogRegistry(); // interfaces provided dealing with get catalog, qulify identifier, etc.

        OperationTreeBuilder getOperationTreeBuilder(); // interfaces provided to build Operation.
    }
}

...

Code Block
languagejava
/**
 * A catalog registry for dealing with catalogs
 */
@PublicEvolving
public interface CatalogRegistry {
    String getCurrentDatabase();

    String getCurrentCatalog();

    ObjectIdentifier qualifyIdentifier(UnresolvedIdentifier identifier);

    Optional<Catalog> getCatalog(String catalogName);

    Optional<CatalogBaseTable> getCatalogBaseTable(ObjectIdentifier objectIdentifier);

    boolean isTemporaryTable(ObjectIdentifier objectIdentifier); 

    Optional<CatalogPartition> getPartition(
            ObjectIdentifier tableIdentifier, CatalogPartitionSpec partitionSpec);
}

OperationTreeBuilder

Code Block
languagejava
/**
 * A builder for building {@link org.apache.flink.table.operations.Operation}
 */
@PublicEvolving
public interface OperationTreeBuilder {
     
    public QueryOperation scan(String path);

    public QueryOperation project(List<Expression> projectList, QueryOperation child);

    public QueryOperation project(
            List<Expression> projectList, QueryOperation child, boolean explicitAlias);

    public QueryOperation project(
            List<Expression> projectList, QueryOperation child, List<OverWindow> overWindows)

    // omit other public interaces has implemented in OperationTreeBuilder
}

...

These public interfaces refered above will be added to module flink-table-api-java. And then when we want to support other dialects,  we need to include this module and implement ParserFactory DalectFactory to create a Parser and ExtendedOperationExecutor(optional) for the specific dialect.

...

Code Block
languagejava
public interfaceMySQLDialectFactory MySQLParserFactory extends ParserFactoryDialectFactory {
  
    @Override
    public String factoryIdentifier() {
        return "mysql";
    }
   
    Parser create(Context context) {
       return new MySqlParser(context);
    }
}

...

Code Block
languagejava
public class MySQLParser implements Parser {
    private OperationTreeBuilder operationTreeBuilder;

    public MySQLParser(Context context) {
        this.operationTreeBuilder = context.getOperationTreeBuilder;
    }
    
    @Override
    List<Operation> parse(String statement) {
      // parse it to AST(Abstract Semantic Tree)
      MySQLAST mysqlAST = parseStament(statement);
      // convert the AST to Flink OperationTree
      List<Operation> operations = convertToOperation(mysqlAST);
   }
   
   private List<Operation> convertToOperation(MySQLAST mysqlAST) {
      // may look like
      Operation operation = operationTreeBuilder.project(Arrays.asList(Expressions.$("f0"), operationContext.scan("t1"
        new SourceQueryOperation(xx));
      return Collections.singletonList(operation)
   }
   
   @Override
   UnresolvedIdentifier parseIdentifier(String identifier) {
    // may need to identifier `db.t` to array of [db, t]
    string[] names = parseMySQLIdentifier(identifier);
    return UnresolvedIdentifier.of(names);
  }

  @Override
  ResolvedExpression parseSqlExpression(
            String sqlExpression, RowType inputRowType, @Nullable LogicalType outputType) {
   // parse sqlExpression to ResolvedExpression
  }

  @Override
  String[] getCompletionHints(String statement, int position) {
   // just for example, return empty string array directly
   return new String[0];
  }
}

Then,  specific the class path of MySQLParserFactory MySQLDialectFactory in the reosurce file org.apache.flink.table.factories.Factory to make it can be discovered by Java SPI mechanism. 

...