...

If `default_database` is not provided, it defaults to the database instance's default database. In Postgres, for example, that is the database with the same name as the user.

A Flink client can have multiple `JDBCCatalog`s, each connecting to a different database instance.

...

```java
interface JDBCDialect {
	// new APIs
	String getListTablesStatement(String database);

	String getListDatabasesStatement();

	String getTableMetadataStatement();
}

public class JDBCCatalog extends AbstractCatalog {

	public JDBCCatalog(String catalogName, String defaultDatabase, String userName, String pwd, String baseUrl) {
		super(catalogName, defaultDatabase);
		...
	}

	@Override
	public void open() {
		// verify the JDBC connection so that misconfiguration fails early
	}

	@Override
	public void close() {
		// release JDBC resources
	}
}
```
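The catalog has to pick a dialect from the base URL before it can ask for any of the statements above. The following is a minimal, self-contained sketch of a prefix-based lookup, assuming that each dialect claims the JDBC URLs starting with its driver prefix. The `DialectResolver` class, its `Dialect` enum and `resolve` method are hypothetical illustrations, not Flink's `JDBCDialects` API.

```java
import java.util.Optional;

public class DialectResolver {
    // Hypothetical stand-in for a dialect: it claims URLs that start with its driver prefix.
    enum Dialect {
        POSTGRES("jdbc:postgresql:"),
        DERBY("jdbc:derby:");

        private final String urlPrefix;

        Dialect(String urlPrefix) {
            this.urlPrefix = urlPrefix;
        }

        boolean canHandle(String url) {
            return url.startsWith(urlPrefix);
        }
    }

    // Returns the first dialect that can handle the URL, or empty if none can.
    static Optional<Dialect> resolve(String baseUrl) {
        for (Dialect dialect : Dialect.values()) {
            if (dialect.canHandle(baseUrl)) {
                return Optional.of(dialect);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(resolve("jdbc:postgresql://localhost:5432/")); // Optional[POSTGRES]
        System.out.println(resolve("jdbc:mysql://localhost:3306/"));      // Optional.empty
    }
}
```

Returning an `Optional` lets the caller decide whether an unknown URL is an error, instead of the lookup failing on its own.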
```java
class PostgresDialect implements JDBCDialect {
	// new impls

	@Override
	public String getListTablesStatement(String database) {
		// translates to "SELECT * FROM information_schema.tables
		//                WHERE table_type = 'BASE TABLE' AND table_schema = <schema_name>
		//                ORDER BY table_type, table_name;"
		...
	}

	@Override
	public String getListDatabasesStatement() {
		return "SELECT datname FROM pg_database;";
	}

	@Override
	public String getTableMetadataStatement() {
		// translates to "SELECT * FROM <tablename>;", whose result set metadata
		// describes the table's columns
		...
	}
}

class DerbyDialect implements JDBCDialect {
	@Override
	public String getListTablesStatement(String database) {
		throw new UnsupportedOperationException();
	}

	@Override
	public String getListDatabasesStatement() {
		throw new UnsupportedOperationException();
	}

	@Override
	public String getTableMetadataStatement() {
		throw new UnsupportedOperationException();
	}
}

// ...

public class JDBCCatalog extends AbstractCatalog {
	private final JDBCDialect dialect;
	// dialect-specific catalog that all metadata calls are delegated to
	private InternalJDBCCatalog internal;

	public JDBCCatalog(String catalogName, String defaultDatabase, String userName, String pwd, String baseUrl) {
		super(catalogName, defaultDatabase);
		this.dialect = JDBCDialects.get(baseUrl).get();

		if (dialect instanceof PostgresDialect) {
			internal = new InternalPostgresJDBCCatalog(...);
		} else if (dialect instanceof DerbyDialect) {
			...
		}
	}

	@Override
	public void open() {
		// verify the JDBC connection so that misconfiguration fails early
	}

	@Override
	public void close() {
		// release JDBC resources
	}

	@Override
	public List<String> listDatabases() {
		return internal.listDatabases();
	}

	@Override
	public boolean databaseExists(String databaseName) {
		return listDatabases().contains(databaseName);
	}

	@Override
	public List<String> listTables(String databaseName) {
		return internal.listTables(databaseName);
	}

	@Override
	public CatalogTable getTable(ObjectPath tablePath) {
		return internal.getTable(tablePath);
	}

	@Override
	public boolean tableExists(ObjectPath tablePath) {
		return databaseExists(tablePath.getDatabaseName())
			&& listTables(tablePath.getDatabaseName()).contains(tablePath.getObjectName());
	}
}

public class InternalPostgresJDBCCatalog extends InternalJDBCCatalog {
	private final JDBCDialect dialect;

	protected InternalPostgresJDBCCatalog(String catalogName, String defaultDatabase, String userName, String pwd, String baseUrl) {
		this.dialect = new PostgresDialect();
	}

	List<String> listDatabases() {
		// execute and parse dialect.getListDatabasesStatement()
	}

	List<String> listTables(String database) {
		// execute and parse dialect.getListTablesStatement(database)
	}

	CatalogTable getTable(ObjectPath tablePath) {
		// execute and parse dialect.getTableMetadataStatement(),
		// then map data types and build the catalog table
	}
}
```
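The last step of `getTable`, "map data types and build catalog table", could look roughly like the sketch below. It assumes that the Postgres type names come from the result set metadata (for example via `ResultSetMetaData#getColumnTypeName`, which for Postgres reports names such as `int4` or `varchar`). The mapping table itself is a hypothetical, deliberately incomplete illustration, not the mapping the catalog will ship with.

```java
import java.util.Map;
import java.util.StringJoiner;

public class PostgresTypeMapping {
    // Hypothetical subset of a Postgres -> Flink SQL type mapping,
    // keyed by the type names Postgres reports for result set columns.
    private static final Map<String, String> TYPES = Map.of(
            "bool", "BOOLEAN",
            "int2", "SMALLINT",
            "int4", "INT",
            "int8", "BIGINT",
            "float4", "FLOAT",
            "float8", "DOUBLE",
            "text", "STRING",
            "varchar", "STRING", // the declared length is dropped in this simplified mapping
            "date", "DATE",
            "timestamp", "TIMESTAMP");

    // Maps one Postgres column type name to a Flink SQL type name, failing on unknown types.
    static String toFlinkType(String postgresType) {
        String flinkType = TYPES.get(postgresType);
        if (flinkType == null) {
            throw new UnsupportedOperationException("Unsupported Postgres type: " + postgresType);
        }
        return flinkType;
    }

    // Renders column names with their mapped types as "name TYPE, ...".
    static String describeColumns(String[] names, String[] postgresTypes) {
        StringJoiner columns = new StringJoiner(", ");
        for (int i = 0; i < names.length; i++) {
            columns.add(names[i] + " " + toFlinkType(postgresTypes[i]));
        }
        return columns.toString();
    }

    public static void main(String[] args) {
        String[] names = {"id", "name", "created_at"};
        String[] types = {"int4", "varchar", "timestamp"};
        System.out.println(describeColumns(names, types)); // id INT, name STRING, created_at TIMESTAMP
    }
}
```

Failing loudly on an unmapped type, rather than guessing, keeps `getTable` from silently producing a schema that does not match the Postgres table.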


The base URL passed to the constructor should not include a database, e.g. "jdbc:postgresql://localhost:5432/" or "jdbc:postgresql://localhost:5432".
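Because the base URL carries no database, the catalog has to append a database name itself whenever it connects to a specific database. A minimal sketch of that rule follows, assuming the URL has the `jdbc:<driver>://host:port[/]` shape shown above and no query parameters. `PostgresBaseUrl`, `validateBaseUrl` and `urlForDatabase` are hypothetical helper names.

```java
public class PostgresBaseUrl {
    // Hypothetical check: rejects base URLs whose path already names a database,
    // e.g. "jdbc:postgresql://localhost:5432/mydb".
    static void validateBaseUrl(String baseUrl) {
        int authorityStart = baseUrl.indexOf("//");
        if (authorityStart < 0) {
            throw new IllegalArgumentException("Base URL has no host part: " + baseUrl);
        }
        int pathStart = baseUrl.indexOf('/', authorityStart + 2);
        // Only an empty path is allowed: no slash at all, or a single trailing slash.
        if (pathStart >= 0 && pathStart < baseUrl.length() - 1) {
            throw new IllegalArgumentException("Base URL must not include a database: " + baseUrl);
        }
    }

    // Builds the connection URL for one database from a valid base URL.
    static String urlForDatabase(String baseUrl, String database) {
        validateBaseUrl(baseUrl);
        String prefix = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
        return prefix + database;
    }

    public static void main(String[] args) {
        // both print jdbc:postgresql://localhost:5432/mydb
        System.out.println(urlForDatabase("jdbc:postgresql://localhost:5432/", "mydb"));
        System.out.println(urlForDatabase("jdbc:postgresql://localhost:5432", "mydb"));
    }
}
```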

...

```yaml
catalogs:
  - name: ...
    type: jdbc
    username: xxx
    password: yyy
    base_url: jdbc:...
    default-database: ... # optional
```
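One way to read such a catalog entry into the `JDBCCatalog` constructor arguments is sketched below. It reuses the property keys from the YAML example and applies the rule that an absent `default-database` falls back to the user name (the Postgres behavior described earlier). The `JdbcCatalogConfig` class and its methods are hypothetical and not part of Flink's SQL client.

```java
import java.util.HashMap;
import java.util.Map;

public class JdbcCatalogConfig {
    final String name;
    final String userName;
    final String password;
    final String baseUrl;
    final String defaultDatabase;

    JdbcCatalogConfig(String name, String userName, String password, String baseUrl, String defaultDatabase) {
        this.name = name;
        this.userName = userName;
        this.password = password;
        this.baseUrl = baseUrl;
        this.defaultDatabase = defaultDatabase;
    }

    // Returns a required property or fails with a descriptive message.
    static String required(Map<String, String> props, String key) {
        String value = props.get(key);
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException("Missing required catalog property: " + key);
        }
        return value;
    }

    // Hypothetical translation of one YAML catalog entry into constructor arguments.
    static JdbcCatalogConfig fromProperties(Map<String, String> props) {
        if (!"jdbc".equals(props.get("type"))) {
            throw new IllegalArgumentException("Not a jdbc catalog: " + props.get("type"));
        }
        String userName = required(props, "username");
        // default-database is optional; when the key is absent, fall back to the user name.
        String defaultDatabase = props.getOrDefault("default-database", userName);
        return new JdbcCatalogConfig(
                required(props, "name"), userName, required(props, "password"),
                required(props, "base_url"), defaultDatabase);
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("name", "mypg");
        props.put("type", "jdbc");
        props.put("username", "flink_user");
        props.put("password", "secret");
        props.put("base_url", "jdbc:postgresql://localhost:5432/");
        System.out.println(fromProperties(props).defaultDatabase); // flink_user
    }
}
```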

Versions

The flink-connector-postgres module will provide a fat jar that includes the latest Postgres JDBC driver, and we rely on the driver itself for backward compatibility. Users can also build jars with their own driver versions.

...

Based on the two facts above, we propose to match the database namespace between Flink and Postgres, and to map Postgres's <schema.table> to Flink's table name, as shown below.

| Flink Catalog Metaspace Structure | Postgres Metaspace Structure |
| --- | --- |
| catalog name (defined in Flink only) | n/a |
| database name | database name |
| table name | <schema name.>table name |


If a schema is specified, the full path of a Postgres table in Flink would be "<catalog>.<db>.`<schema.table>`". The backticks are needed because the Flink table name itself contains a dot.
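The mapping in the table can be sketched as a split of the Flink table name into a Postgres schema and table. This sketch assumes that a name without a schema prefix resolves to Postgres's default `public` schema and that schema and table names contain no dots themselves. `PostgresPathMapping` and its methods are hypothetical.

```java
public class PostgresPathMapping {
    // Assumption: names without a schema prefix resolve to Postgres's default "public" schema.
    static final String DEFAULT_SCHEMA = "public";

    // Splits a Flink table name ("schema.table" or "table") into {schema, table}.
    // Assumes schema and table names themselves contain no dots.
    static String[] toSchemaAndTable(String flinkTableName) {
        int dot = flinkTableName.indexOf('.');
        if (dot < 0) {
            return new String[] {DEFAULT_SCHEMA, flinkTableName};
        }
        return new String[] {flinkTableName.substring(0, dot), flinkTableName.substring(dot + 1)};
    }

    // Builds the full Flink path; the backticks keep "schema.table" a single identifier.
    static String fullFlinkPath(String catalog, String database, String schema, String table) {
        return catalog + "." + database + ".`" + schema + "." + table + "`";
    }

    public static void main(String[] args) {
        System.out.println(String.join(" / ", toSchemaAndTable("sales.orders"))); // sales / orders
        System.out.println(String.join(" / ", toSchemaAndTable("orders")));       // public / orders
        System.out.println(fullFlinkPath("mypg", "mydb", "sales", "orders"));     // mypg.mydb.`sales.orders`
    }
}
```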

...