FLIP-202: Introduce ClickHouse Connector

Status

Discussion thread: https://lists.apache.org/thread/55j123c4m8b6obs3msx5dxzdp4l1sgkp
Vote thread: -
JIRA: FLINK-26999
Release: -

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, Flink can write to and read from ClickHouse through the Flink JDBC connector, but this is neither flexible nor easy to use, especially when writing data to ClickHouse with Flink SQL. The ClickHouse-JDBC project implemented a BalancedClickhouseDataSource component that adapts to ClickHouse clusters and has been well tested in production environments; it addresses several of the JDBC connector's flexibility shortcomings.

Therefore, we plan to introduce a ClickHouse DataStream Connector, a Flink ClickHouse SQL Connector, and a Catalog, adapt the functionality of BalancedClickhouseDataSource, and adopt the latest ClickHouse driver, so that the integration of Flink and ClickHouse becomes more convenient and easier to use.

⚠️ Part of the code will refer to an unofficial flink-clickhouse-connector implementation [3] that is open-sourced under the Apache License 2.0 [4].

Proposed Changes

  1. Introduce ClickHouse DataStream Connector.
  2. Introduce ClickHouse SQL Connector.
  3. Introduce ClickHouse Catalog.
  4. Introduce ClickHouse Async Sink (FLIP-171) [9].
  5. Introduce ClickHouse Lookup (low priority, or implement it after producing the first stable version).

Things to confirm

About ClickHouseDynamicTableSource

It should implement:

  1. ScanTableSource
  2. LookupTableSource
  3. SupportsLimitPushDown: to avoid scanning excessively large result sets

About ClickHouseDynamicTableSink

It should implement:

...

The following scenarios also need to be considered:

  1. Support writing into both distributed tables and local tables.
    1. Writing into a distributed table:
      1. We do not need to care about load balancing across the ClickHouse cluster.
      2. With asynchronous writes into a distributed table, data may be lost; with synchronous writes, there may be some delay.
      3. Writing into a distributed table puts pressure on the network and I/O load of the ClickHouse cluster. This limitation has nothing to do with how the connector is implemented; it is a limitation of ClickHouse itself.
    2. Writing into local tables:
      1. The write frequency is controlled via the batch size and batch interval, balancing part-merge pressure against data freshness in ClickHouse.
      2. BalancedClickhouseDataSource can theoretically ensure load balancing across ClickHouse instances through random routing, but it only detects instances via periodic pings, shields currently unreachable instances, and has no failover. Therefore, once we try to write to a failed node, we lose data. We can introduce a retry mechanism with configurable parameters to minimize this risk.
      3. Enhance the routing strategies to ClickHouse instances, e.g. round robin or shard-key hash (see the sketch after this list).
  2. ClickHouse does not support transactions, so we do not need to consider the consistency of data writing.
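
For illustration, the routing strategies mentioned in item 1.2.3 could look like the following sketch; the ShardRouter interface and the class names are assumptions, not part of the final API:

Code Block
languagejava
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical router that picks which ClickHouse shard a record is written to. */
interface ShardRouter {
    int route(Object shardKey, int numShards);
}

/** Round robin: spreads writes evenly, regardless of record content. */
class RoundRobinRouter implements ShardRouter {
    private final AtomicLong counter = new AtomicLong();

    @Override
    public int route(Object shardKey, int numShards) {
        return (int) (counter.getAndIncrement() % numShards);
    }
}

/** Shard-key hash: records with the same key always land on the same shard. */
class ShardKeyHashRouter implements ShardRouter {
    @Override
    public int route(Object shardKey, int numShards) {
        // Mask the sign bit so the resulting index is non-negative.
        return (Objects.hashCode(shardKey) & Integer.MAX_VALUE) % numShards;
    }
}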


The BalancedClickhouseDataSource [5] described in the previous version of FLIP-202 has been folded into the URL configuration as of client version 0.4.0+. Users can supply a URL in the correct format to get load balancing across ClickHouse instances through random routing.
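
For example, multiple endpoints can be listed directly in the JDBC URL (host names below are placeholders):

Code Block
languagesql
'url' = 'jdbc:clickhouse://node1:8123,node2:8123,node3:8123/default'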

Related Classes

ClickHouseDynamicTableSource

Code Block
languagejava
@Internal
public class ClickHouseDynamicTableSource
        implements ScanTableSource, LookupTableSource, SupportsLimitPushDown {

    private final ResolvedSchema tableSchema;

    private final ClickHouseOptions options;

    @Override
    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext) {
        // Provide the function that performs point lookups against ClickHouse.
        return null;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        // The source only produces insert-only data.
        return ChangelogMode.insertOnly();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
        // Provide the input format / source function for scan reads.
        return null;
    }

    @Override
    public DynamicTableSource copy() {
        return new ClickHouseDynamicTableSource(...);
    }

    @Override
    public String asSummaryString() {
        return "ClickHouse table source";
    }

    @Override
    public void applyLimit(long limit) {
        // Record the limit so it can be appended to the generated query.
        ...
    }
}
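
As a usage sketch, the LookupTableSource path would typically be exercised through a processing-time temporal join; the query below is illustrative, with orders and proc_time as placeholder names:

Code Block
languagesql
SELECT o.order_id, d.str_col
FROM orders AS o
JOIN t1 FOR SYSTEM_TIME AS OF o.proc_time AS d
    ON o.pid = d.pid;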


ClickHouseDynamicTableSink

Code Block
languagejava
@Internal
public class ClickHouseDynamicTableSink implements DynamicTableSink {

    private final ResolvedSchema tableSchema;

    private final ClickHouseOptions options;

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        // Upsert mode when a primary key is declared; insert-only otherwise.
        return null;
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        return OutputFormatProvider.of(...);
    }

    @Override
    public DynamicTableSink copy() {
        return new ClickHouseDynamicTableSink(...);
    }

    @Override
    public String asSummaryString() {
        return "ClickHouse table sink";
    }
}
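
For the DataStream connector from the Proposed Changes, usage might look like the following fragment; the ClickHouseSink builder is purely an assumption, since this FLIP does not pin down the DataStream API:

Code Block
languagejava
// Hypothetical DataStream API usage; all names here are illustrative only.
DataStream<RowData> stream = ...;
stream.sinkTo(
        ClickHouseSink.<RowData>builder()
                .withOptions(options) // the ClickHouseOptions sketched below
                .build());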



ClickHouseOptions

Code Block
languagejava
    private final String url; // server URL information.

    private final String username;

    private final String password;

    private final String databaseName;

    private final String tableName;

    private final int batchSize; // flush when this many records are buffered.

    private final Duration flushInterval; // flush at least this often.

    private final int maxRetries;

    private final boolean writeLocal; // write into local tables or distributed tables.

    private final String partitionStrategy; // for data routing.

    private final String partitionKey; // key used when partitionStrategy is 'partition'.

    private final boolean ignoreDelete;

    private ClickHouseDialect dialect;
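
A hypothetical way to assemble these options (the builder below is assumed for illustration; the final constructor or builder API may differ):

Code Block
languagejava
// Illustrative only: assumes a builder is provided for ClickHouseOptions.
ClickHouseOptions options =
        new ClickHouseOptions.Builder()
                .withUrl("clickhouse://node1:8123")
                .withUsername("default")
                .withPassword("")
                .withDatabaseName("ckd1")
                .withTableName("ckt1")
                .withBatchSize(1000)
                .withFlushInterval(Duration.ofMinutes(1))
                .withMaxRetries(3)
                .build();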


ClickHouseCatalog


Code Block
languagejava
@Internal
public class ClickHouseCatalog extends AbstractCatalog {

    public ClickHouseCatalog(
            String catalogName,
            @Nullable String defaultDatabase,
            String baseUrl,
            String username,
            String password,
            Map<String, String> properties) {
        super(...);
        //...
    }

    @Override
    public void open() throws CatalogException {
    }

    @Override
    public synchronized void close() throws CatalogException {
    }

    @Override
    public Optional<Factory> getFactory() {

        return Optional.of(...);
    }

    // ------------- databases -------------

    @Override
    public synchronized List<String> listDatabases() throws CatalogException {}

    @Override
    public CatalogDatabase getDatabase(String databaseName)
            throws DatabaseNotExistException, CatalogException { }

    @Override
    public boolean databaseExists(String databaseName) throws CatalogException { }

    // ------------- tables -------------

    @Override
    public synchronized List<String> listTables(String databaseName)
            throws DatabaseNotExistException, CatalogException { }

    @Override
    public CatalogBaseTable getTable(ObjectPath tablePath)
            throws TableNotExistException, CatalogException { }

    @Override
    public boolean tableExists(ObjectPath tablePath) throws CatalogException {}

	// ------- placeholders for other unsupported methods -------
	...
}

Using the ClickHouseCatalog

Table API

Code Block
languagejava
tEnv.registerCatalog(name, new ClickHouseCatalog(...));
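
A more complete registration might look like this; the connection values are placeholders, and the constructor follows the sketch above:

Code Block
languagejava
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

import java.util.Collections;

public class RegisterClickHouseCatalog {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Constructor arguments mirror the ClickHouseCatalog sketch above.
        tEnv.registerCatalog(
                "ck_catalog",
                new ClickHouseCatalog(
                        "ck_catalog",              // catalog name
                        "default",                 // default database
                        "clickhouse://host:8123",  // base URL (placeholder)
                        "user",                    // username (placeholder)
                        "secret",                  // password (placeholder)
                        Collections.emptyMap()));  // extra properties

        tEnv.executeSql("USE CATALOG ck_catalog");
    }
}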

SQL

Code Block
languagesql
DROP CATALOG IF EXISTS ck_catalog;
CREATE CATALOG ck_catalog WITH (
    'type' = 'clickhouse',
    'default-database' = '...',
    'username' = '...',
    'password' = '...',
    'url' = '...'
    [, '...' = '...']
);

USE CATALOG ck_catalog;


SQL CLI yaml configs

Code Block
languageyaml
catalogs:
  - name: ...
    type: clickhouse
    username: ...
    password: ...
    default_database: ...
    ...

Using the ClickHouse SQL connector


Code Block
languagesql
DROP TABLE IF EXISTS t1;
CREATE TABLE t1 (
    `pid` BIGINT,
    `bigint_col` BIGINT,
    `int_col` INTEGER,
    `str_col` STRING,
    `d_col` DOUBLE,
    `list_col` ARRAY<STRING>,
    `map_col` MAP<STRING, BIGINT>,
    PRIMARY KEY (`pid`) NOT ENFORCED
) WITH (
    'connector' = 'clickhouse',  -- connector identifier
    'url' = 'clickhouse://xxxxx:port[, ...]',  -- single host or a list of hosts
    'username' = '',
    'password' = '',
    'database-name' = 'ckd1',
    'table-name' = 'ckt1',

    -- ----- source options -----
    'lookup.cache.max-rows' = '100',  -- cache capacity threshold; the cache is lazily loaded
    'lookup.cache.ttl' = '10',  -- time-to-live for elements in the cache
    'lookup.max-retries' = '3',  -- retry setting
    -- ...

    -- ----- sink options -----
    'sink.batch-size' = '1000',  -- batch size threshold
    'sink.flush-interval' = '1 min',  -- time interval with unit
    'sink.max-retries' = '3',
    'sink.partition-strategy' = 'balanced',  -- data routing strategy
    'sink.write-local' = 'false',  -- write into a local table or a distributed table
    'sink.ignore-delete' = 'true'
    -- ...
);
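
Once the table is defined, it can be queried and written like any other Flink table; the statements below are illustrative, with source_table as a placeholder:

Code Block
languagesql
-- Write into ClickHouse.
INSERT INTO t1
SELECT pid, bigint_col, int_col, str_col, d_col, list_col, map_col
FROM source_table;

-- Read back with a limit (pushed down via SupportsLimitPushDown).
SELECT * FROM t1 LIMIT 10;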


WITH Options

Each option is listed with its description, type, whether it is required, its default value, and remarks.

url
  Description: The ClickHouse JDBC URL, in the format clickhouse://<host>:<port>.
  Type: String
  Required: Y
  Default: -
  Remark: The full URL format is jdbc:clickhouse://<host>:<port>/<yourDatabaseName>. If the database name is omitted, the default database is used.

username
  Description: The ClickHouse username. 'username' and 'password' must both be specified if either of them is specified.
  Type: String
  Required: Y
  Default: -

password
  Description: The ClickHouse password.
  Type: String
  Required: Y
  Default: -

tableName
  Description: The ClickHouse table name.
  Type: String
  Required: Y
  Default: -

sink.maxRetries
  Description: The max number of retries when writing records to the database fails.
  Type: Int
  Required: N
  Default: 3

sink.batchSize
  Description: The max flush size; when it is exceeded, the buffered data is flushed.
  Type: Int
  Required: N
  Default: 100
  Remark: If the number of buffered records reaches sink.batchSize, or the waiting time exceeds sink.flushInterval, the system automatically writes the buffered data to the ClickHouse table.

sink.flushInterval
  Description: When this flush interval (in milliseconds) elapses, an asynchronous thread flushes the buffered data.
  Type: Duration
  Required: N
  Default: 1s

sink.ignoreDelete
  Description: Whether to ignore DELETE statements.
  Type: Boolean
  Required: N
  Default: true
  Remark: The parameter values are as follows:
    • true (default): DELETE messages are ignored.
    • false: DELETE messages are not ignored. If a primary key is declared in the DDL, the ALTER statement of ClickHouse is used to delete the data.

sink.parallelism
  Description: Defines a custom parallelism for the sink.
  Type: Integer
  Required: N
  Default: -

sink.shard-write-local
  Description: Whether to bypass the distributed table and write directly into its local tables.
  Type: Boolean
  Required: N
  Default: false
  Remark: The parameter values are as follows:
    • true: Skip the distributed table and write the data directly into the local tables backing the ClickHouse distributed table. Flink automatically queries the cluster information of ClickHouse, resolves the corresponding local tables, and writes to them; in this case, tableName should be the name of the distributed table. You can also manually specify in the URL which nodes' local tables to write to; in that case, tableName should be the name of the local table. For example:
      'url' = 'jdbc:clickhouse://node1:8123,node2:8123/default'
      'tableName' = 'local_table_name'
    • false (default): Write into the ClickHouse distributed table, which then forwards the data to the corresponding local tables. In this case, tableName should be the name of the distributed table.

sink.partition-strategy
  Description: The strategy used to route data to the local tables.
  Type: Enum
  Required: N
  Default: default
  Remark: The parameter values are as follows:
    • default (default value): Always write to the local table of the first node.
    • partition: Write data to the local table of the same node by key.
    • random: Write to the local table of a randomly chosen node.

sink.partition-key
  Description: The key used for routing when 'sink.partition-strategy' is 'partition'.
  Type: String
  Required: N
  Default: -
  Remark: When the value of 'sink.partition-strategy' is 'partition', 'sink.partition-key' is required; it can contain multiple fields, separated by commas (,).

properties.*
  Description: Sets and passes through clickhouse-jdbc configurations.
  Type: -
  Required: N
  Default: -
  Remark: Allows the user to configure custom parameters for the connection.

lookup.cache
  Description: The caching strategy for this lookup table, including NONE and PARTIAL.
  Type: String
  Required: N
  Default: NONE

lookup.partial-cache.expire-after-access
  Description: Duration after which a cache entry expires once it has been accessed; beyond this time, the oldest rows are expired.
  Type: Duration
  Required: N
  Default: NONE

lookup.partial-cache.expire-after-write
  Description: Duration after which a cache entry expires once it has been written; beyond this time, the oldest rows are expired.
  Type: Duration
  Required: N
  Default: NONE

lookup.partial-cache.max-rows
  Description: The max number of rows in the lookup cache; beyond this value, the oldest rows are expired.
  Type: Long
  Required: N
  Default: NONE

lookup.partial-cache.caching-missing-key
  Description: Whether to cache missing keys; true by default.
  Type: Boolean
  Required: N
  Default: true

lookup.max-retries
  Description: The max number of retries if a database lookup fails.
  Type: Integer
  Required: N
  Default: 3
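For example, a driver-level setting can be passed through the properties.* prefix; socket_timeout is a clickhouse-jdbc property, but treat the exact key as illustrative:

Code Block
languagesql
'properties.socket_timeout' = '300000'  -- milliseconds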

Versions

For the parts of ClickHouse that change frequently across versions, we will expose a public interface so that users can override the behavior themselves.
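
One possible shape for that public interface, building on the ClickHouseDialect field already present in ClickHouseOptions (the methods below are assumptions for illustration):

Code Block
languagejava
/** Hypothetical SPI isolating version-specific ClickHouse behavior. */
public interface ClickHouseDialect {

    /** Quotes an identifier the way the target ClickHouse version expects. */
    String quoteIdentifier(String identifier);

    /** Builds the delete statement, e.g. ALTER TABLE ... DELETE WHERE ... */
    String getDeleteStatement(String tableName, String[] conditionFields);
}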

Flink Catalog Metaspace Structure     | ClickHouse Metaspace Structure
catalog name (defined in Flink only)  | n/a
database name                         | database name
table name                            | table name

Flink Type          | ClickHouse Type
CHAR                | String
VARCHAR             | String / UUID / IP
STRING              | String / Enum
BOOLEAN             | UInt8 / Boolean
BYTES               | FixedString
TINYINT             | Int8
SMALLINT            | Int16 / UInt8
INTEGER             | Int32 / UInt16 / Interval
BIGINT              | Int64 / UInt32
FLOAT               | Float32
DOUBLE              | Float64
DECIMAL             | Decimal / Int128 / Int256 / UInt64 / UInt128 / UInt256
DATE                | Date
TIME                | DateTime
TIMESTAMP           | DateTime
TIMESTAMP_LTZ       | DateTime
INTERVAL_YEAR_MONTH | Int32
INTERVAL_DAY_TIME   | Int64
ARRAY               | Array
MAP                 | Map
ROW                 | -
MULTISET            | -
RAW                 | -

Compatibility, Deprecation, and Migration Plan

  • Introduce a ClickHouse connector for users.
  • It will be a new feature, so we needn't phase out any older behavior.
  • We don't need special migration tools.

Test Plan

We could add unit test cases and integration test cases based on testcontainers.
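
A minimal integration-test sketch with Testcontainers; ClickHouseContainer comes from the org.testcontainers clickhouse module, while the table and assertions are illustrative:

Code Block
languagejava
import org.junit.jupiter.api.Test;
import org.testcontainers.clickhouse.ClickHouseContainer;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

import static org.junit.jupiter.api.Assertions.assertEquals;

class ClickHouseConnectorITCase {

    @Test
    void testRoundTrip() throws Exception {
        try (ClickHouseContainer clickhouse =
                new ClickHouseContainer("clickhouse/clickhouse-server:23.3")) {
            clickhouse.start();

            // Plain JDBC smoke test; a real ITCase would instead submit a
            // Flink job that writes to and reads from clickhouse.getJdbcUrl().
            try (Connection conn =
                            DriverManager.getConnection(
                                    clickhouse.getJdbcUrl(),
                                    clickhouse.getUsername(),
                                    clickhouse.getPassword());
                    Statement stmt = conn.createStatement()) {
                stmt.execute("CREATE TABLE t (id Int32) ENGINE = Memory");
                stmt.execute("INSERT INTO t VALUES (1)");
                try (ResultSet rs = stmt.executeQuery("SELECT count(*) FROM t")) {
                    rs.next();
                    assertEquals(1L, rs.getLong(1));
                }
            }
        }
    }
}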

Rejected Alternatives

Adopting flink-clickhouse-connector [3] directly into FLIP-202

  • It relies on an unofficial JDBC package whose maintenance has currently stopped, which carries certain risks:
    • Since the project used to be a personal project, the code before 0.3.2 [6] has potential license issues. The ClickHouse community has since renamed the related packages and fixed the licenses.
    • Version 0.4.0+ is a breaking update; the JDBC driver class name was changed to com.clickhouse.jdbc.ClickHouseDriver [7].
    • Using API versions lower than 0.4.0 [8] is not recommended because of potential performance problems.
    • To support the FLIP-171 [9] async sink, JDBC cannot be used; only HTTP or TCP can be used asynchronously [10].
  • It cannot stay in sync with Flink releases; the last adaptation was for Flink 1.16.0.

**** I am not an expert on ClickHouse; I have only used it before. If there are any omissions or mistakes, please send feedback to the dev mailing list. ****

Related References

  1. https://clickhouse.com/docs/en/engines/table-engines/
  2. https://clickhouse.com/docs/en/sql-reference/data-types/
  3. https://github.com/itinycheng/flink-connector-clickhouse
  4. https://github.com/ClickHouse/clickhouse-java/pull/1089
  5. https://github.com/ClickHouse/clickhouse-java/issues/949
  6. https://github.com/ClickHouse/clickhouse-java/issues/768
  7. https://github.com/dbeaver/dbeaver/issues/15118
  8. https://github.com/ClickHouse/clickhouse-java/releases/tag/v0.4.0
  9. https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
  10. https://clickhouse.com/blog/asynchronous-data-inserts-in-clickhouse
