THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
classes will be all in flink-jdbc module.
Code Block | ||
---|---|---|
| ||
interface JDBCDialect { // new APIs String getListTablesStatement(String database); String getListDatabasesStatement(); String getTableMetadataStatement(); PostgresDialect() { // new impls String getListTablesStatement(String database) { ... } String getListDatabasesStatement() { ... } String getTableMetadataStatement() { ... } } DerbyDialect { String getListTablesStatement(String database) { // unsupported exception } String getListDatabasesStatement() { // unsupported exception } String getTableMetadataStatement() { // unsupported exception } } ... } public JDBCCatalog extends AbstractCatalog { JDBCDialect dialect; InternalJDBCCatalogCatalog internal; public JDBCCatalog(String catalogName, String defaultDatabase, String userName, String pwd, String baseUrl) { JDBCDialect d = JDBCDialects.get(baseUrl).get(); switch(d) { case postgres: internal = new PostgresJDBCCatalog(...); break; case derby: ... } } void open() { // verify jdbc connection to fail earlier } void close(); List<String>Optional<TableFactory> listDatabasesgetTableFactory() { // execute and parse dialect.getListDatabasesStatementreturn Optional.of(new JDBCTableSourceSinkFactory()); } List<String> listDatabases() { return internal.listDatabases(); } boolean databaseExists(String dbname) { return listDatabases.contains(dbname); } boolean tableExists(ObjectPath path) { databaseExists(path.getDatabaseName()) return listTables().contains(path.getObjectName()); } List<String> listDatabases() { return internal.listDatabases(); } CatalogTable getTable(ObjectPath op) { return internal.getTable(op); } List<String> listTables(String db) { return internal.listTables(db); } } public InternalPostgresJDBCCatalogPostgresJDBCCatalog extends InternalJDBCCatalogJDBCCatalog { JDBCDialect dialect; // cannot be instantiated by users protected PostgresJDBCCatalog(String catalogName, String defaultDatabase, String userName, String pwd, String baseUrl) { dialect = new PostgresDialect(); } List<String> listDatabases() { // execute and parse dialect.getListDatabasesStatement();result } CatalogTable getTable(ObjectPath op) { // execute and parse dialect.getTableMetadataStatement();result // map data types and build catalog table } List<String> listTables() { // execute and parse dialect.getListTablesStatement();result } } |
The base url from constructor should be without databases, like "jdbc:postgresql://localhost:5432/" or "jdbc:postgresql://localhost:5432"
...