...

  • Parser
  • ParserFactory
  • CatalogRegistry
  • OperationContext

Parser

Code Block
languagejava
/** Provides methods for parsing SQL objects from a SQL string. */
@PublicEvolving
public interface Parser {
	// This interface already exists in the current codebase; here we only expose it.
}

ParserFactory

Code Block
languagejava
/**
 * Factory that creates {@link Parser}.
 *
 * <p>The {@link #factoryIdentifier()} is identified by matching it against {@link
 * TableConfigOptions#TABLE_SQL_DIALECT}.
 */
@PublicEvolving
public interface ParserFactory extends Factory {

    /** Creates a new parser. */
    Parser create(Context context);

    /** Context provided when a parser is created. */
    @PublicEvolving
    interface Context {
        // Provides catalog-related methods: getting a catalog, qualifying an identifier, etc.
        CatalogRegistry getCatalogRegistry();

        // Provides methods for building an Operation.
        OperationContext getOperationContext();
    }
}
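
The Javadoc above says a factory is selected by matching its factoryIdentifier() against the configured SQL dialect. A minimal sketch of that matching step follows. It uses simplified stand-in types rather than the real Flink interfaces: DialectLookupSketch, SimpleParserFactory, named and findFactory are hypothetical names, and the case-insensitive comparison is an assumption, not something this proposal specifies.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

/** Illustrative only: simplified stand-ins, not the real Flink interfaces. */
final class DialectLookupSketch {

    /** Stand-in for ParserFactory, reduced to its identifier. */
    interface SimpleParserFactory {
        String factoryIdentifier();
    }

    /** Creates a stand-in factory that reports the given identifier. */
    static SimpleParserFactory named(String identifier) {
        return () -> identifier;
    }

    /** Returns the first factory whose identifier matches the configured dialect. */
    static Optional<SimpleParserFactory> findFactory(
            List<SimpleParserFactory> candidates, String dialect) {
        return candidates.stream()
                // Assumption: identifiers are compared case-insensitively.
                .filter(f -> f.factoryIdentifier().equalsIgnoreCase(dialect))
                .findFirst();
    }

    public static void main(String[] args) {
        List<SimpleParserFactory> factories = Arrays.asList(named("default"), named("mysql"));
        // In Flink, the dialect string would come from TableConfigOptions#TABLE_SQL_DIALECT.
        System.out.println(findFactory(factories, "mysql").isPresent());
        System.out.println(findFactory(factories, "oracle").isPresent());
    }
}
```

With this shape, supporting a new dialect only requires registering another factory; the lookup itself does not change.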

CatalogRegistry

Code Block
languagejava
/**
 * A catalog registry for dealing with catalogs.
 */
@PublicEvolving
public interface CatalogRegistry {
    String getCurrentDatabase();

    String getCurrentCatalog();

    ObjectIdentifier qualifyIdentifier(UnresolvedIdentifier identifier);

    Optional<Catalog> getCatalog(String catalogName);
}
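
To illustrate what qualifyIdentifier is expected to do, the sketch below expands a partial name to a full catalog.database.table name using the current catalog and database. It is a simplified stand-in that works on plain strings rather than UnresolvedIdentifier and ObjectIdentifier; the class QualifyIdentifierSketch and its qualify method are hypothetical.

```java
import java.util.Arrays;

/** Illustrative only: mimics how a partial identifier could be qualified. */
final class QualifyIdentifierSketch {
    private final String currentCatalog;
    private final String currentDatabase;

    QualifyIdentifierSketch(String currentCatalog, String currentDatabase) {
        this.currentCatalog = currentCatalog;
        this.currentDatabase = currentDatabase;
    }

    /** Expands [table], [db, table] or [catalog, db, table] to a three-part name. */
    String[] qualify(String... parts) {
        switch (parts.length) {
            case 1:
                // Only the object name is given: use current catalog and database.
                return new String[] {currentCatalog, currentDatabase, parts[0]};
            case 2:
                // Database and object name are given: use the current catalog.
                return new String[] {currentCatalog, parts[0], parts[1]};
            case 3:
                // Already fully qualified.
                return parts.clone();
            default:
                throw new IllegalArgumentException(
                        "Expected 1 to 3 name parts, got " + parts.length);
        }
    }

    public static void main(String[] args) {
        QualifyIdentifierSketch registry =
                new QualifyIdentifierSketch("default_catalog", "default_database");
        System.out.println(Arrays.toString(registry.qualify("t1")));
        System.out.println(Arrays.toString(registry.qualify("db", "t1")));
    }
}
```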

OperationContext

Code Block
languagejava
/**
 * A context for building {@link org.apache.flink.table.operations.Operation}
 */
@PublicEvolving
public interface OperationContext {
     
    QueryOperation scan(String path);

    QueryOperation project(List<Expression> projectList, QueryOperation child);

    QueryOperation project(
            List<Expression> projectList, QueryOperation child, boolean explicitAlias);

    QueryOperation project(
            List<Expression> projectList, QueryOperation child, List<OverWindow> overWindows);

    // other public methods already implemented in OperationTreeBuilder are omitted
}

...

The following example shows how support for the MySQL dialect could be added.

MySQLParserFactory

Code Block
languagejava
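public class MySQLParserFactory implements ParserFactory {

    @Override
    public String factoryIdentifier() {
        return "mysql";
    }

    @Override
    public Parser create(Context context) {
        return new MySQLParser(context.getOperationContext());
    }

    // requiredOptions() and optionalOptions() inherited from Factory are omitted for brevity
}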


MySQLParser

Code Block
languagejava
public class MySQLParser implements Parser {
    private final OperationContext operationContext;

    public MySQLParser(OperationContext operationContext) {
        this.operationContext = operationContext;
    }
    
    @Override
    public List<Operation> parse(String statement) {
        // parse the statement into an AST (Abstract Syntax Tree)
        MySQLAST mysqlAST = parseStatement(statement);
        // convert the AST to a Flink Operation tree
        return convertToOperation(mysqlAST);
    }
   
    private List<Operation> convertToOperation(MySQLAST mysqlAST) {
        // may look like this
        Operation operation =
                operationContext.project(
                        Arrays.asList(Expressions.$("f0")), operationContext.scan("t1"));
        return Collections.singletonList(operation);
    }
   
    @Override
    public UnresolvedIdentifier parseIdentifier(String identifier) {
        // may need to split an identifier such as `db.t` into the array [db, t]
        String[] names = parseMySQLIdentifier(identifier);
        return UnresolvedIdentifier.of(names);
    }

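    @Override
    public ResolvedExpression parseSqlExpression(
            String sqlExpression, RowType inputRowType, @Nullable LogicalType outputType) {
        // parse sqlExpression to ResolvedExpression (omitted in this example)
        throw new UnsupportedOperationException("Not implemented in this example.");
    }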

    @Override
    public String[] getCompletionHints(String statement, int position) {
        // just for example, return an empty string array directly
        return new String[0];
    }
}
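
The parseIdentifier method above relies on a parseMySQLIdentifier helper that is not shown. One possible shape for it is sketched below, assuming that name parts are separated by dots and may be quoted with backticks. The class name MySQLIdentifierSketch is hypothetical, and escaped backticks inside a quoted part are deliberately not handled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative only: one way the parseMySQLIdentifier helper could be written. */
final class MySQLIdentifierSketch {

    /** Splits an identifier on dots that appear outside backtick quotes. */
    static String[] parseMySQLIdentifier(String identifier) {
        List<String> parts = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean quoted = false;
        for (char c : identifier.toCharArray()) {
            if (c == '`') {
                // Toggle quoting; the backtick itself is not part of the name.
                quoted = !quoted;
            } else if (c == '.' && !quoted) {
                // An unquoted dot ends the current name part.
                parts.add(current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        if (quoted) {
            throw new IllegalArgumentException("Unterminated backtick in: " + identifier);
        }
        parts.add(current.toString());
        return parts.toArray(new String[0]);
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(parseMySQLIdentifier("db.t")));
        System.out.println(Arrays.toString(parseMySQLIdentifier("`my.db`.t")));
    }
}
```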

Then, specify the fully qualified class name of MySQLParserFactory in the resource file META-INF/services/org.apache.flink.table.factories.Factory so that it can be discovered through the Java SPI mechanism.
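
For illustration, the resource file would contain a single line with the factory's fully qualified class name, for example com.example.MySQLParserFactory (the package is hypothetical; the proposal does not name one). The sketch below shows the standard java.util.ServiceLoader lookup that such a file enables. It uses a stand-in Factory interface instead of the real Flink one, SpiDiscoverySketch and discover are hypothetical names, and Flink's own discovery code may differ.

```java
import java.util.Optional;
import java.util.ServiceLoader;

/** Stand-in for org.apache.flink.table.factories.Factory. */
interface Factory {
    String factoryIdentifier();
}

/** Illustrative only: finds a registered factory by identifier through Java SPI. */
final class SpiDiscoverySketch {

    static Optional<Factory> discover(ClassLoader classLoader, String identifier) {
        // ServiceLoader reads META-INF/services/<service interface name> files
        // visible to the class loader and instantiates each class listed there.
        for (Factory factory : ServiceLoader.load(Factory.class, classLoader)) {
            if (factory.factoryIdentifier().equals(identifier)) {
                return Optional.of(factory);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        ClassLoader classLoader = SpiDiscoverySketch.class.getClassLoader();
        // Nothing is found unless a provider is registered on the classpath.
        System.out.println(discover(classLoader, "mysql").isPresent());
    }
}
```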

...