...
default_database defaults to the db instance's default db if it's not provided. E.g. in Postgres, it's the username.
A Flink client can have multiple `JDBCCatalog`s to connect to different db instances.
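The default_database fallback described above can be sketched as follows. This is only an illustration for the Postgres case, where the instance's default db is the username; `DefaultDatabaseSketch` and `resolve` are hypothetical names and not part of the proposed API.

```java
/** Hypothetical helper illustrating the default_database fallback for Postgres. */
final class DefaultDatabaseSketch {
    private DefaultDatabaseSketch() {}

    /**
     * Returns the configured default database, or the username when none is
     * configured (the Postgres convention mentioned in the text).
     */
    static String resolve(String configuredDefault, String username) {
        if (configuredDefault == null || configuredDefault.isEmpty()) {
            return username;
        }
        return configuredDefault;
    }

    public static void main(String[] args) {
        System.out.println(resolve(null, "alice"));      // falls back to the username
        System.out.println(resolve("reports", "alice")); // keeps the configured value
    }
}
```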
...
Code Block

    interface JDBCDialect {
        // new APIs
        String getListTablesStatement(String database);
        String getListDatabasesStatement();
        String getTableMetadataStatement();
    }

    public class JDBCCatalog extends AbstractCatalog {

        public JDBCCatalog(String catalogName, String defaultDatabase, String userName, String pwd, String baseUrl);

        void open() {
            // verify jdbc connection to fail earlier
        }

        void close();
    }
Code Block

    PostgresDialect {
        // new impls

        // e.g. "SELECT * FROM information_schema.tables WHERE table_type = 'BASE TABLE'
        //       AND table_schema = <schema_name> ORDER BY table_type, table_name;"
        String getListTablesStatement(String database) { ... }

        // e.g. "SELECT datname FROM pg_database;"
        String getListDatabasesStatement() { ... }

        // e.g. result set metadata of "SELECT * FROM <tablename>;"
        String getTableMetadataStatement() { ... }
    }

    DerbyDialect {
        String getListTablesStatement(String database) { // unsupported exception }
        String getListDatabasesStatement() { // unsupported exception }
        String getTableMetadataStatement() { // unsupported exception }
    }

    ...

    public class JDBCCatalog extends AbstractCatalog {
        InternalJDBCCatalog internal;

        public JDBCCatalog(String catalogName, String defaultDatabase, String userName, String pwd, String baseUrl) {
            JDBCDialect d = JDBCDialects.get(baseUrl).get();
            switch (d) {
                case postgres:
                    internal = new PostgresJDBCCatalog(...);
                    break;
                case derby:
                    ...
            }
        }

        void open() {
            // verify jdbc connection to fail earlier
        }

        void close();

        List<String> listDatabases() { return internal.listDatabases(); }

        boolean databaseExists(String dbname) { return listDatabases().contains(dbname); }

        CatalogTable getTable(ObjectPath op) { return internal.getTable(op); }

        List<String> listTables(String db) { return internal.listTables(db); }

        boolean tableExists(ObjectPath path) {
            return databaseExists(path.getDatabaseName())
                && listTables(path.getDatabaseName()).contains(path.getObjectName());
        }
    }

    public class PostgresJDBCCatalog extends InternalJDBCCatalog {
        JDBCDialect dialect;

        protected PostgresJDBCCatalog(String catalogName, String defaultDatabase, String userName, String pwd, String baseUrl) {
            dialect = new PostgresDialect();
        }

        List<String> listDatabases() {
            // execute and parse dialect.getListDatabasesStatement();
        }

        CatalogTable getTable(ObjectPath op) {
            // execute and parse dialect.getTableMetadataStatement();
            // map data types and build catalog table
        }

        List<String> listTables(String db) {
            // execute and parse dialect.getListTablesStatement(db);
        }
    }
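The dispatch on the base URL in the `JDBCCatalog` constructor can be illustrated with a small standalone sketch. It does not use Flink's `JDBCDialects`; the `DialectKind` enum, the URL-prefix matching, and `CatalogDispatchSketch` are assumptions made only for illustration, and the returned string stands in for constructing the real internal catalog.

```java
/** Hypothetical stand-in for dialect lookup by JDBC URL prefix. */
enum DialectKind {
    POSTGRES("jdbc:postgresql:"),
    DERBY("jdbc:derby:");

    private final String urlPrefix;

    DialectKind(String urlPrefix) {
        this.urlPrefix = urlPrefix;
    }

    /** Finds the dialect whose URL prefix matches, or fails fast if none does. */
    static DialectKind fromUrl(String baseUrl) {
        for (DialectKind kind : values()) {
            if (baseUrl.startsWith(kind.urlPrefix)) {
                return kind;
            }
        }
        throw new IllegalArgumentException("No dialect found for " + baseUrl);
    }
}

/** Hypothetical sketch of choosing the internal catalog from the dialect. */
final class CatalogDispatchSketch {
    private CatalogDispatchSketch() {}

    /** Returns the name of the internal catalog that would be instantiated. */
    static String internalCatalogFor(String baseUrl) {
        switch (DialectKind.fromUrl(baseUrl)) {
            case POSTGRES:
                return "PostgresJDBCCatalog";
            case DERBY:
                // Mirrors the "unsupported exception" placeholders of DerbyDialect.
                throw new UnsupportedOperationException("No catalog for Derby");
            default:
                throw new IllegalStateException("Unhandled dialect");
        }
    }

    public static void main(String[] args) {
        System.out.println(internalCatalogFor("jdbc:postgresql://localhost:5432/"));
    }
}
```

Failing in the constructor for an unknown or unsupported URL is in the same spirit as `open()` verifying the connection early.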
The base URL passed to the constructor should not include a database name, e.g. "jdbc:postgresql://localhost:5432/" or "jdbc:postgresql://localhost:5432".
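Because the base URL carries no database, the catalog has to append one whenever it connects to a specific database. A minimal sketch of that step, assuming both accepted forms (with and without a trailing slash) should yield the same result; `JdbcUrls` and `forDatabase` are hypothetical names.

```java
/** Hypothetical helper, not part of the proposed API. */
final class JdbcUrls {
    private JdbcUrls() {}

    /** Appends a database name to a base URL that does not contain one. */
    static String forDatabase(String baseUrl, String database) {
        // Normalize away a single trailing slash so both accepted forms behave alike.
        String trimmed = baseUrl.endsWith("/")
                ? baseUrl.substring(0, baseUrl.length() - 1)
                : baseUrl;
        return trimmed + "/" + database;
    }

    public static void main(String[] args) {
        System.out.println(forDatabase("jdbc:postgresql://localhost:5432/", "mydb"));
        System.out.println(forDatabase("jdbc:postgresql://localhost:5432", "mydb"));
    }
}
```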
...
Code Block

    catalogs:
      - name: ...
        type: jdbc
        username: xxx
        password: yyy
        base_url: jdbc:...
        default-database: ... # optional
Versions
The flink-connector-postgres module will provide a fat jar that includes the latest Postgres JDBC driver. We rely on the driver itself for backward compatibility. Users can also build jars with their own driver versions.
...
Based on the above two facts, we propose to match the database namespace between Flink and Postgres, and to make Postgres's <schema.table> correspond to Flink's table name, as shown below.
Flink Catalog Metaspace Structure | Postgres Metaspace Structure |
---|---|
catalog name (defined in Flink only) | n/a |
database name | database name |
table name | <schema name.>table name |
The full path of a Postgres table in Flink would be "<catalog>.<db>.`<schema.table>`" if the schema is specified.
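The mapping between a Flink table name and a Postgres schema and table can be sketched as below. The class and method names are hypothetical, and falling back to the `public` schema when none is given is an assumption based on Postgres's usual default rather than something this proposal states.

```java
/** Hypothetical sketch of splitting a Flink table name into Postgres schema and table. */
final class PostgresTablePathSketch {
    // Assumption: an unqualified name refers to Postgres's usual default schema.
    static final String DEFAULT_SCHEMA = "public";

    final String schema;
    final String table;

    private PostgresTablePathSketch(String schema, String table) {
        this.schema = schema;
        this.table = table;
    }

    /** Parses "schema.table" or "table" as it appears in the Flink table name. */
    static PostgresTablePathSketch fromFlinkTableName(String flinkTableName) {
        int dot = flinkTableName.indexOf('.');
        if (dot < 0) {
            return new PostgresTablePathSketch(DEFAULT_SCHEMA, flinkTableName);
        }
        String schema = flinkTableName.substring(0, dot);
        String table = flinkTableName.substring(dot + 1);
        if (schema.isEmpty() || table.isEmpty()) {
            throw new IllegalArgumentException("Malformed table name: " + flinkTableName);
        }
        return new PostgresTablePathSketch(schema, table);
    }

    /** Builds the schema-qualified full path, with the table name quoted in backticks. */
    String toFlinkPath(String catalog, String database) {
        return catalog + "." + database + ".`" + schema + "." + table + "`";
    }

    public static void main(String[] args) {
        PostgresTablePathSketch path = fromFlinkTableName("sales.orders");
        System.out.println(path.toFlinkPath("mypg", "mydb"));
    }
}
```

The backticks matter because the schema-qualified name contains a dot, which Flink would otherwise read as a path separator.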
...