Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

draw.io Board Diagram
bordertrue
diagramNamedriver-gateway
simpleViewerfalse
width
linksauto
tbstyletop
lboxtrue
diagramWidth281
revision1

There are four 6 main classes in Flink Jdbc Driver: FlinkDriver, FlinkConnection, FlinkStatement, FlinkResultSet, FlinkDatabaseMetaData and FlinkResultSet FlinkResultSetMetaData which implement jdbc interface Driver, Connection, Statement, ResultSet, DatabaseMetaData and ResultResultSetMetaData.

  1. FlinkDriver parses gateway address from url, and creates FlinkConnection
  2. FlinkConnection creates Executor according to gateway address. When the Connections is closed, it can close the connection with gateway by Executor
  3. FlinkStatement can get Executor from FlinkConnection, and submit sql query to it. After query is executed, FlinkStatement can get StatementResult from Executor, and create FlinkResultSet
  4. FlinkResultSet is an iterator, it gets results from StatementResult and return them to users
  5. FlinkDatabaseMetaData provides meta data of catalogs, databases and tables
  6. FlinkResultSetMetaData provides meta data of ResultSet such as columns

The call relationship between them are as followed.

...

User Case

Flink Jdbc Driver project module will be packaged into an independent jar file such as flink-table-jdbc-driver-{version}.jar , which will contains classes of flink jdbc driver and shaded flink classes such as data type. User only need to add dependecy jdbc dependency in pom.xml or add the jar in the classpath of external jdbc tools Jdbc Tools such as sqlite.

Code Block
languagejava
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-jdbc-driver</artifactId>
    <version>${flink.version}</version>
</dependency>

...

  1. Driver Name: org.apache.flink.table.jdbc.FlinkDriver
  2. Flink Connection URL: Users can use default catalog and database directly, or set them in url. They Users can also set custom parameters in url to open the session in SqlGateway with key1=val1&key2=val2&... 
    1. jdbc:flink://{sql-gateway.host}:{sql-gateway.port}?key1=val1&key2=val2
    2. jdbc:flink://{sql-gateway.host}:{sql-gateway.port}/{catalog name}?key1=val1&key2=val2
    3. jdbc:flink://{sql-gateway.host}:{sql-gateway.port}/{catalog name}/{database name}?key1=val1&key2=val2
  3. Currently SqlGateway does not support authentication, user  and password in connection are invalid.
  4. Use Flink Jdbc Driver in Java code

    Code Block
    languagejava
    String driverName = "org.apache.flink.table.jdbc.FlinkDriver";
    String url = "jdbc:flink://{sql-gateway.host}:{sql-gateway.port}/catalog/database?key1=val1&key2=val2";
    Class.forName(driverName);
    
    try (Connection connection = DriverManager.getConnection(url)) {
        try (Statement statement = connection.createStatement()) {
            try (ResultSet resultSet = statement.execute("SELECT * FROM {Your Table}")) {
                while (resultSet.hasNext()) {
                    ....
                }
            }
        }
    }


  5. User can also add the flink-table-jdbc-driver-{version}.jar to the classpath of external jdbc tools.

...

We support to convert the following flink data type to sql data type in this FLIP, and more types can be supported as needed in the future.

Flink Data Type

Jdbc Sql Data Type

CharType/VarCharType

String

BooleanType

Boolean

TinyIntType

Byte

SmallIntType

Short

IntType

Int

BigIntType

Long

FloatType

Float

DoubleType

Double

DecimalType

BigDecimal

BinaryType/VarBinaryType

(byte[]) Bytes

DateType

Date

TimeType

Time

TimestampType

Timestamp

ArrayType

Array

Public Interface

There are many functions methods in Jdbc Driver, while this FLIP only implement the basic functions methods first and more functions methods will be supported later. The mainly related interfaces are as follows.implemented later when they are needed. 

  • Methods in FlinkDriver 
Code Block
languagejava
/* Jdbc Driver for flink sql gateway */
public class FlinkDriver implements Driver {
    /* Connect sql gateway with given url and open/create session with given priperties. */
	@Override
    public Connection connect(String url, Properties info) throws SQLException;
}

...

  • Methods in FlinkResultSet : FlinkResultSet only supports to fetching data from iterator StatementResult , it supports getXXX methods and doesn't support deleting, updating or moving the cursor.

...

  • Methods in FlinkResultSetMetaData : FlinkResultSetMetaData only supports to getting column information according to column index or name.
Code Block
languagejava
/* ResultSetMetaData in flink sql driver. */
public class FlinkResultSetMetaData implements ResultSetMetaData {
    /* Get column count in the result set. */
 	@Override 
    public int getColumnCount() throws SQLException;
    
    /* If the column may be null. */
 	@Override 
    public int isNullable(int column) throws SQLException;
    
    /* Get display size for the column. */
 	@Override 
    public int getColumnDisplaySize(int column) throws SQLException;
    
    /* Get column name according to column index. */
 	@Override 
    public String getColumnLabel(int column) throws SQLException;
    public String getColumnName(int column) throws SQLException;
    
    /* Get precision for the column index. */
 	@Override 
    public int getPrecision(int column) throws SQLException;
    
    /* Get column type id for the column index. */
 	@Override 
    public int getColumnType(int column) throws SQLException;
    
    /* Get column type name for the column index. */
 	@Override 
    public String getColumnTypeName(int column) throws SQLException;
    
    /* Get column type class name for the column index. */
 	@Override 
    public String getColumnClassName(int column) throws SQLException;
}

Unsupported

...

Features

  1. Don't support transaction such as commit, rollback
  2. Don't support prepare statement, prepare call and etc operations
  3. Don't support management operations such as savepoint and etc

...