
Discussion thread: https://lists.apache.org/thread/d1owrg8zh77v0xygcpb93fxt0jpjdkb3
Vote thread: https://lists.apache.org/thread/7jbmg22lnww31sbfdzztwrzgm6bkhjrj
JIRA: FLINK-31496
Release: 1.18.0


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In our production environment we create a Flink Session Cluster and users perform OLAP queries in it. Users like to use a Jdbc Driver in their applications to submit queries and get results. The old repositories flink-sql-gateway [1] and flink-jdbc-driver [2] support submitting queries to older versions of a Flink cluster, but for the latest Flink cluster there are many compatibility problems: Flink Sql Gateway has become a submodule of Flink and its API has been greatly optimized, so the old Flink JDBC Driver cannot connect to the new Gateway directly, which means users cannot upgrade their Flink clusters. In this FLIP, we'd like to introduce a Flink Jdbc Driver module in Flink that connects to the new Gateway, so that users can use the Jdbc Driver in their applications to submit queries and get results just like working with a database.

...

draw.io Board Diagram: driver-gateway

There are 7 main classes in Flink Jdbc Driver: FlinkDriver, FlinkDataSource, FlinkConnection, FlinkStatement, FlinkResultSet, FlinkDatabaseMetaData and FlinkResultSetMetaData, which implement the jdbc interfaces Driver, DataSource, Connection, Statement, ResultSet, DatabaseMetaData and ResultSetMetaData.

  1. FlinkDriver parses the gateway address from the url and creates FlinkConnection
  2. FlinkDataSource manages the connection pool for flink jdbc; it creates a specific count of connections and chooses one for the client directly
  3. FlinkConnection creates an Executor according to the gateway address. When the connection is closed, it closes the connection with the gateway through the Executor
  4. FlinkStatement gets the Executor from FlinkConnection and submits sql queries to it. After a query is executed, FlinkStatement gets the StatementResult from the Executor and creates FlinkResultSet
  5. FlinkResultSet is an iterator; it gets results from StatementResult and returns them to users
  6. FlinkDatabaseMetaData provides meta data of catalogs, databases and tables
  7. FlinkResultSetMetaData provides meta data of a ResultSet such as columns

The call relationships between them are as follows.

draw.io Board Diagram: classes-driver


There are Sessions and Operations in SqlGateway. SqlGateway will open a Session for each FlinkConnection and then perform multiple Operations in that Session. When users create a FlinkConnection via FlinkDriver with SqlGateway, it will open an existing or a new Session. Any time users want to issue SQL statements to the database, they require a FlinkStatement instance from the FlinkConnection. Once users have a FlinkStatement, they can use it to issue a query. This returns a FlinkResultSet instance, which contains the entire result. Each operation, such as executing the query (a Flink job) or fetching results through FlinkResultSet, will be an Operation in the Session of SqlGateway.
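As a rough illustration of this mapping (the gateway address and table name below are placeholders), the jdbc calls relate to Gateway concepts as follows:

Code Block
languagejava
// Each FlinkConnection corresponds to one Session in SqlGateway.
try (Connection connection = DriverManager.getConnection("jdbc:flink://gateway-host:8083")) {
    try (Statement statement = connection.createStatement()) {
        // Executing the query (a Flink job) and fetching its results are Operations in that Session.
        try (ResultSet resultSet = statement.executeQuery("SELECT * FROM my_table")) {
            while (resultSet.next()) {
                // consume one row per loop iteration
            }
        }
    }
} // closing the connection closes the Session in SqlGateway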

Use Case

The Flink Jdbc Driver module will be packaged into an independent jar file such as flink-table-jdbc-driver-{version}.jar, which contains the classes of the flink jdbc driver and shaded flink classes such as data types. Users only need to add the jdbc dependency in pom.xml or add the jar to the classpath of external jdbc tools such as sqlite.

Code Block
languagejava
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-jdbc-driver</artifactId>
    <version>${flink.version}</version>
</dependency>

More information about Flink Jdbc Driver

  1. Driver Name: org.apache.flink.table.jdbc.FlinkDriver
  2. Flink Connection URL: Users can use the default catalog and database directly, or set them in the url. They can also set custom parameters in the url to open the session in SqlGateway with key1=val1&key2=val2&...
    1. jdbc:flink://{sql-gateway.host}:{sql-gateway.port}?key1=val1&key2=val2
    2. jdbc:flink://{sql-gateway.host}:{sql-gateway.port}/{catalog name}?key1=val1&key2=val2
    3. jdbc:flink://{sql-gateway.host}:{sql-gateway.port}/{catalog name}/{database name}?key1=val1&key2=val2

  3. Currently SqlGateway does not support authentication, so the user and password in the connection are invalid.
  4. Use Flink Jdbc Driver in Java code

    Code Block
    languagejava
    String driverName = "org.apache.flink.table.jdbc.FlinkDriver";
    String url = "jdbc:flink://{sql-gateway.host}:{sql-gateway.port}/{catalog name}/{database name}?key1=val1&key2=val2";
    Class.forName(driverName);
    
    try (Connection connection = DriverManager.getConnection(url)) {
        try (Statement statement = connection.createStatement()) {
            try (ResultSet resultSet = statement.executeQuery("SELECT * FROM T1 JOIN T2 on T1.id=T2.id ...")) {
                while (resultSet.next()) {
                    // Do your work ...
                }
            }
        }
    }
  5. Users can also add the flink-table-jdbc-driver-{version}.jar to the classpath of external jdbc tools.


Data Types

The following basic Flink data types can be converted to sql data types in this FLIP, and more types can be supported as needed in the future.

Flink Data Type           | Java Sql Data Type            | Java Data Type
CharType/VarCharType      | CHAR/VARCHAR                  | String
BooleanType               | BOOLEAN                       | Boolean
TinyIntType               | TINYINT                       | Byte
SmallIntType              | SMALLINT                      | Short
IntType                   | INTEGER                       | Int
BigIntType                | BIGINT                        | Long
FloatType                 | FLOAT                         | Float
DoubleType                | DOUBLE                        | Double
DecimalType               | DECIMAL                       | BigDecimal
BinaryType/VarBinaryType  | BINARY/VARBINARY              | byte[]
DateType                  | DATE                          | Date
TimeType                  | TIME                          | Time
TimestampType             | TIMESTAMP                     | Timestamp
ZonedTimestampType        | TIMESTAMP_WITH_TIMEZONE       | OffsetDateTime
LocalZonedTimestampType   | TIMESTAMP_WITH_LOCAL_TIMEZONE | Timestamp
ArrayType                 | ARRAY                         | Array
RowType                   | ROW (not in java.sql.Types)   | Row (Flink Row Data)
MapType                   | MAP                           | Map<K, V>

Currently TIMESTAMP_WITH_LOCAL_TIMEZONE does not exist in java.sql.Types, but it is supported by Flink. Users can define a field as (f TIMESTAMP(p) WITH LOCAL TIME ZONE) and set the time zone through the connection parameters or through dynamic parameters in the console via table.local-time-zone. After that, users get a Timestamp which is automatically converted from the stored time data according to the given time zone.
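As a hedged sketch (the table f_table and column ts are placeholders), the time zone can be passed as a session parameter in the connection url and the value is then read as a java.sql.Timestamp:

Code Block
languagejava
// Pass table.local-time-zone as a session option when opening the connection.
String url = "jdbc:flink://{sql-gateway.host}:{sql-gateway.port}?table.local-time-zone=Asia/Shanghai";
try (Connection connection = DriverManager.getConnection(url);
     Statement statement = connection.createStatement();
     // assumes a table with a column `ts TIMESTAMP(3) WITH LOCAL TIME ZONE`
     ResultSet resultSet = statement.executeQuery("SELECT ts FROM f_table")) {
    while (resultSet.next()) {
        Timestamp ts = resultSet.getTimestamp("ts"); // converted according to the configured time zone
    }
}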

Java Sql Interfaces

There are many methods in Jdbc Driver; this FLIP only implements the basic methods first, and more methods will be implemented later when they are needed.

  • Methods in FlinkDriver 
Code Block
languagejava
/* Jdbc Driver for flink sql gateway. Only Batch Mode queries are supported. If you submit streaming queries anyway, you may get unrecognized updates, deletions and other results in FlinkResultSet. */
public class FlinkDriver implements Driver {
    /* Connect sql gateway with given url and open/create session with given properties. */
	@Override
    public Connection connect(String url, Properties info) throws SQLException;
}
  • Methods in FlinkDataSource
Code Block
languagejava
/* Jdbc DataSource manages connections for the client; more operations can be supported in it in the future. */
public class FlinkDataSource implements DataSource {
	/* The max count of connections which the data source holds. */
	private int maxActive;

	/* Set the url of connection. */
	public synchronized void setUrl(String url);

	/* Set the driver class name for the source. */
	public synchronized void setDriverClassName(String driverClassName); 

	/* Set the max active connection for the source. */
    public synchronized void setMaxActive(int maxActive);

	/* Get a connection from the data source. */
	@Override
    public Connection getConnection() throws SQLException;

	@Override
    public Connection getConnection(String username, String password) throws SQLException;
}
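For illustration, the FlinkDataSource above might be used as follows; the no-arg constructor and the pool size of 10 are assumptions of this sketch, not part of the proposal:

Code Block
languagejava
FlinkDataSource dataSource = new FlinkDataSource();  // assumed no-arg constructor
dataSource.setDriverClassName("org.apache.flink.table.jdbc.FlinkDriver");
dataSource.setUrl("jdbc:flink://{sql-gateway.host}:{sql-gateway.port}/{catalog name}/{database name}");
dataSource.setMaxActive(10);  // hold at most 10 connections in the pool

try (Connection connection = dataSource.getConnection()) {
    // use the connection like any other jdbc connection
}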
  • Methods in FlinkConnection 
Code Block
languagejava
/* Connection to flink sql gateway for jdbc driver. */
public class FlinkConnection implements Connection {
    /* Create statement from connection. */
	@Override
    public Statement createStatement() throws SQLException;
    
    /* Close session in sql gateway. */
 	@Override
    public void close() throws SQLException;
    
    /* Use given catalog to the session in sql gateway. */
 	@Override 
    public void setCatalog(String catalog) throws SQLException;
    
    /* Get current catalog name from session. */
 	@Override 
    public String getCatalog() throws SQLException;
    
    /* Get FlinkDatabaseMetaData instance for the current catalog. */
 	@Override 
    public DatabaseMetaData getMetaData() throws SQLException;
    
    /* Use given database to the session in sql gateway. */
 	@Override 
    public void setSchema(String schema) throws SQLException;
    
    /* Get current database name from session. */
 	@Override 
    public String getSchema() throws SQLException;
}
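For example, switching the current catalog and database of the underlying session could look like the following sketch (the catalog and database names are placeholders):

Code Block
languagejava
try (Connection connection = DriverManager.getConnection("jdbc:flink://{sql-gateway.host}:{sql-gateway.port}")) {
    connection.setCatalog("my_catalog");   // use the given catalog in the gateway session
    connection.setSchema("my_database");   // use the given database in the gateway session

    DatabaseMetaData metaData = connection.getMetaData();
    System.out.println(metaData.getURL() + " -> " + connection.getCatalog() + "." + connection.getSchema());
}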
  • Methods in FlinkStatement 
Code Block
languagejava
/* Statement in flink jdbc driver. */
public class FlinkStatement implements Statement {
    /* Submit sql to sql gateway and get result set. */
 	@Override 
    public ResultSet executeQuery(String sql) throws SQLException;
    
    /* Execute given update sql and return result count. */
 	@Override 
    public int executeUpdate(String sql) throws SQLException;
    
    /* Cancel the running job in sql gateway. */
 	@Override 
    public void close() throws SQLException;
    
    /* Return true if the result set has more results. */
 	@Override 
    public boolean getMoreResults() throws SQLException;
    
    /* Get current result set in the statement. */
 	@Override 
    public ResultSet getResultSet() throws SQLException;
}
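Given an open connection from the examples above, a hedged sketch of the two execution paths (the statements themselves are placeholders):

Code Block
languagejava
try (Statement statement = connection.createStatement()) {
    // DDL/DML goes through executeUpdate and returns the result count
    int count = statement.executeUpdate("INSERT INTO sink_table SELECT * FROM source_table");

    // queries go through executeQuery and return a FlinkResultSet
    try (ResultSet resultSet = statement.executeQuery("SELECT id, name FROM source_table")) {
        while (resultSet.next()) {
            // consume rows
        }
    }
}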
  • Methods in FlinkResultSet : FlinkResultSet only supports fetching data from the iterator StatementResult; it supports getXXX methods and doesn't support deleting, updating or moving the cursor. Compared with ResultSet, FlinkResultSet has an additional getKind method to get the RowKind of the current record.
Code Block
languagejava
/* ResultSet for flink jdbc driver. Only Batch Mode queries are supported. If you submit streaming queries anyway, you may get unrecognized updates, deletions and other results. */
public class FlinkResultSet implements ResultSet {
    /* Return true if there are more results in the result iterator. */
 	@Override 
    public boolean next() throws SQLException;
    
    /* Close the fetch result operation. */
 	@Override 
    public void close() throws SQLException;
    
    /* Get different values according to data type and column index. */
 	@Override 
    public <V> V getXXX(int columnIndex) throws SQLException;
    
    /* Get different values according to data type and column name. */
 	@Override 
    public <V> V getXXX(String columnName) throws SQLException;
}
  • Methods in FlinkDatabaseMetaData : FlinkDatabaseMetaData only supports TABLE and VIEW tables, getting information of catalogs, databases and tables.
Code Block
languagejava
/* DatabaseMetaData in flink sql driver. */
public class FlinkDatabaseMetaData implements DatabaseMetaData {
    /* Get the url of flink sql driver. */
 	@Override 
    public String getURL() throws SQLException;
    
    /* Get catalog name list from session. */
 	@Override 
    public ResultSet getCatalogs() throws SQLException;
    
    /* Get database name list from session. */
 	@Override 
    public ResultSet getSchemas() throws SQLException;
    
    /* Get database name list in given catalog from session. */
 	@Override 
    public ResultSet getSchemas(String catalog, String schemaPattern) throws SQLException;
    
    /* Get table name list with given condition from session. */
 	@Override 
    public ResultSet getTables(String catalog, String schemaPattern, String tableNamePattern, String[] types) throws SQLException;
    
    /* Get column list with given condition from session. */
 	@Override 
    public ResultSet getColumns(String catalog, String schemaPattern, String tableNamePattern, String columnNamePattern) throws SQLException;
    
    /* Get primary key list for given table from session. */
 	@Override 
    public ResultSet getPrimaryKeys(String catalog, String schema, String table) throws SQLException;
}
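For example, catalogs and tables can be listed through the standard metadata api; the catalog/database names and the standard jdbc result columns TABLE_CAT / TABLE_NAME are assumptions of this sketch:

Code Block
languagejava
DatabaseMetaData metaData = connection.getMetaData();

// list all catalogs registered in the session
try (ResultSet catalogs = metaData.getCatalogs()) {
    while (catalogs.next()) {
        System.out.println(catalogs.getString("TABLE_CAT"));
    }
}

// list TABLE and VIEW objects in a given catalog and database
try (ResultSet tables = metaData.getTables("my_catalog", "my_database", "%", new String[] {"TABLE", "VIEW"})) {
    while (tables.next()) {
        System.out.println(tables.getString("TABLE_NAME"));
    }
}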
  • Methods in FlinkResultSetMetaData : FlinkResultSetMetaData only supports getting column information according to column index or name.
Code Block
languagejava
/* ResultSetMetaData in flink sql driver. */
public class FlinkResultSetMetaData implements ResultSetMetaData {
    /* Get column count in the result set. */
 	@Override 
    public int getColumnCount() throws SQLException;
    
    /* If the column may be null. */
 	@Override 
    public int isNullable(int column) throws SQLException;
    
    /* Get display size for the column. */
 	@Override 
    public int getColumnDisplaySize(int column) throws SQLException;
    
    /* Get column name according to column index. */
 	@Override 
    public String getColumnLabel(int column) throws SQLException;
    public String getColumnName(int column) throws SQLException;
    
    /* Get precision for the column index. */
 	@Override 
    public int getPrecision(int column) throws SQLException;
    
    /* Get column type id for the column index. */
 	@Override 
    public int getColumnType(int column) throws SQLException;
    
    /* Get column type name for the column index. */
 	@Override 
    public String getColumnTypeName(int column) throws SQLException;
    
    /* Get column type class name for the column index. */
 	@Override 
    public String getColumnClassName(int column) throws SQLException;
}
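For example, the column layout of a query result can be inspected as follows (given an open statement from the examples above; the table name is a placeholder):

Code Block
languagejava
try (ResultSet resultSet = statement.executeQuery("SELECT * FROM my_table")) {
    ResultSetMetaData metaData = resultSet.getMetaData();
    for (int i = 1; i <= metaData.getColumnCount(); i++) {  // jdbc column indexes are 1-based
        System.out.printf("%s %s(%d)%n",
                metaData.getColumnName(i),
                metaData.getColumnTypeName(i),
                metaData.getPrecision(i));
    }
}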

Unsupported Features

  1. Transactions such as commit and rollback are not supported
  2. Prepared statements, prepared calls and similar operations are not supported
  3. Management operations such as savepoints are not supported

Exception Handling

When an error occurs, Flink Jdbc Driver mainly throws the following exceptions

SQLState Class | SQLState SubClass | Reason | Exception | Operations
22 | 000 to 02H according to different errors | Description of data conversion error | SQLDataException | Get data error from ResultSet in methods getXXX
0A | 000 | Specific feature is not supported | SQLFeatureNotSupportedException | All unimplemented methods will throw this exception
58 | 004 | The exception or error message from Gateway | SQLNonTransientException | Gateway throws an exception or returns an error message when executing the query
08 | 006 | The session does not exist in Gateway and the client needs to create a new connection to it | SQLNonTransientConnectionException | Gateway is restarted and the client needs to create a new connection

In the future, Flink Jdbc Driver can further subdivide these cases and throw different exceptions according to the error information returned by the Gateway.
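As a hedged sketch, a client could react to the SQLState classes above roughly like this (the url and table name are placeholders):

Code Block
languagejava
try (Connection connection = DriverManager.getConnection(url);
     Statement statement = connection.createStatement();
     ResultSet resultSet = statement.executeQuery("SELECT * FROM my_table")) {
    while (resultSet.next()) {
        // consume rows
    }
} catch (SQLFeatureNotSupportedException e) {
    // 0A000: the jdbc method is not implemented by the driver
} catch (SQLNonTransientConnectionException e) {
    // 08006: the session no longer exists in the Gateway, re-create the connection
} catch (SQLDataException e) {
    // 22xxx: data conversion failed in a getXXX call
} catch (SQLException e) {
    // 58004 and other errors reported by the Gateway
}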

...



[1] https://github.com/ververica/flink-sql-gateway

...