Discussion thread | - | ||||||||
---|---|---|---|---|---|---|---|---|---|
Vote thread | - | ||||||||
JIRA | - | ||||||||
https://lists.apache.org/thread/d1owrg8zh77v0xygcpb93fxt0jpjdkb3 | |||||||||
Vote thread | https://lists.apache.org/thread/7jbmg22lnww31sbfdzztwrzgm6bkhjrj | ||||||||
JIRA |
| ||||||||
Release | 1.18.0 | Release | - |
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
The old repositories flink-sql-gateway[1] and flink-jdbc-driver[2] support submitting query to the older version of flink cluster version. But for the latest flink version cluster there are many compatibility problems. Flink Sql Gateway has been a submodule in Flink, and the API has been greatly optimized. The old Flink JDBC Driver cannot connect to the new Gateway directly which will cause users cannot upgrade their Flink Clusters. In this FLIP, we'd like to introduce Flink Jdbc Driver module in Flink to connect the new Gateway, users can use Jdbc Driver to submit their queries and get results like a database in their applications.
...
draw.io Board Diagram | ||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
There are 6 7 main classes in Flink Jdbc Driver: FlinkDriver, FlinkDataSource, FlinkConnection, FlinkStatement, FlinkResultSet, FlinkDatabaseMetaData and FlinkResultSetMetaData which implement jdbc interface Driver, DataSource, Connection, Statement, ResultSet, DatabaseMetaData and ResultSetMetaData.
- FlinkDriver parses gateway address from url, and creates FlinkConnection
- FlinkDataSource manages connection pool for flink jdbc, it will create specific count of connections and choose one for client directly.
- FlinkConnection creates Executor according to gateway address. When the Connections is closed, it can close the connection with gateway by Executor
- FlinkStatement can get Executor from FlinkConnection, and submit sql query to it. After query is executed, FlinkStatement can get StatementResult from Executor, and create FlinkResultSet
- FlinkResultSet is an iterator, it gets results from StatementResult and return them to users
- FlinkDatabaseMetaData provides meta data of catalogs, databases and tables
- FlinkResultSetMetaData provides meta data of ResultSet such as columns
...
draw.io Board Diagram | ||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
User Case
Flink Jdbc Driver module will be packaged into an independent jar file such as flink-table-jdbc-driver-{version}.jar
, which will contains classes of flink jdbc driver and shaded flink classes such as data type. User only need to add jdbc dependency in pom.xml or add the jar in the classpath of external Jdbc Tools such as sqlite.
Code Block | ||
---|---|---|
| ||
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-jdbc-driver</artifactId>
<version>${flink.version}</version>
</dependency> |
More information about Flink Jdbc Driver
|
There're Session
s and Operation
s in SqlGateway
. SqlGateway
will open a Session
for each FlinkConnection
, and then do multiple Operation
s in one Session
. When users create a FlinkConnection
by FlinkDriver
with SqlGateway
, it will open an exist or a new Session
. Any time users want to issue SQL statements to the database, they require a FlinkStatement
instance from FlinkConnection
. Once users have a FlinkStatement
, they can use issue a query. This will return a FlinkResultSet
instance, which contains the entire result. Each operation such as the execution query(Flink job), fetching results in FlinkResultSet
will be an Operation
in the Session
of SqlGateway
.
User Case
Flink Jdbc Driver module will be packaged into an independent jar file such as flink-table-jdbc-driver-{version}.jar
, which will contains classes of flink jdbc driver and shaded flink classes such as data type. User only need to add jdbc dependency in pom.xml or add the jar in the classpath of external Jdbc Tools such as sqlite.
Code Block | ||
---|---|---|
| ||
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-jdbc-driver</artifactId>
<version>${flink.version}</version>
</dependency> |
More information about Flink Jdbc Driver
- Driver Name: org.apache.flink.table.jdbc.FlinkDriver
- Flink Connection URL: Users can use default catalog and database directly, or set them in url. Users can also set custom parameters in url to open the session in SqlGateway with key1=val1&key2=val2&...
- jdbc
- Driver Name: org.apache.flink.table.jdbc.FlinkDriver
- Flink Connection URL: Users can use default catalog and database directly, or set them in url. Users can also set custom parameters in url to open the session in SqlGateway with key1=val1&key2=val2&...
- jdbc:flink://{sql-gateway.host}:{sql-gateway.port}?key1=val1&key2=val2
- jdbc:flink://{sql-gateway.host}:{sql-gateway.port}?key1=val1&key2=val2
- jdbc:flink://{sql-gateway.host}:{sql-gateway.port}/{catalog name}?key1=val1&key2=val2
- jdbc:flink://{sql-gateway.host}:{sql-gateway.port}/{catalog name}/{database name}?key1=val1&key2=val2
- Currently SqlGateway does not support authentication,
user
andpassword
in connection are invalid. Use Flink Jdbc Driver in Java code
Code Block language java String driverName = "org.apache.flink.table.jdbc.FlinkDriver"; String url = "jdbc:flink://{sql-gateway.host}:{sql-gateway.port}/{catalog name}/{database name}?key1=val1&key2=val2"; Class.forName(driverName); try (Connection connection = DriverManager.getConnection(url)) { try (Statement statement = connection.createStatement()) { try (ResultSet resultSet = statement.execute("SELECT * FROM {Your Table}")) { while (resultSet.hasNext()) { Do .your work ... } } try (ResultSet resultSet } }
- User can also add the
flink-table-jdbc-driver-{version}.jar
to the classpath of external jdbc tools.
Data Types
We support to convert the following flink data type to sql data type in this FLIP, and more types can be supported as needed in the future.
= statement.execute("SELECT * FROM T1 JOIN T2 on T1.id=T2.id ...")) { while (resultSet.hasNext()) { Do your work ... } } } }
- User can also add the
flink-table-jdbc-driver-{version}.jar
to the classpath of external jdbc tools.
Data Types
The following basic flink data types are supported to convert to sql data type in this FLIP, and more types can be supported as needed in the future.
Flink Data Type | Java Sql Data Type | Java Data Type |
CharType/VarCharType | CHAR/VARCHAR | String |
BooleanType | BOOLEAN | Boolean |
TinyIntType | TINYINT | Byte |
SmallIntType | SMALLINT | Short |
IntType | INTEGER | Int |
BigIntType | BIGINT | Long |
FloatType | FLOAT | Float |
DoubleType | DOUBLE | Double |
DecimalType | DECIMAL |
Flink Data Type
Jdbc Sql Data Type
CharType/VarCharType
String
BooleanType
Boolean
TinyIntType
Byte
SmallIntType
Short
IntType
Int
BigIntType
Long
FloatType
Float
DoubleType
Double
BigDecimal | |
BinaryType/VarBinaryType |
BINARY/VARBINARY | byte[] |
DateType | DATE | Date |
TimeType | TIME | Time |
TimestampType | TIMESTAMP | Timestamp |
ArrayType
Array
Public Interface
There are many methods in Jdbc Driver, while this FLIP only implement the basic methods first and more methods will be implemented later when they are needed.
- Methods in
FlinkDriver
Code Block | ||
---|---|---|
| ||
/* Jdbc Driver for flink sql gateway */
public class FlinkDriver implements Driver {
/* Connect sql gateway with given url and open/create session with given priperties. */
@Override
public Connection connect(String url, Properties info) throws SQLException;
} |
- Methods in
FlinkConnection
ZonedTimestampType | TIMESTAMP_WITH_TIMEZONE | OffsetDateTime |
LocalZonedTimestampType | TIMESTAMP_WITH_LOCAL_TIMEZONE | Timestamp |
ArrayType | ARRAY | Array |
RowType | ROW(Not in java.sql.Types) | Row(Flink Row Data) |
MapType | MAP | Map<K, V> |
Currently TIMESTAMP_WITH_LOCAL_TIMEZONE is not exist in java.sql.Types, but it is supported by Flink
. Users can define a field as (f TIMESTAMP(p) WITH LOCAL TIME ZONE)
and set time zone through the connection parameters or dynamic parameters in console by table.local-time-zone
. After that, users can get Timestamp
which will be automatically converted from stored time data into specific Timestamp
according to the given time zone.
Java Sql Interfaces
There are many methods in Jdbc Driver, while this FLIP only implement the basic methods first and more methods will be implemented later when they are needed.
- Methods in
FlinkDriver
Code Block | ||
---|---|---|
| ||
/* Jdbc Driver for flink sql gateway. Only Batch Mode queries are supported. If you force to submit streaming queries, you may get unrecognized updates, deletions and other results in FlinkResultSet. */
public class FlinkDriver implements Driver {
/* Connect sql gateway with given url and open/create session with given priperties. */
@Override
public Connection connect(String url, Properties info | ||
Code Block | ||
| ||
/* Connection to flink sql gateway for jdbc driver. */
public class FlinkConnection implements Connection {
/* Create statement from connection. */
@Override
public Statement createStatement() throws SQLException;
/* Close session in sql gateway. */
@Override
public void close() throws SQLException;
/* Use given catalog to the session in sql gateway. */
@Override
public void setCatalog(String catalog) throws SQLException;
/* Get current catalog name from session. */
@Override
public String getCatalog() throws SQLException;
/* Get FlinkDatabaseMetaData instance for the current catalog. */
@Override
public DatabaseMetaData getMetaData() throws SQLException;
/* Use given database to the session in sql gateway. */
@Override
public void setSchema(String schema) throws SQLException;
/* Get current database name from session. */
@Override
public String getSchema() throws SQLException;
} |
- Methods in
FlinkStatement
FlinkDataSource
Code Block | ||
---|---|---|
| ||
/* StatementJdbc inDataSource flinkmanages jdbcconnections driver. *for client, we can support more operations in it in the future. */ public class FlinkStatementFlinkDataSource implements StatementDataSource { /* SubmitThe max sqlcount toof sqlconnections gatewaywhich andthe getdata resultsource setholds. */ @Overrideprivate int maxActive; /* Set the url of connection. */ public synchronized ResultSetvoid executeQuerysetUrl(String sqlurl) throws SQLException; /* ExecuteSet giventhe updatedriver sqlclass andname returnfor resultthe countsource. */ @Overridepublic synchronized public int executeUpdatevoid setDriverClassName(String sqldriverClassName) throws; SQLException; /* CancelSet the max runningactive jobconnection infor sqlthe gatewaysource. */ @Override public publicsynchronized void closesetMaxActive() throws SQLExceptionint maxActive); /* ReturnGet truea ifconnection thefrom result set has more resultsdata source. */ @Override public booleanConnection getMoreResultsgetConnection() throws SQLException; @Override /*public GetConnection current result set in the statement. */ @Override public ResultSet getResultSet(getConnection(String username, String password) throws SQLException; } |
- Methods in
FlinkResultSet
:FlinkResultSet
only supports fetching data from iteratorStatementResult
, it supports getXXX methods and doesn't support deleting, updating or moving the cursor.FlinkConnection
Code Blockcode | ||
---|---|---|
| ||
/* ResultSetConnection forto flink sql gateway for jdbc driver. */ public class FlinkResultSetFlinkConnection implements ResultSetConnection { /* ReturnCreate truestatement if there are more resuts in result iteratorfrom connection. */ @Override public booleanStatement nextcreateStatement() throws SQLException; /* Close thesession fetchin resultsql operationgateway. */ @Override public void close() throws SQLException; /* GetUse differentgiven values accordingcatalog to datathe typesession andin columnsql indexgateway. */ @Override public <V>void V getXXXsetCatalog(intString columnIndexcatalog) throws SQLException.; /* Get differentcurrent valuescatalog accordingname to data type and column namefrom session. */ @Override public <V>String V getXXX(String columnNamegetCatalog() throws SQLException. } |
- Methods in
FlinkDatabaseMetaData
:FlinkDatabaseMetaData
only supports TABLE and VIEW tables, getting information of catalog, database and tables.
Code Block | ||
---|---|---|
| ||
/* DatabaseMetaData in flink sql driver. */ public class FlinkDatabaseMetaData implements DatabaseMetaData { /* Get the url of flink sql driver; /* Get FlinkDatabaseMetaData instance for the current catalog. */ @Override public StringDatabaseMetaData getURLgetMetaData() throws SQLException; /* GetUse cataloggiven namedatabase listto fromthe session in sql gateway. */ @Override public ResultSetvoid getCatalogssetSchema(String schema) throws SQLException; /* Get current database name list from session. */ @Override public ResultSetString getSchemasgetSchema() throws SQLException; } |
- Methods in
FlinkStatement
Code Block | ||
---|---|---|
| ||
/* Statement in flink jdbc driver. */ public class FlinkStatement implements Statement { /* Submit sql to sql gateway and get result set /* Get database name lins in given catalog from session. */ @Override public ResultSet getSchemasexecuteQuery(String catalog,sql) Stringthrows schemaPattern) throws SQLException; /* GetExecute tablegiven nameupdate listsql withand givenreturn conditionresult from sessioncount. */ @Override public ResultSetint getTablesexecuteUpdate(String catalog, String schemaPattern, String tableNamePattern, String[] typessql) throws SQLException; /* GetCancel columnthe listrunning withjob givenin conditionsql from sessiongateway. */ @Override public ResultSetvoid getColumns(String catalog, String schemaPattern, String tableNamePattern, String columnNamePatternclose() throws SQLException; /* GetReturn primarytrue keyif listthe forresult givenset tablehas frommore sessionresults. */ @Override public ResultSetboolean getPrimaryKeys(String catalog, String schema, String tablegetMoreResults() throws SQLException; } |
- Methods in
FlinkResultSetMetaData
:FlinkResultSetMetaData
only supports getting column information according to column index or name.
Code Block | ||
---|---|---|
| ||
/* ResultSetMetaData in flink sql driver. */ public class FlinkResultSetMetaData implements ResultSetMetaData { /* Get current columnresult countset in the result setstatement. */ @Override public intResultSet getColumnCountgetResultSet() throws SQLException; } |
- Methods in
FlinkResultSet
:FlinkResultSet
only supports fetching data from iteratorStatementResult
, it supports getXXX methods and doesn't support deleting, updating or moving the cursor. Compare withResultSet
, there isgetKind
method inFlinkResultSet
to get theRowKind
of current record.
Code Block | ||
---|---|---|
| ||
/* ResultSet for flink jdbc driver. Only Batch Mode queries are supported. If you force to submit streaming queries, you may get unrecognized updates, deletions and other results. */ public class FlinkResultSet implements ResultSet { /* Return true if there are more resuts in result iterator /* If the column may be null. */ @Override public int isNullable(int column) throws SQLException; /* Get display size for the column. */ @Override public int getColumnDisplaySize(int column) throws SQLException; /* Get column name according to column index. */ @Override public String getColumnLabel(int column) throws SQLException; public String getColumnName(int columnboolean next() throws SQLException; /* GetClose precisionthe forfetch theresult column indexoperation. */ @Override public intvoid getPrecisionclose(int column) throws SQLException; /* Get different columnvalues according typeto iddata fortype theand column index. */ @Override public int<V> getColumnTypeV getXXX(int columncolumnIndex) throws SQLException;. /* Get different values columnaccording typeto namedata fortype theand column indexname. */ @Override public <V> StringV getColumnTypeNamegetXXX(intString columncolumnName) throws SQLException; /* Get column type class name for the. } |
- Methods in
FlinkDatabaseMetaData
:FlinkDatabaseMetaData
only supports TABLE and VIEW tables, getting information of catalog, database and tables.
Code Block | ||
---|---|---|
| ||
/* DatabaseMetaData in flink sql driver. */ public class FlinkDatabaseMetaData implements DatabaseMetaData { /* Get the url of flink sql driver column index. */ @Override public String getColumnClassName(int column) throws SQLException; } |
Unsupported Features
...
public String getURL() throws SQLException;
/* Get catalog name list from session. */
@Override
public ResultSet getCatalogs() throws SQLException;
/* Get database name list from session. */
@Override
public ResultSet getSchemas() throws SQLException;
/* Get database name lins in given catalog from session. */
@Override
public ResultSet getSchemas(String catalog, String schemaPattern) throws SQLException;
/* Get table name list with given condition from session. */
@Override
public ResultSet getTables(String catalog, String schemaPattern, String tableNamePattern, String[] types) throws SQLException;
/* Get column list with given condition from session. */
@Override
public ResultSet getColumns(String catalog, String schemaPattern, String tableNamePattern, String columnNamePattern) throws SQLException;
/* Get primary key list for given table from session. */
@Override
public ResultSet getPrimaryKeys(String catalog, String schema, String table) throws SQLException;
} |
- Methods in
FlinkResultSetMetaData
:FlinkResultSetMetaData
only supports getting column information according to column index or name.
Code Block | ||
---|---|---|
| ||
/* ResultSetMetaData in flink sql driver. */
public class FlinkResultSetMetaData implements ResultSetMetaData {
/* Get column count in the result set. */
@Override
public int getColumnCount() throws SQLException;
/* If the column may be null. */
@Override
public int isNullable(int column) throws SQLException;
/* Get display size for the column. */
@Override
public int getColumnDisplaySize(int column) throws SQLException;
/* Get column name according to column index. */
@Override
public String getColumnLabel(int column) throws SQLException;
public String getColumnName(int column) throws SQLException;
/* Get precision for the column index. */
@Override
public int getPrecision(int column) throws SQLException;
/* Get column type id for the column index. */
@Override
public int getColumnType(int column) throws SQLException;
/* Get column type name for the column index. */
@Override
public String getColumnTypeName(int column) throws SQLException;
/* Get column type class name for the column index. */
@Override
public String getColumnClassName(int column) throws SQLException;
} |
Unsupported Features
- Don't support transaction such as commit, rollback
- Don't support prepare statement, prepare call and etc operations
- Don't support management operations such as savepoint and etc
Exception Handling
When an error occurs, Flink Jdbc Driver mainly throws the following exceptions
SQLState Class | SQLState SubClass | Reason | Exception | Operations |
---|---|---|---|---|
22 | 000 to 02H according to different errors | Description of data conversion error | SQLDataException | Get data error from ResultSet in methods getXXX |
0A | 000 | Specific feature is not supported | SQLFeatureNotSupportedException | All unimplemented methods will throw this exception |
58 | 004 | The exception or error message from Gateway | SQLNonTransientException | Gateway throws an exception or returns an error message when executing the query |
08 | 006 | The session is not exist in Gateway and client need to create new connection to it | SQLNonTransientConnectionException | Gateway is restarted and the client need to create new connection |
We can continue to subdivide and throw different exceptions according to the error information returned by the Gateway in Flink Jdbc Driver in the future
...
[1] https://github.com/ververica/flink-sql-gateway
...