...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
We propose the following interfaces:
- Parser
- ExtendedOperationExecutor
- ParserFactoryDialectFactory
- CatalogRegistry
- OperationContextOperationTreeBuilder
Parser
Code Block |
---|
|
/** Provides methods for parsing SQL objects from a SQL string. */
@PublicEvolving
public interface Parser {
// the interface has existed in current codebase, here we just expose it.
} |
...
ExtendedOperationExecutor
Code Block |
---|
|
/**
* An extended operation executor which provides method for executing operation. External pluggable
* dialect can implement this interface to execute operation in its own way instead of using Flink's
* own implementation for operation execution.
*/
@PublicEvolving
public interface ExtendedOperationExecutor {
/**
* Execute the given operation and return the execution result. This method will delegate
* Flink's own operation execution.
*
* <p>If return Optional.empty(), the operation will then fall to Flink's operation execution.
*/
Optional<TableResult> executeOperation(Operation operation);
} |
DialectFactory
Code Block |
---|
|
/**
* Factory that creates {@link Parser} and {@link ExtendedOperationExecutor}.
*
* <p>The {@link #factoryIdentifier()} is identified by matching it against {@link
* TableConfigOptions#TABLE_SQL_DIALECT}.
*/
@PublicEvolving
public interface ParserFactoryDialectFactory extends Factory {
/** Creates a new parser. */
Parser create(Context context);
/** Context provided when a parser is created. */
@PublicEvolving
interface Context {
CatalogRegistry getCatalogRegistry(); // interfaces provided dealing with get catalog, qulify identifier, etc.
OperationTreeBuilder getOperationTreeBuilder(); // interfaces provided to build Operation.
}
} |
...
Code Block |
---|
|
/**
* A catalog registry for dealing with catalogs
*/
@PublicEvolving
public interface CatalogRegistry {
String getCurrentDatabase();
String getCurrentCatalog();
ObjectIdentifier qualifyIdentifier(UnresolvedIdentifier identifier);
Optional<Catalog> getCatalog(String catalogName);
Optional<CatalogBaseTable> getCatalogBaseTable(ObjectIdentifier objectIdentifier);
boolean isTemporaryTable(ObjectIdentifier objectIdentifier);
Optional<CatalogPartition> getPartition(
ObjectIdentifier tableIdentifier, CatalogPartitionSpec partitionSpec);
} |
OperationTreeBuilder
Code Block |
---|
|
/**
* A builder for building {@link org.apache.flink.table.operations.Operation}
*/
@PublicEvolving
public interface OperationTreeBuilder {
public QueryOperation scan(String path);
public QueryOperation project(List<Expression> projectList, QueryOperation child);
public QueryOperation project(
List<Expression> projectList, QueryOperation child, boolean explicitAlias);
public QueryOperation project(
List<Expression> projectList, QueryOperation child, List<OverWindow> overWindows)
// omit other public interaces has implemented in OperationTreeBuilder
} |
...
These public interfaces refered above will be added to module flink-table-api-java. And then when we want to support other dialects, we need to include this module and implement ParserFactory DalectFactory to create a Parser and ExtendedOperationExecutor(optional) for the specific dialect.
...
Code Block |
---|
|
public interfaceMySQLDialectFactory MySQLParserFactory extends ParserFactoryDialectFactory {
@Override
public String factoryIdentifier() {
return "mysql";
}
Parser create(Context context) {
return new MySqlParser(context);
}
} |
...
Code Block |
---|
|
public class MySQLParser implements Parser {
private OperationTreeBuilder operationTreeBuilder;
public MySQLParser(Context context) {
this.operationTreeBuilder = context.getOperationTreeBuilder;
}
@Override
List<Operation> parse(String statement) {
// parse it to AST(Abstract Semantic Tree)
MySQLAST mysqlAST = parseStament(statement);
// convert the AST to Flink OperationTree
List<Operation> operations = convertToOperation(mysqlAST);
}
private List<Operation> convertToOperation(MySQLAST mysqlAST) {
// may look like
Operation operation = operationTreeBuilder.project(Arrays.asList(Expressions.$("f0"), operationContext.scan("t1"
new SourceQueryOperation(xx));
return Collections.singletonList(operation)
}
@Override
UnresolvedIdentifier parseIdentifier(String identifier) {
// may need to identifier `db.t` to array of [db, t]
string[] names = parseMySQLIdentifier(identifier);
return UnresolvedIdentifier.of(names);
}
@Override
ResolvedExpression parseSqlExpression(
String sqlExpression, RowType inputRowType, @Nullable LogicalType outputType) {
// parse sqlExpression to ResolvedExpression
}
@Override
String[] getCompletionHints(String statement, int position) {
// just for example, return empty string array directly
return new String[0];
}
} |
Then, specific the class path of MySQLParserFactory MySQLDialectFactory in the reosurce file org.apache.flink.table.factories.Factory to make it can be discovered by Java SPI mechanism.
...