...
Proposal
We propose to add an `AbstractJDBCCatalog` interface, a user-facing `JDBCCatalog`, and a `PostgresJDBCCatalog` implementation. With this groundwork in place, implementations for other relational databases can be added easily later.
Design
`PostgresJDBCCatalog` and `JDBCCatalog` will be read-only catalogs that support the following operations on databases and tables:
...
Users can only read these objects from databases; manipulating them is not yet supported in this FLIP.
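The read-only behavior described above can be sketched as follows. This is a minimal illustration, not Flink's actual `Catalog` API: `MiniCatalog` and `ReadOnlyMiniCatalog` are hypothetical names, and rejecting writes with `UnsupportedOperationException` is an assumption about how a read-only catalog might signal unsupported operations.

```java
import java.util.List;

// Hypothetical, trimmed-down stand-in for a catalog API (not Flink's Catalog interface).
interface MiniCatalog {
    List<String> listDatabases();
    boolean databaseExists(String name);
    void createDatabase(String name);
    void dropDatabase(String name);
}

// Read operations are served from a fixed snapshot; write operations are rejected.
class ReadOnlyMiniCatalog implements MiniCatalog {
    private final List<String> databases;

    ReadOnlyMiniCatalog(List<String> databases) {
        this.databases = List.copyOf(databases);
    }

    @Override
    public List<String> listDatabases() {
        return databases;
    }

    @Override
    public boolean databaseExists(String name) {
        return databases.contains(name);
    }

    @Override
    public void createDatabase(String name) {
        throw new UnsupportedOperationException("read-only catalog: createDatabase");
    }

    @Override
    public void dropDatabase(String name) {
        throw new UnsupportedOperationException("read-only catalog: dropDatabase");
    }

    public static void main(String[] args) {
        MiniCatalog catalog = new ReadOnlyMiniCatalog(List.of("db1", "db2"));
        System.out.println(catalog.listDatabases()); // prints [db1, db2]
        try {
            catalog.createDatabase("db3");
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage()); // prints read-only catalog: createDatabase
        }
    }
}
```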
Configurations
To configure a `PostgresJDBCCatalog` or `JDBCCatalog`, users only need to provide:
- base jdbc url
- username
- password
The default database defaults to the username if it is not provided.
A Flink client can register multiple `PostgresJDBCCatalog`s or `JDBCCatalog`s to connect to different pg db instances.
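The defaulting rule for the default database can be sketched as below. `JdbcCatalogSettings` is a hypothetical holder class introduced only for illustration; the proposal does not define such a class, and treating an empty string the same as a missing value is an assumption.

```java
// Hypothetical settings holder: three required values plus an optional default database.
final class JdbcCatalogSettings {
    final String baseUrl;
    final String username;
    final String password;
    final String defaultDatabase;

    JdbcCatalogSettings(String baseUrl, String username, String password, String defaultDatabase) {
        if (baseUrl == null || username == null || password == null) {
            throw new IllegalArgumentException("base url, username and password are required");
        }
        this.baseUrl = baseUrl;
        this.username = username;
        this.password = password;
        // Fall back to the username when no default database is given.
        this.defaultDatabase =
                (defaultDatabase == null || defaultDatabase.isEmpty()) ? username : defaultDatabase;
    }

    public static void main(String[] args) {
        JdbcCatalogSettings s =
                new JdbcCatalogSettings("jdbc:postgresql://localhost:5432/", "alice", "secret", null);
        System.out.println(s.defaultDatabase); // prints alice
    }
}
```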
Interfaces, Classes, and Modules
All classes will be in the flink-jdbc module.
```java
// flink-jdbc module
public interface JDBCDialect {
    String getListTablesStatement(String database);
    String getListDatabasesStatement();
    String getTableMetadataStatement();
}

public class JDBCCatalog extends AbstractCatalog {

    public JDBCCatalog(String catalogName, String defaultDatabase, String userName, String pwd, String baseUrl);

    void open() {
        // verify the jdbc connection up front so that failures surface early
    }

    void close();
}
```
```java
// a new flink-connector-postgres module
public class PostgresJDBCCatalog extends AbstractJDBCCatalog {

    PostgresJDBCCatalog(String catalogName, String defaultDatabase, String userName, String pwd, String baseUrl) {
    }

    List<String> listDatabases() {
        // translate to "SELECT datname FROM pg_database;"
    }

    boolean databaseExists(String dbname) {
        return listDatabases().contains(dbname);
    }

    CatalogTable getTable(ObjectPath op) {
        // translate to result set metadata of "SELECT * FROM <tablename>;"
    }

    List<String> listTables() {
        // for each db in listDatabases(), translate to
        // "SELECT * FROM information_schema.tables
        //  WHERE table_type = 'BASE TABLE' AND table_schema = <schema_name>
        //  ORDER BY table_type, table_name;"
    }

    boolean tableExists(ObjectPath path) {
        return databaseExists(path.getDatabaseName())
            && listTables().contains(path.getObjectName());
    }
}
```
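The queries named in the comments above could be collected in one place, roughly as follows. This is a sketch only: `PostgresStatements` and its method names are hypothetical, the `?` placeholder assumes the schema name is bound through a `PreparedStatement`, and quoting identifiers by doubling embedded double quotes follows standard SQL identifier quoting.

```java
// Hypothetical helper that builds the SQL strings mentioned in the catalog sketch.
final class PostgresStatements {

    static String listDatabases() {
        return "SELECT datname FROM pg_database;";
    }

    // The schema name is left as a bind parameter (?) rather than concatenated into the SQL.
    static String listTables() {
        return "SELECT * FROM information_schema.tables"
                + " WHERE table_type = 'BASE TABLE' AND table_schema = ?"
                + " ORDER BY table_type, table_name;";
    }

    // Identifiers cannot be bound as parameters, so they are quoted instead.
    static String tableMetadata(String schema, String table) {
        return "SELECT * FROM " + quote(schema) + "." + quote(table) + ";";
    }

    // Wraps an identifier in double quotes, doubling any embedded double quote.
    static String quote(String identifier) {
        return "\"" + identifier.replace("\"", "\"\"") + "\"";
    }

    public static void main(String[] args) {
        System.out.println(listDatabases());
        System.out.println(listTables());
        System.out.println(tableMetadata("public", "orders")); // prints SELECT * FROM "public"."orders";
    }
}
```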
...
The full URL used to connect to the db will include the database name, for example "jdbc:postgresql://localhost:5432/db".
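Composing the full URL from the base URL and a database name can be sketched as below. `JdbcUrls.forDatabase` is a hypothetical helper, and tolerating a base URL with or without a trailing slash is an assumption rather than something the proposal specifies.

```java
// Hypothetical helper that appends a database name to a base JDBC URL.
final class JdbcUrls {

    static String forDatabase(String baseUrl, String database) {
        // Accept base URLs both with and without a trailing slash.
        String base = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
        return base + database;
    }

    public static void main(String[] args) {
        // prints jdbc:postgresql://localhost:5432/db
        System.out.println(forDatabase("jdbc:postgresql://localhost:5432", "db"));
    }
}
```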
Using the catalog
Table API
```java
// java
tEnv.registerCatalog(name, new JDBCCatalog(...));
```
SQL CLI yaml configs
```yaml
catalogs:
  - name: ...
    type: jdbc
    username: xxx
    password: yyy
    base_url: jdbc:...
    default-database: ...  # optional
```
Versions
The flink-connector-postgres module will provide a fat jar that bundles the latest Postgres JDBC driver and relies on the driver itself for backward compatibility. Users can also build jars with their own driver versions.
Flink-Postgres Metaspace Mapping
Postgres has an additional namespace, `schema`, besides database. A pg instance can have multiple dbs; each db can have multiple schemas, with a default one named "public"; and each schema can have multiple tables.
A few facts about how JDBC works with Postgres:
1) A JDBC connection to Postgres must target a specific database, without a schema name. If no db is specified in the URL, the default db is the username.
2) When querying a table in Postgres, users can use either <schema.table> or just <table>. The schema is optional and defaults to "public".
Based on these two facts, we propose to map the database namespace one-to-one between Flink and Postgres, and to make Postgres's <schema.table> correspond to Flink's table name, as shown below.
Flink Catalog Metaspace Structure | Postgres Metaspace Structure |
catalog name (defined in Flink only) | n/a |
database name | database name |
table name | <schema name.>table name |
The full path of a Postgres table in Flink is "<catalog>.<db>.`<schema.table>`" when the schema is specified.
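Splitting a Flink table name of the form `<schema name.>table name` back into its Postgres parts can be sketched as follows. `PgTablePath` is a hypothetical class. The sketch assumes that neither the schema nor the table name contains a dot, and that an omitted schema resolves to `public`, the schema a stock Postgres installation uses by default.

```java
// Hypothetical value class for the Postgres side of the mapping.
final class PgTablePath {
    static final String DEFAULT_SCHEMA = "public";

    final String schema;
    final String table;

    PgTablePath(String schema, String table) {
        this.schema = schema;
        this.table = table;
    }

    // Parses "<schema>.<table>" or "<table>"; a missing schema falls back to the default.
    static PgTablePath fromFlinkTableName(String name) {
        int dot = name.indexOf('.');
        if (dot < 0) {
            return new PgTablePath(DEFAULT_SCHEMA, name);
        }
        return new PgTablePath(name.substring(0, dot), name.substring(dot + 1));
    }

    // Renders the schema-qualified form used as the Flink table name.
    String toFlinkTableName() {
        return schema + "." + table;
    }

    public static void main(String[] args) {
        System.out.println(fromFlinkTableName("sales.orders").toFlinkTableName()); // prints sales.orders
        System.out.println(fromFlinkTableName("orders").toFlinkTableName());       // prints public.orders
    }
}
```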
Flink-Postgres Data Type Mapping
Flink Type | Postgres Type |
smallint | smallint |
int | integer |
bigint | bigint |
float | real |
double | double precision |
decimal(p,s) | numeric(p,s) |
boolean | boolean |
timestamp | timestamp without time zone |
timestamp with timezone | timestamp with time zone (depending on pg server timezone) |
date | date |
time | time without time zone |
time with timezone | time with time zone |
interval | interval |
string | text |
varchar(n) | character varying(n) |
char | char, character |
binary/varbinary/bytes | bytea |
array | [] (array of all the above primitive types) |
...
*timestamp/time precision is 0-6
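The non-parameterized rows of the table above can be expressed as a simple lookup, sketched below. `PgTypeNames` is a hypothetical class that works on type names as plain strings, not on Flink's `DataType` objects. It uses the Postgres spelling "time zone" (two words), picks `bytes` for `bytea`, and renders arrays as `array<...>`; those three choices are assumptions made for illustration. Parameterized types such as `numeric(p,s)` and `character varying(n)` are left out.

```java
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

// Hypothetical lookup from Postgres type names to Flink type names, as strings only.
final class PgTypeNames {
    private static final Map<String, String> PG_TO_FLINK = Map.ofEntries(
            Map.entry("smallint", "smallint"),
            Map.entry("integer", "int"),
            Map.entry("bigint", "bigint"),
            Map.entry("real", "float"),
            Map.entry("double precision", "double"),
            Map.entry("boolean", "boolean"),
            Map.entry("timestamp without time zone", "timestamp"),
            Map.entry("timestamp with time zone", "timestamp with timezone"),
            Map.entry("date", "date"),
            Map.entry("time without time zone", "time"),
            Map.entry("time with time zone", "time with timezone"),
            Map.entry("interval", "interval"),
            Map.entry("text", "string"),
            Map.entry("bytea", "bytes"));

    // Returns the Flink type name, or empty if the Postgres type is not in the table.
    static Optional<String> toFlink(String pgType) {
        String key = pgType.trim().toLowerCase(Locale.ROOT);
        if (key.endsWith("[]")) {
            // Arrays map element-wise: strip one "[]" and wrap the element type.
            String element = key.substring(0, key.length() - 2);
            return toFlink(element).map(t -> "array<" + t + ">");
        }
        return Optional.ofNullable(PG_TO_FLINK.get(key));
    }

    public static void main(String[] args) {
        System.out.println(toFlink("double precision").orElse("unsupported")); // prints double
        System.out.println(toFlink("integer[]").orElse("unsupported"));        // prints array<int>
    }
}
```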
Using the catalog
In Program
```java
// java
tEnv.registerCatalog(name, new PostgresJDBCCatalog(...));
```
Yaml configs for SQL CLI
```yaml
catalogs:
  - name: ...
    type: postgres
    username: xxx
    password: yyy
    base_url: jdbc:...
    default-database: ...  # optional
```
Rejected Alternatives
n/a