...
[draw.io Board Diagram]
There are six main classes in Flink Jdbc Driver: FlinkDriver, FlinkConnection, FlinkStatement, FlinkResultSet, FlinkDatabaseMetaData and FlinkResultSetMetaData, which implement the jdbc interfaces Driver, Connection, Statement, ResultSet, DatabaseMetaData and ResultSetMetaData respectively.
- FlinkDriver parses the gateway address from the url and creates a FlinkConnection.
- FlinkConnection creates an Executor according to the gateway address. When the connection is closed, it closes the connection with the gateway through the Executor.
- FlinkStatement gets the Executor from FlinkConnection and submits the sql query to it. After the query is executed, FlinkStatement gets a StatementResult from the Executor and creates a FlinkResultSet.
- FlinkResultSet is an iterator: it gets results from the StatementResult and returns them to users.
- FlinkDatabaseMetaData provides metadata of catalogs, databases and tables.
- FlinkResultSetMetaData provides metadata of the ResultSet, such as columns.
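The division of responsibilities listed above can be sketched with a few stand-in classes. This is only an illustration: the `Executor` interface, `FakeExecutor`, `SketchConnection` and `SketchStatement` below are hypothetical names invented for this sketch, and the real Executor API of the gateway is not shown in this document. The sketch only demonstrates that the statement borrows the Executor from its connection and that closing the connection closes the Executor.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class CallFlowSketch {
    /** Hypothetical stand-in for the gateway client; not the real Flink Executor API. */
    interface Executor {
        Iterator<String> executeStatement(String sql);
        void close();
    }

    /** In-memory Executor that exists only to make the sketch runnable. */
    static final class FakeExecutor implements Executor {
        boolean closed;

        @Override
        public Iterator<String> executeStatement(String sql) {
            return Arrays.asList("row-1", "row-2").iterator();
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Connection role: owns the Executor and closes it together with the connection. */
    static final class SketchConnection implements AutoCloseable {
        private final Executor executor;

        SketchConnection(Executor executor) {
            this.executor = executor;
        }

        SketchStatement createStatement() {
            return new SketchStatement(this);
        }

        Executor getExecutor() {
            return executor;
        }

        @Override
        public void close() {
            executor.close();
        }
    }

    /** Statement role: borrows the Executor from its connection and submits the query. */
    static final class SketchStatement {
        private final SketchConnection connection;

        SketchStatement(SketchConnection connection) {
            this.connection = connection;
        }

        Iterator<String> executeQuery(String sql) {
            return connection.getExecutor().executeStatement(sql);
        }
    }

    /** Demo helper: runs one query end to end and returns the rows that were read. */
    static List<String> runOnce(FakeExecutor executor, String sql) {
        List<String> rows = new ArrayList<>();
        try (SketchConnection connection = new SketchConnection(executor)) {
            Iterator<String> result = connection.createStatement().executeQuery(sql);
            while (result.hasNext()) {
                rows.add(result.next());
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        FakeExecutor executor = new FakeExecutor();
        System.out.println(runOnce(executor, "SELECT 1") + " closed=" + executor.closed);
    }
}
```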
The call relationships between them are as follows.
...
User Case
The Flink Jdbc Driver module will be packaged into an independent jar file such as flink-table-jdbc-driver-{version}.jar, which will contain the classes of the Flink Jdbc Driver and shaded Flink classes such as data types. Users only need to add the jdbc dependency in pom.xml or add the jar to the classpath of external Jdbc Tools such as sqlite.

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-jdbc-driver</artifactId>
        <version>${flink.version}</version>
    </dependency>

...
- Driver Name: org.apache.flink.table.jdbc.FlinkDriver
- Flink Connection URL: Users can use the default catalog and database directly, or set them in the url. Users can also set custom parameters in the url to open the session in SqlGateway with key1=val1&key2=val2&...
- jdbc:flink://{sql-gateway.host}:{sql-gateway.port}?key1=val1&key2=val2
- jdbc:flink://{sql-gateway.host}:{sql-gateway.port}/{catalog name}?key1=val1&key2=val2
- jdbc:flink://{sql-gateway.host}:{sql-gateway.port}/{catalog name}/{database name}?key1=val1&key2=val2
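- Currently SqlGateway does not support authentication, so user and password in the connection have no effect.
- Use Flink Jdbc Driver in Java code

      import java.sql.Connection;
      import java.sql.DriverManager;
      import java.sql.ResultSet;
      import java.sql.Statement;

      String driverName = "org.apache.flink.table.jdbc.FlinkDriver";
      String url = "jdbc:flink://{sql-gateway.host}:{sql-gateway.port}/catalog/database?key1=val1&key2=val2";
      // Load the driver class before asking DriverManager for a connection.
      Class.forName(driverName);
      try (Connection connection = DriverManager.getConnection(url);
              Statement statement = connection.createStatement();
              // executeQuery returns the ResultSet; execute only returns a boolean.
              ResultSet resultSet = statement.executeQuery("SELECT * FROM {Your Table}")) {
          // ResultSet has no hasNext(); next() advances the cursor and reports whether a row exists.
          while (resultSet.next()) {
              ....
          }
      }
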
- Users can also add flink-table-jdbc-driver-{version}.jar to the classpath of external jdbc tools.
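The three connection URL forms listed above can be split into host, port, optional catalog, optional database and session parameters. The following is a minimal sketch of such parsing, not the driver's actual implementation: the class `FlinkUrlSketch` and its fields are hypothetical names, and it relies only on `java.net.URI` from the JDK.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper, not part of the Flink code base. */
final class FlinkUrlSketch {
    private static final String PREFIX = "jdbc:flink://";

    final String host;
    final int port;
    final String catalog;   // null when the url does not name a catalog
    final String database;  // null when the url does not name a database
    final Map<String, String> properties;

    private FlinkUrlSketch(String host, int port, String catalog, String database,
                           Map<String, String> properties) {
        this.host = host;
        this.port = port;
        this.catalog = catalog;
        this.database = database;
        this.properties = properties;
    }

    static FlinkUrlSketch parse(String url) {
        if (url == null || !url.startsWith(PREFIX)) {
            throw new IllegalArgumentException("Not a Flink jdbc url: " + url);
        }
        // Drop the "jdbc:" prefix so java.net.URI can parse "flink://host:port/...".
        URI uri = URI.create(url.substring("jdbc:".length()));
        if (uri.getHost() == null || uri.getPort() < 0) {
            throw new IllegalArgumentException("Missing gateway host or port: " + url);
        }
        String catalog = null;
        String database = null;
        String path = uri.getPath();
        if (path != null && path.length() > 1) {
            // The path is "/{catalog name}" or "/{catalog name}/{database name}".
            String[] parts = path.substring(1).split("/");
            if (parts.length > 2) {
                throw new IllegalArgumentException("Too many path segments: " + url);
            }
            catalog = parts[0];
            database = parts.length == 2 ? parts[1] : null;
        }
        Map<String, String> properties = new LinkedHashMap<>();
        String query = uri.getQuery();
        if (query != null && !query.isEmpty()) {
            // The query is "key1=val1&key2=val2&...".
            for (String pair : query.split("&")) {
                int eq = pair.indexOf('=');
                if (eq <= 0) {
                    throw new IllegalArgumentException("Malformed parameter: " + pair);
                }
                properties.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return new FlinkUrlSketch(uri.getHost(), uri.getPort(), catalog, database, properties);
    }

    public static void main(String[] args) {
        FlinkUrlSketch parsed =
                parse("jdbc:flink://localhost:8083/my_catalog/my_db?key1=val1&key2=val2");
        System.out.println(parsed.host + ":" + parsed.port + " " + parsed.catalog + "."
                + parsed.database + " " + parsed.properties);
    }
}
```

The host `localhost`, port `8083` and the catalog and database names in `main` are placeholder values chosen for the example.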
...
This FLIP supports converting the following Flink data types to sql data types; more types can be supported as needed in the future.
Flink Data Type | Jdbc Sql Data Type |
---|---|
CharType/VarCharType | String |
BooleanType | Boolean |
TinyIntType | Byte |
SmallIntType | Short |
IntType | Int |
BigIntType | Long |
FloatType | Float |
DoubleType | Double |
DecimalType | BigDecimal |
BinaryType/VarBinaryType | Bytes (byte[]) |
DateType | Date |
TimeType | Time |
TimestampType | Timestamp |
ArrayType | Array |
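The table above can be read as a lookup from a Flink type name to the Java class a user would receive. The sketch below encodes it that way under some assumptions: `TypeMappingSketch` is a hypothetical class, the keys are simply the type names from the table, and Date, Time, Timestamp and Array are assumed to mean the `java.sql` classes of the same names.

```java
import java.math.BigDecimal;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical lookup table mirroring the type table; not the driver's actual code. */
final class TypeMappingSketch {
    private static final Map<String, Class<?>> JAVA_CLASS_BY_FLINK_TYPE = new LinkedHashMap<>();

    static {
        JAVA_CLASS_BY_FLINK_TYPE.put("CharType", String.class);
        JAVA_CLASS_BY_FLINK_TYPE.put("VarCharType", String.class);
        JAVA_CLASS_BY_FLINK_TYPE.put("BooleanType", Boolean.class);
        JAVA_CLASS_BY_FLINK_TYPE.put("TinyIntType", Byte.class);
        JAVA_CLASS_BY_FLINK_TYPE.put("SmallIntType", Short.class);
        JAVA_CLASS_BY_FLINK_TYPE.put("IntType", Integer.class);
        JAVA_CLASS_BY_FLINK_TYPE.put("BigIntType", Long.class);
        JAVA_CLASS_BY_FLINK_TYPE.put("FloatType", Float.class);
        JAVA_CLASS_BY_FLINK_TYPE.put("DoubleType", Double.class);
        JAVA_CLASS_BY_FLINK_TYPE.put("DecimalType", BigDecimal.class);
        JAVA_CLASS_BY_FLINK_TYPE.put("BinaryType", byte[].class);
        JAVA_CLASS_BY_FLINK_TYPE.put("VarBinaryType", byte[].class);
        // Assumption: the Date, Time, Timestamp and Array rows refer to the java.sql classes.
        JAVA_CLASS_BY_FLINK_TYPE.put("DateType", java.sql.Date.class);
        JAVA_CLASS_BY_FLINK_TYPE.put("TimeType", java.sql.Time.class);
        JAVA_CLASS_BY_FLINK_TYPE.put("TimestampType", java.sql.Timestamp.class);
        JAVA_CLASS_BY_FLINK_TYPE.put("ArrayType", java.sql.Array.class);
    }

    /** Returns the Java class for a Flink type name, or fails for types outside the table. */
    static Class<?> javaClassFor(String flinkTypeName) {
        Class<?> javaClass = JAVA_CLASS_BY_FLINK_TYPE.get(flinkTypeName);
        if (javaClass == null) {
            throw new IllegalArgumentException("Unsupported Flink data type: " + flinkTypeName);
        }
        return javaClass;
    }

    public static void main(String[] args) {
        System.out.println(javaClassFor("DecimalType").getName());
    }
}
```

Failing loudly for types outside the table keeps the "more types can be supported as needed" extension point explicit: adding a type is one more entry in the map.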
Public Interface
There are many methods in Jdbc Driver. This FLIP only implements the basic methods first, and more methods will be implemented later when they are needed.
- Methods in FlinkDriver

      /* Jdbc Driver for flink sql gateway */
      public class FlinkDriver implements Driver {
          /* Connect sql gateway with given url and open/create session with given properties. */
          @Override
          public Connection connect(String url, Properties info) throws SQLException;
      }

...
- Methods in FlinkResultSet: FlinkResultSet only supports fetching data from StatementResult. It supports the getXXX methods and doesn't support deleting, updating or moving the cursor.
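The restriction described above (read rows forward, expose getXXX accessors, reject updates and arbitrary cursor moves) can be sketched as follows. This is an assumption-laden illustration rather than the FlinkResultSet implementation: `ForwardOnlyCursorSketch` is a hypothetical class, and a plain `Iterator<List<Object>>` stands in for StatementResult, whose real API is not shown in this document.

```java
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Hypothetical forward-only cursor; an Iterator of rows stands in for StatementResult. */
final class ForwardOnlyCursorSketch {
    private final Iterator<List<Object>> rows;
    private List<Object> current;
    private boolean closed;

    ForwardOnlyCursorSketch(Iterator<List<Object>> rows) {
        this.rows = rows;
    }

    /** Mirrors ResultSet.next(): advances and reports whether a row is available. */
    boolean next() throws SQLException {
        checkOpen();
        current = rows.hasNext() ? rows.next() : null;
        return current != null;
    }

    /** Mirrors ResultSet.getString(int) with a 1-based column index. */
    String getString(int columnIndex) throws SQLException {
        Object value = value(columnIndex);
        return value == null ? null : value.toString();
    }

    /** Mirrors ResultSet.getLong(int); JDBC returns 0 for SQL NULL. */
    long getLong(int columnIndex) throws SQLException {
        Object value = value(columnIndex);
        if (value == null) {
            return 0L;
        }
        if (value instanceof Number) {
            return ((Number) value).longValue();
        }
        throw new SQLException("Column " + columnIndex + " is not numeric");
    }

    /** Moving the cursor backwards is rejected. */
    boolean previous() throws SQLException {
        throw new SQLFeatureNotSupportedException("previous is not supported");
    }

    /** Deleting rows is rejected. */
    void deleteRow() throws SQLException {
        throw new SQLFeatureNotSupportedException("deleteRow is not supported");
    }

    void close() {
        closed = true;
    }

    private void checkOpen() throws SQLException {
        if (closed) {
            throw new SQLException("The cursor is closed");
        }
    }

    private Object value(int columnIndex) throws SQLException {
        checkOpen();
        if (current == null) {
            throw new SQLException("No current row; call next() first");
        }
        if (columnIndex < 1 || columnIndex > current.size()) {
            throw new SQLException("Column index out of range: " + columnIndex);
        }
        return current.get(columnIndex - 1);
    }

    /** Demo helper: drains a cursor over in-memory rows and collects the first column. */
    static List<String> firstColumn(List<List<Object>> data) {
        List<String> out = new ArrayList<>();
        ForwardOnlyCursorSketch cursor = new ForwardOnlyCursorSketch(data.iterator());
        try {
            while (cursor.next()) {
                out.add(cursor.getString(1));
            }
        } catch (SQLException e) {
            throw new IllegalStateException(e);
        } finally {
            cursor.close();
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Object>> data = Arrays.asList(
                Arrays.<Object>asList("a", 1L),
                Arrays.<Object>asList("b", 2L));
        System.out.println(firstColumn(data));
    }
}
```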
...
- Methods in FlinkResultSetMetaData: FlinkResultSetMetaData only supports getting column information according to column index or name.

      /* ResultSetMetaData in flink sql driver. */
      public class FlinkResultSetMetaData implements ResultSetMetaData {
          /* Get column count in the result set. */
          @Override
          public int getColumnCount() throws SQLException;

          /* If the column may be null. */
          @Override
          public int isNullable(int column) throws SQLException;

          /* Get display size for the column. */
          @Override
          public int getColumnDisplaySize(int column) throws SQLException;

          /* Get column label according to column index. */
          @Override
          public String getColumnLabel(int column) throws SQLException;

          /* Get column name according to column index. */
          @Override
          public String getColumnName(int column) throws SQLException;

          /* Get precision for the column index. */
          @Override
          public int getPrecision(int column) throws SQLException;

          /* Get column type id for the column index. */
          @Override
          public int getColumnType(int column) throws SQLException;

          /* Get column type name for the column index. */
          @Override
          public String getColumnTypeName(int column) throws SQLException;

          /* Get column type class name for the column index. */
          @Override
          public String getColumnClassName(int column) throws SQLException;
      }

Unsupported Features
- Transactions, such as commit and rollback, are not supported
- Prepared statements, prepared calls and similar operations are not supported
- Management operations such as savepoint are not supported
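One common way for a JDBC driver to signal features like those listed above is to throw `java.sql.SQLFeatureNotSupportedException` from the corresponding methods. The sketch below shows that pattern only; whether Flink Jdbc Driver reports unsupported operations exactly this way is an assumption, and `UnsupportedOperationsSketch`, `SqlCall` and `isRejected` are hypothetical names.

```java
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;

/** Hypothetical connection-like class that rejects the unsupported operations. */
final class UnsupportedOperationsSketch {
    /** Small functional interface so that calls throwing SQLException can be passed around. */
    interface SqlCall {
        void run() throws SQLException;
    }

    void commit() throws SQLException {
        throw unsupported("commit");
    }

    void rollback() throws SQLException {
        throw unsupported("rollback");
    }

    Object prepareStatement(String sql) throws SQLException {
        throw unsupported("prepareStatement");
    }

    Object setSavepoint() throws SQLException {
        throw unsupported("setSavepoint");
    }

    // Assumption: the message format is illustrative, not the driver's actual wording.
    private static SQLFeatureNotSupportedException unsupported(String operation) {
        return new SQLFeatureNotSupportedException(operation + " is not supported");
    }

    /** Demo helper: returns true when the call is rejected as an unsupported feature. */
    static boolean isRejected(SqlCall call) {
        try {
            call.run();
            return false;
        } catch (SQLFeatureNotSupportedException e) {
            return true;
        } catch (SQLException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        UnsupportedOperationsSketch connection = new UnsupportedOperationsSketch();
        System.out.println(isRejected(connection::commit));
    }
}
```

Callers that need to degrade gracefully can catch `SQLFeatureNotSupportedException` specifically, since it is a subclass of `SQLException`.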
...