Discussion thread | https://lists.apache.org/thread/d1owrg8zh77v0xygcpb93fxt0jpjdkb3 |
---|---|
Vote thread | https://lists.apache.org/thread/7jbmg22lnww31sbfdzztwrzgm6bkhjrj |
JIRA | |
Release | 1.18.0 |
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
There are seven main classes in the Flink JDBC Driver: FlinkDriver, FlinkDataSource, FlinkConnection, FlinkStatement, FlinkResultSet, FlinkDatabaseMetaData and FlinkResultSetMetaData. They implement the JDBC interfaces Driver, DataSource, Connection, Statement, ResultSet, DatabaseMetaData and ResultSetMetaData, respectively.
- FlinkDriver parses the gateway address from the URL and creates a FlinkConnection.
- FlinkDataSource manages a connection pool for the Flink JDBC driver: it creates a configured number of connections and hands one of them to the client directly.
- FlinkConnection creates an Executor according to the gateway address. When the connection is closed, it closes its connection to the gateway through the Executor.
- FlinkStatement gets the Executor from FlinkConnection and submits SQL queries to it. After a query is executed, FlinkStatement gets a StatementResult from the Executor and creates a FlinkResultSet.
- FlinkResultSet is an iterator: it fetches results from the StatementResult and returns them to users.
- FlinkDatabaseMetaData provides metadata about catalogs, databases and tables.
- FlinkResultSetMetaData provides metadata about a ResultSet, such as its columns.
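
As a rough illustration of the first bullet, the sketch below extracts a gateway host and port from a JDBC URL. The `jdbc:flink://host:port` URL shape, the `GatewayAddress` class and its `accepts` and `parse` helpers are assumptions made for this example; they are not defined by this FLIP.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.sql.SQLException;

/** Hypothetical holder for a parsed gateway address (not part of the FLIP). */
final class GatewayAddress {
    // Assumed URL prefix; the real driver may use a different scheme.
    static final String PREFIX = "jdbc:flink://";

    final String host;
    final int port;

    private GatewayAddress(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /** Only claim URLs that start with the assumed prefix. */
    static boolean accepts(String url) {
        return url != null && url.startsWith(PREFIX);
    }

    /** Parses host and port, reporting problems as SQLException as JDBC callers expect. */
    static GatewayAddress parse(String url) throws SQLException {
        if (!accepts(url)) {
            throw new SQLException("Unsupported URL: " + url);
        }
        try {
            // Drop the "jdbc:" part so java.net.URI can parse "flink://host:port".
            URI uri = new URI(url.substring("jdbc:".length()));
            if (uri.getHost() == null || uri.getPort() < 0) {
                throw new SQLException("URL must contain host and port: " + url);
            }
            return new GatewayAddress(uri.getHost(), uri.getPort());
        } catch (URISyntaxException e) {
            throw new SQLException("Malformed URL: " + url, e);
        }
    }
}
```

Rejecting foreign URLs early matters because `DriverManager` offers every URL to every registered driver and expects drivers to decline the ones they do not own.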
...
SqlGateway has Sessions and Operations. SqlGateway opens a Session for each FlinkConnection and then performs multiple Operations in that Session. When users create a FlinkConnection through FlinkDriver against SqlGateway, it opens an existing Session or a new one. Whenever users want to issue SQL statements, they request a FlinkStatement instance from the FlinkConnection. Once they have a FlinkStatement, they can use it to issue a query, which returns a FlinkResultSet instance containing the entire result. Each action, such as executing the query (a Flink job) or fetching results in FlinkResultSet, is an Operation in the Session of SqlGateway.
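
The flow described above maps onto the standard `java.sql` calls. The following is a minimal sketch that uses only the generic JDBC interfaces; the `GatewayQueryExample` class and `firstColumn` helper are hypothetical names chosen for illustration.

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper showing the connection -> statement -> result set flow. */
final class GatewayQueryExample {
    /** Runs one query on an already opened connection and returns the first column as strings. */
    static List<String> firstColumn(Connection conn, String sql) throws SQLException {
        List<String> values = new ArrayList<>();
        // Per the text above, executing the query and fetching its results
        // are Operations inside the Session that belongs to this connection.
        try (Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(sql)) {
            while (rs.next()) {
                values.add(rs.getString(1));
            }
        }
        return values;
    }
}
```

A caller would typically obtain the connection with `DriverManager.getConnection(url)` and close it when done, which closes the Session. The exact URL format is not specified in this section.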
...
Currently TIMESTAMP_WITH_LOCAL_TIMEZONE does not exist in java.sql.Types, but it is supported by Flink. Users can define a field as (f TIMESTAMP(p) WITH LOCAL TIME ZONE) and set the time zone with table.local-time-zone, either through the connection parameters or as a dynamic parameter in the console. After that, users get a Timestamp that is automatically converted from the stored time data according to the given time zone.
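
One plausible reading of this conversion, sketched with `java.time`: the stored value is treated as an instant and rendered as the wall-clock time of the configured zone. The `LocalZoneTimestampExample` class, its helpers and the `UTC` fallback are assumptions for illustration; only the `table.local-time-zone` key comes from the text above.

```java
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Properties;

/** Hypothetical sketch of the time zone handling; not the driver's actual code. */
final class LocalZoneTimestampExample {
    /** Builds connection properties that carry the session time zone. */
    static Properties withTimeZone(String zoneId) {
        Properties props = new Properties();
        props.setProperty("table.local-time-zone", zoneId);
        return props;
    }

    /** Renders a stored instant as the wall-clock Timestamp seen in the configured zone. */
    static Timestamp toSessionTimestamp(Instant stored, Properties props) {
        // Falling back to UTC is an assumption made only for this example.
        ZoneId zone = ZoneId.of(props.getProperty("table.local-time-zone", "UTC"));
        LocalDateTime wallClock = LocalDateTime.ofInstant(stored, zone);
        return Timestamp.valueOf(wallClock);
    }
}
```

For example, an instant of 2023-01-01T00:00:00Z read with `Asia/Shanghai` yields a Timestamp whose local date-time is 2023-01-01 08:00.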
...
Java Sql Interfaces
The JDBC interfaces define many methods. This FLIP implements only the basic ones first; more will be implemented later as they are needed.
...
```java
/*
 * Jdbc Driver for flink sql gateway. Only Batch Mode queries are supported.
 * If you force to submit streaming queries, you may get unrecognized updates,
 * deletions and other results in FlinkResultSet.
 */
public class FlinkDriver implements Driver {
    /* Connect sql gateway with given url and open/create session with given properties. */
    @Override
    public Connection connect(String url, Properties info) throws SQLException;
}
```
- Methods in FlinkDataSource

```java
/* Jdbc DataSource manages connections for client, we can support more operations in it in the future. */
public class FlinkDataSource implements DataSource {
    /* The max count of connections which the data source holds. */
    private int maxActive;

    /* Set the url of connection. */
    public synchronized void setUrl(String url);

    /* Set the driver class name for the source. */
    public synchronized void setDriverClassName(String driverClassName);

    /* Set the max active connection for the source. */
    public synchronized void setMaxActive(int maxActive);

    /* Get a connection from data source. */
    @Override
    public Connection getConnection() throws SQLException;

    @Override
    public Connection getConnection(String username, String password) throws SQLException;
}
```
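
To make the pooling idea concrete, here is a minimal sketch of a fixed-size pool that creates `maxActive` items up front and hands them out in turn. The `RoundRobinPool` class and the round-robin selection policy are assumptions for illustration; the FLIP only says that the data source creates a specific number of connections and chooses one for the client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical fixed-size pool; T stands in for java.sql.Connection. */
final class RoundRobinPool<T> {
    private final List<T> items = new ArrayList<>();
    private int next = 0;

    RoundRobinPool(int maxActive, Supplier<T> factory) {
        if (maxActive <= 0) {
            throw new IllegalArgumentException("maxActive must be positive");
        }
        // Eagerly create the configured number of pooled items.
        for (int i = 0; i < maxActive; i++) {
            items.add(factory.get());
        }
    }

    /** Hands out pooled items in turn; synchronized so concurrent callers see a consistent index. */
    synchronized T get() {
        T item = items.get(next);
        next = (next + 1) % items.size();
        return item;
    }
}
```

In a real data source the factory would open a connection to the gateway, and `getConnection()` would delegate to something like `get()`.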
- Methods in FlinkConnection

```java
/* Connection to flink sql gateway for jdbc driver. */
public class FlinkConnection implements Connection {
    /* Create statement from connection. */
    @Override
    public Statement createStatement() throws SQLException;

    /* Close session in sql gateway. */
    @Override
    public void close() throws SQLException;

    /* Use given catalog to the session in sql gateway. */
    @Override
    public void setCatalog(String catalog) throws SQLException;

    /* Get current catalog name from session. */
    @Override
    public String getCatalog() throws SQLException;

    /* Get FlinkDatabaseMetaData instance for the current catalog. */
    @Override
    public DatabaseMetaData getMetaData() throws SQLException;

    /* Use given database to the session in sql gateway. */
    @Override
    public void setSchema(String schema) throws SQLException;

    /* Get current database name from session. */
    @Override
    public String getSchema() throws SQLException;
}
```
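
The FlinkConnection listing maps the JDBC notion of a schema onto a Flink database. A small sketch of how a client might switch both, using only standard `java.sql.Connection` methods; the `CatalogSwitchExample` class and `use` helper are hypothetical names.

```java
import java.sql.Connection;
import java.sql.SQLException;

/** Hypothetical helper that selects a catalog and database on a connection. */
final class CatalogSwitchExample {
    /** Points the session at the given catalog and database and returns "catalog.database". */
    static String use(Connection conn, String catalog, String database) throws SQLException {
        conn.setCatalog(catalog);
        // JDBC has no "database" setter; in FlinkConnection, setSchema carries the Flink database.
        conn.setSchema(database);
        return conn.getCatalog() + "." + conn.getSchema();
    }
}
```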
...
- Methods in FlinkResultSet: FlinkResultSet only supports fetching data from the StatementResult iterator. It supports the getXXX methods and does not support deleting, updating or moving the cursor. Compared with ResultSet, FlinkResultSet has an additional getKind method that returns the RowKind of the current record.
```java
/*
 * ResultSet for flink jdbc driver. Only Batch Mode queries are supported.
 * If you force to submit streaming queries, you may get unrecognized updates,
 * deletions and other results.
 */
public class FlinkResultSet implements ResultSet {
    /* Get the row kind for the current record. */
    public RowKind getKind();

    /* Return true if there are more results in result iterator. */
    @Override
    public boolean next() throws SQLException;

    /* Close the fetch result operation. */
    @Override
    public void close() throws SQLException;

    /* Get different values according to data type and column index. */
    @Override
    public <V> V getXXX(int columnIndex) throws SQLException;

    /* Get different values according to data type and column name. */
    @Override
    public <V> V getXXX(String columnName) throws SQLException;
}
```
...