...
In our production environment, we create Flink Session Clusters in which users can perform OLAP queries. Users like to use a JDBC driver in their applications to submit queries and get results. Currently we use the old flink-sql-gateway[1] and flink-jdbc-driver[2], but when we try to upgrade our Flink Session Cluster, we run into compatibility problems. The Flink SQL Gateway has become a submodule of Flink, and its API has been greatly optimized. The old Flink JDBC Driver cannot connect to the new Gateway directly, which prevents us from upgrading our Flink version. In this FLIP, I'd like to introduce a Flink JDBC Driver module in Flink that connects to the Gateway, so that users can use the JDBC driver in their applications to submit queries and get results as they would with a database.
Proposal
Architecture
Currently users can use the Flink SQL Client to connect to the SQL Gateway as follows
...
(draw.io board diagram)
Use Case
The Flink JDBC Driver project will be packaged into an independent jar file such as flink-table-jdbc-driver-{version}.jar, which will contain the classes of the Flink JDBC driver and shaded Flink classes such as data types. Users only need to add the dependency in pom.xml or add the jar to the classpath of external JDBC tools such as sqlite.
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-jdbc-driver</artifactId>
    <version>${flink.version}</version>
</dependency>
```
More information about the Flink JDBC Driver
- Driver Name: org.apache.flink.table.jdbc.FlinkDriver
- Flink Connection URL: Users can use the default catalog and database directly, or set them in the URL. Custom parameters of the form key1=val1&key2=val2&... can be appended to the URL; they are used to open the session in the SQL Gateway.
  - jdbc:flink://{sql-gateway.host}:{sql-gateway.port}?key1=val1&key2=val2
  - jdbc:flink://{sql-gateway.host}:{sql-gateway.port}/{catalog name}?key1=val1&key2=val2
  - jdbc:flink://{sql-gateway.host}:{sql-gateway.port}/{catalog name}/{database name}?key1=val1&key2=val2
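To make the three URL shapes concrete, here is a minimal sketch of how such a URL could be split into host, port, optional catalog, optional database and session parameters. The class FlinkJdbcUrl and its fields are hypothetical and are not part of the proposed driver API; the sketch also assumes plain host names (no IPv6 literals) and does not URL-decode parameter values.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper for illustration only; not part of the proposed driver. */
final class FlinkJdbcUrl {
    static final String PREFIX = "jdbc:flink://";

    final String host;
    final int port;
    final String catalog;   // null when the URL does not name a catalog
    final String database;  // null when the URL does not name a database
    final Map<String, String> properties;

    private FlinkJdbcUrl(
            String host, int port, String catalog, String database,
            Map<String, String> properties) {
        this.host = host;
        this.port = port;
        this.catalog = catalog;
        this.database = database;
        this.properties = properties;
    }

    static FlinkJdbcUrl parse(String url) {
        if (url == null || !url.startsWith(PREFIX)) {
            throw new IllegalArgumentException("Not a Flink JDBC URL: " + url);
        }
        String rest = url.substring(PREFIX.length());

        // Split off the optional "?key1=val1&key2=val2" part.
        Map<String, String> properties = new LinkedHashMap<>();
        int queryStart = rest.indexOf('?');
        if (queryStart >= 0) {
            for (String pair : rest.substring(queryStart + 1).split("&")) {
                if (pair.isEmpty()) {
                    continue;
                }
                int eq = pair.indexOf('=');
                if (eq <= 0) {
                    throw new IllegalArgumentException("Bad parameter: " + pair);
                }
                properties.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
            rest = rest.substring(0, queryStart);
        }

        // What remains is "host:port[/catalog[/database]]".
        String[] parts = rest.split("/");
        if (parts.length > 3) {
            throw new IllegalArgumentException("Too many path segments: " + url);
        }
        String[] hostPort = parts[0].split(":");
        if (hostPort.length != 2 || hostPort[0].isEmpty()) {
            throw new IllegalArgumentException("Expected host:port in: " + url);
        }
        int port = Integer.parseInt(hostPort[1]);
        String catalog = parts.length > 1 ? parts[1] : null;
        String database = parts.length > 2 ? parts[2] : null;
        return new FlinkJdbcUrl(hostPort[0], port, catalog, database, properties);
    }
}
```

Leaving catalog and database as null when they are absent mirrors the statement above that users can fall back to the default catalog and database.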
Use the Flink JDBC Driver in Java code

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

String driverName = "org.apache.flink.table.jdbc.FlinkDriver";
String url = "jdbc:flink://{sql-gateway.host}:{sql-gateway.port}/catalog/database?key1=val1&key2=val2";
Class.forName(driverName);

try (Connection connection = DriverManager.getConnection(url);
        Statement statement = connection.createStatement();
        // executeQuery returns the ResultSet; execute only returns a boolean.
        ResultSet resultSet = statement.executeQuery("SELECT * FROM {Your Table}")) {
    // next() advances the cursor and returns false once all rows are consumed.
    while (resultSet.next()) {
        ....
    }
}
```
- Users can also add flink-table-jdbc-driver-{version}.jar to the classpath of external JDBC tools.
Data Types
This FLIP supports converting the following Flink data types to JDBC SQL data types; more types can be supported as needed in the future.
Flink Data Type | Jdbc Sql Data Type |
---|---|
CharType/VarCharType | String |
BooleanType | Boolean |
TinyIntType | Byte |
SmallIntType | Short |
IntType | Int |
BigIntType | Long |
FloatType | Float |
DoubleType | Double |
DecimalType | BigDecimal |
BinaryType/VarBinaryType | Bytes (byte[]) |
DateType | Date |
TimeType | Time |
TimestampType | Timestamp |
ArrayType | Array |
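The table above can be read as a lookup from a Flink logical type to the Java class a caller receives. The following is a minimal sketch of that lookup, not the driver's actual conversion code: the class FlinkTypeMappingSketch is hypothetical, the Flink types are keyed by their simple class names to keep the example free of Flink dependencies, and it assumes that Date, Time, Timestamp and Array in the table refer to the java.sql classes.

```java
import java.math.BigDecimal;
import java.sql.SQLFeatureNotSupportedException;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical lookup mirroring the data type table; for illustration only. */
final class FlinkTypeMappingSketch {
    private static final Map<String, Class<?>> JAVA_CLASS_BY_FLINK_TYPE = new LinkedHashMap<>();

    static {
        JAVA_CLASS_BY_FLINK_TYPE.put("CharType", String.class);
        JAVA_CLASS_BY_FLINK_TYPE.put("VarCharType", String.class);
        JAVA_CLASS_BY_FLINK_TYPE.put("BooleanType", Boolean.class);
        JAVA_CLASS_BY_FLINK_TYPE.put("TinyIntType", Byte.class);
        JAVA_CLASS_BY_FLINK_TYPE.put("SmallIntType", Short.class);
        JAVA_CLASS_BY_FLINK_TYPE.put("IntType", Integer.class);
        JAVA_CLASS_BY_FLINK_TYPE.put("BigIntType", Long.class);
        JAVA_CLASS_BY_FLINK_TYPE.put("FloatType", Float.class);
        JAVA_CLASS_BY_FLINK_TYPE.put("DoubleType", Double.class);
        JAVA_CLASS_BY_FLINK_TYPE.put("DecimalType", BigDecimal.class);
        JAVA_CLASS_BY_FLINK_TYPE.put("BinaryType", byte[].class);
        JAVA_CLASS_BY_FLINK_TYPE.put("VarBinaryType", byte[].class);
        // Assumption: the temporal and array rows map to the java.sql classes.
        JAVA_CLASS_BY_FLINK_TYPE.put("DateType", java.sql.Date.class);
        JAVA_CLASS_BY_FLINK_TYPE.put("TimeType", java.sql.Time.class);
        JAVA_CLASS_BY_FLINK_TYPE.put("TimestampType", java.sql.Timestamp.class);
        JAVA_CLASS_BY_FLINK_TYPE.put("ArrayType", java.sql.Array.class);
    }

    /** Returns the Java class for a listed Flink type, or rejects types not in the table. */
    static Class<?> javaClassFor(String flinkTypeName) throws SQLFeatureNotSupportedException {
        Class<?> javaClass = JAVA_CLASS_BY_FLINK_TYPE.get(flinkTypeName);
        if (javaClass == null) {
            throw new SQLFeatureNotSupportedException("Unsupported Flink type: " + flinkTypeName);
        }
        return javaClass;
    }
}
```

Rejecting unlisted types with SQLFeatureNotSupportedException is a design choice of this sketch; the FLIP itself only states that more types can be added later.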
Public Interface
The JDBC Driver API contains many functions; this FLIP implements only the basic ones first, and more will be supported later. The main related interfaces are as follows.
- Methods in FlinkDriver

```java
/* JDBC driver for the Flink SQL Gateway. */
public class FlinkDriver implements Driver {
    /* Connect to the SQL Gateway with the given url and open/create a session with the given properties. */
    @Override
    public Connection connect(String url, Properties info) throws SQLException;
}
```
- Methods in FlinkConnection

```java
/* Connection to the Flink SQL Gateway for the JDBC driver. */
public class FlinkConnection implements Connection {
    /* Create a statement from the connection. */
    @Override
    public Statement createStatement() throws SQLException;

    /* Close the session in the SQL Gateway. */
    @Override
    public void close() throws SQLException;

    /* Set the given catalog as the current catalog of the session in the SQL Gateway. */
    @Override
    public void setCatalog(String catalog) throws SQLException;

    /* Get the current catalog name from the session. */
    @Override
    public String getCatalog() throws SQLException;

    /* Get the FlinkDatabaseMetaData instance for the current catalog. */
    @Override
    public DatabaseMetaData getMetaData() throws SQLException;

    /* Set the given database as the current database of the session in the SQL Gateway. */
    @Override
    public void setSchema(String schema) throws SQLException;

    /* Get the current database name from the session. */
    @Override
    public String getSchema() throws SQLException;
}
```
- Methods in FlinkStatement

```java
/* Statement in the Flink JDBC driver. */
public class FlinkStatement implements Statement {
    /* Submit the sql to the SQL Gateway and get the result set. */
    @Override
    public ResultSet executeQuery(String sql) throws SQLException;

    /* Execute the given update sql and return the result count. */
    @Override
    public int executeUpdate(String sql) throws SQLException;

    /* Cancel the running job in the SQL Gateway. */
    @Override
    public void close() throws SQLException;

    /* Return true if the statement has more results. */
    @Override
    public boolean getMoreResults() throws SQLException;

    /* Get the current result set in the statement. */
    @Override
    public ResultSet getResultSet() throws SQLException;
}
```
- Methods in FlinkResultSet: FlinkResultSet only supports fetching data from the iterator StatementResult. It supports the getXXX methods and does not support deleting, updating or moving the cursor.

```java
/* ResultSet for the Flink JDBC driver. */
public class FlinkResultSet implements ResultSet {
    /* Return true if there are more results in the result iterator. */
    @Override
    public boolean next() throws SQLException;

    /* Close the fetch result operation. */
    @Override
    public void close() throws SQLException;

    /* Get different values according to data type and column index; getXXX stands for the typed getters such as getInt or getString. */
    @Override
    public <V> V getXXX(int columnIndex) throws SQLException;

    /* Get different values according to data type and column name. */
    @Override
    public <V> V getXXX(String columnName) throws SQLException;
}
```
- Methods in FlinkDatabaseMetaData: FlinkDatabaseMetaData only supports the TABLE and VIEW table types, and only supports getting information about catalogs, databases and tables.

```java
/* DatabaseMetaData in the Flink JDBC driver. */
public class FlinkDatabaseMetaData implements DatabaseMetaData {
    /* Get the url of the Flink JDBC driver. */
    @Override
    public String getURL() throws SQLException;

    /* Get the catalog name list from the session. */
    @Override
    public ResultSet getCatalogs() throws SQLException;

    /* Get the database name list from the session. */
    @Override
    public ResultSet getSchemas() throws SQLException;

    /* Get the database name list in the given catalog from the session. */
    @Override
    public ResultSet getSchemas(String catalog, String schemaPattern) throws SQLException;

    /* Get the table name list with the given condition from the session. */
    @Override
    public ResultSet getTables(String catalog, String schemaPattern, String tableNamePattern, String[] types) throws SQLException;

    /* Get the column list with the given condition from the session. */
    @Override
    public ResultSet getColumns(String catalog, String schemaPattern, String tableNamePattern, String columnNamePattern) throws SQLException;

    /* Get the primary key list for the given table from the session. */
    @Override
    public ResultSet getPrimaryKeys(String catalog, String schema, String table) throws SQLException;
}
```
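As an illustration of how a caller might use these metadata methods, the sketch below lists the tables and views of one database through the standard java.sql.DatabaseMetaData interface. The class TableListingSketch is hypothetical, and the sketch assumes the driver follows the JDBC convention of exposing table names in a TABLE_NAME column and accepting "%" as a match-all pattern; the FLIP does not state either detail.

```java
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical caller-side helper; relies only on the standard JDBC interfaces. */
final class TableListingSketch {
    static List<String> listTablesAndViews(
            DatabaseMetaData metaData, String catalog, String database) throws SQLException {
        List<String> names = new ArrayList<>();
        // TABLE and VIEW are the two table types the proposal says are supported.
        String[] types = {"TABLE", "VIEW"};
        try (ResultSet tables = metaData.getTables(catalog, database, "%", types)) {
            while (tables.next()) {
                // Assumption: the standard JDBC column name is used for the table name.
                names.add(tables.getString("TABLE_NAME"));
            }
        }
        return names;
    }
}
```

With a live connection, the metadata object would come from connection.getMetaData().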
- Methods in FlinkResultSetMetaData: FlinkResultSetMetaData only supports getting column information according to column index or name.

```java
/* ResultSetMetaData in the Flink JDBC driver. */
public class FlinkResultSetMetaData implements ResultSetMetaData {
    /* Get the column count in the result set. */
    @Override
    public int getColumnCount() throws SQLException;

    /* Whether the column may be null. */
    @Override
    public int isNullable(int column) throws SQLException;

    /* Get the display size for the column. */
    @Override
    public int getColumnDisplaySize(int column) throws SQLException;

    /* Get the column label according to the column index. */
    @Override
    public String getColumnLabel(int column) throws SQLException;

    /* Get the column name according to the column index. */
    @Override
    public String getColumnName(int column) throws SQLException;

    /* Get the precision for the column index. */
    @Override
    public int getPrecision(int column) throws SQLException;

    /* Get the column type id for the column index. */
    @Override
    public int getColumnType(int column) throws SQLException;

    /* Get the column type name for the column index. */
    @Override
    public String getColumnTypeName(int column) throws SQLException;

    /* Get the column type class name for the column index. */
    @Override
    public String getColumnClassName(int column) throws SQLException;
}
```
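The metadata methods listed above are enough to describe the columns of a result. A minimal sketch, written against the standard java.sql.ResultSetMetaData interface and assuming the driver follows the usual JDBC semantics (1-based column indexes); the class ColumnSummarySketch is hypothetical.

```java
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.StringJoiner;

/** Hypothetical caller-side helper; relies only on the standard JDBC interface. */
final class ColumnSummarySketch {
    /** Builds a summary such as "label1 TYPE1, label2 TYPE2" from the metadata. */
    static String describe(ResultSetMetaData metaData) throws SQLException {
        StringJoiner summary = new StringJoiner(", ");
        // JDBC column indexes start at 1, not 0.
        for (int i = 1; i <= metaData.getColumnCount(); i++) {
            summary.add(metaData.getColumnLabel(i) + " " + metaData.getColumnTypeName(i));
        }
        return summary.toString();
    }
}
```

With a live query, the metadata object would come from resultSet.getMetaData().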
Unsupported Features
- Transactions, such as commit and rollback, are not supported.
- Prepared statements, prepared calls and similar operations are not supported.
- Management operations such as savepoints are not supported.
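The FLIP does not say how the driver reports these unsupported operations to callers. One common JDBC convention is to throw java.sql.SQLFeatureNotSupportedException; the sketch below shows that convention under this assumption. The class UnsupportedOperationsSketch is a hypothetical stand-in and is not the proposed FlinkConnection.

```java
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;

/** Hypothetical stand-in showing one way to reject unsupported operations. */
final class UnsupportedOperationsSketch {
    public void commit() throws SQLException {
        throw unsupported("commit");
    }

    public void rollback() throws SQLException {
        throw unsupported("rollback");
    }

    public PreparedStatement prepareStatement(String sql) throws SQLException {
        throw unsupported("prepareStatement");
    }

    // Builds the exception in one place so every rejected call reads the same way.
    private static SQLFeatureNotSupportedException unsupported(String operation) {
        return new SQLFeatureNotSupportedException(operation + " is not supported");
    }
}
```

Because SQLFeatureNotSupportedException extends SQLException, existing callers that already catch SQLException keep working, while newer callers can catch the subclass to detect a missing feature specifically.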
[1] https://github.com/ververica/flink-sql-gateway
...