
Status

Discussion thread
Vote thread
JIRA


Release: 1.3

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, Flink can read from and write to ClickHouse through the Flink JDBC connector, but this approach is not flexible or easy to use, especially when writing data to ClickHouse with Flink SQL. The ClickHouse-JDBC project implements a BalancedClickhouseDataSource component that adapts to ClickHouse clusters and has been proven in production environments; it addresses several of the flexibility limitations of the Flink JDBC connector.

Therefore, we plan to introduce a ClickHouse DataStream Connector, a Flink ClickHouse SQL Connector, and a ClickHouse Catalog, adapting the functionality of BalancedClickhouseDataSource, so as to make the integration of Flink and ClickHouse more convenient and easier to use.

Proposed Changes

  1. Introduce ClickHouse DataStream Connector.
  2. Introduce ClickHouse SQL Connector.
  3. Introduce ClickHouse Catalog.

Things to confirm

About ClickHouseDynamicTableSource

It should implement:

  1. ScanTableSource
  2. LookupTableSource
  3. SupportsLimitPushDown: to avoid full scans when the amount of data is large

About ClickHouseDynamicTableSink

It should implement:

  1. DynamicTableSink

The following scenarios also need to be considered:

  1. Support writing to both distributed tables and local tables.
    1. Write into a distributed table:
      1. We do not need to care about load balancing across the ClickHouse cluster.
      2. When writing to a distributed table asynchronously, data may be lost; when using the synchronous write mechanism, there may be a certain delay.
      3. Writing into a distributed table also puts pressure on the network and I/O load of the ClickHouse cluster. This limitation has nothing to do with how the connector is implemented; it is a limitation of ClickHouse itself.
    2. Write into local tables:
      1. The write frequency is controlled by the batch size and flush interval, to balance part-merge pressure against data freshness in ClickHouse.
      2. The BalancedClickhouseDataSource can in theory balance the load across ClickHouse instances through random routing, but it only detects instances via a periodic ping, shields the currently inaccessible ones, and has no failover. Therefore, once we try to write to a failed node, we lose data. We can introduce a retry mechanism with configurable parameters to minimize this risk.
      3. Enhance the routing strategies to ClickHouse instances, e.g. round robin or shard-key hash (see the sketch after this list).
  2. ClickHouse does not support transactions, so we do not need to consider the consistency of data writes.
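
As an illustration of the routing strategies mentioned in item 1.2.3, below is a minimal sketch of a pluggable router that decides which ClickHouse instance a record is written to. The interface and class names here are assumptions for discussion, not the final API.

public interface ClickHouseShardRouter {

    /** Returns the index of the shard/instance the given record should be written to. */
    int route(Object[] record, int numShards);
}

/** Round-robin routing: spreads writes evenly, ignoring the record content. */
class RoundRobinRouter implements ClickHouseShardRouter {

    private int nextShard = 0;

    @Override
    public int route(Object[] record, int numShards) {
        int shard = nextShard % numShards;
        nextShard = (nextShard + 1) % numShards;
        return shard;
    }
}

/** Shard-key hash routing: records with the same key always go to the same shard. */
class ShardKeyHashRouter implements ClickHouseShardRouter {

    private final int keyIndex; // position of the shard key within the record

    ShardKeyHashRouter(int keyIndex) {
        this.keyIndex = keyIndex;
    }

    @Override
    public int route(Object[] record, int numShards) {
        int hash = record[keyIndex] == null ? 0 : record[keyIndex].hashCode();
        return (hash & Integer.MAX_VALUE) % numShards;
    }
}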

Related Classes

ClickHouseDynamicTableSource

@Internal
public class ClickHouseDynamicTableSource
        implements ScanTableSource, LookupTableSource, SupportsLimitPushDown {

    private final TableSchema tableSchema;

    private final ClickHouseOptions options;

    @Override
    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext) {
        return null;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return null;
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
        return null;
    }

    @Override
    public DynamicTableSource copy() {
        return new ClickHouseDynamicTableSource(...);
    }

    @Override
    public String asSummaryString() {
        return null;
    }

    @Override
    public void applyLimit(long limit) {
        ...
    }
}


ClickHouseDynamicTableSink

@Internal
public class ClickHouseDynamicTableSink implements DynamicTableSink {

    private final TableSchema tableSchema;

    private final ClickHouseOptions options;

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        return null;
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        return OutputFormatProvider.of(...);
    }

    @Override
    public DynamicTableSink copy() {
        return new ClickHouseDynamicTableSink(...);
    }

    @Override
    public String asSummaryString() {
        return "";
    }
}



ClickHouseOptions

    private final String url; // server URL information.

    private final String username;

    private final String password;

    private final String databaseName;

    private final String tableName;

    private final int batchSize; // batch size threshold for flushing.

    private final Duration flushInterval; // flush interval for buffered records.

    private final int maxRetries; // max retry attempts for a failed write.

    private final boolean writeLocal; // write into local tables or distributed tables.

    private final String partitionStrategy; // for data routing.

    private final String partitionKey; // shard key used by the hash partition strategy.

    private final boolean ignoreDelete; // whether to ignore delete messages.

    private ClickHouseDialect dialect;
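
For reference, these options could be exposed as Flink ConfigOptions matching the WITH keys shown in the SQL connector section below. The sketch below assumes those key names and default values, which are not finalized.

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

import java.time.Duration;

// A minimal sketch; key names follow the WITH clause of the SQL connector example
// below, and the default values are assumptions rather than decided values.
public class ClickHouseConfigOptions {

    public static final ConfigOption<String> URL =
            ConfigOptions.key("url").stringType().noDefaultValue();

    public static final ConfigOption<Integer> SINK_BATCH_SIZE =
            ConfigOptions.key("sink.batch-size").intType().defaultValue(1000);

    public static final ConfigOption<Duration> SINK_FLUSH_INTERVAL =
            ConfigOptions.key("sink.flush-interval").durationType().defaultValue(Duration.ofMinutes(1));

    public static final ConfigOption<Integer> SINK_MAX_RETRIES =
            ConfigOptions.key("sink.max-retries").intType().defaultValue(3);

    public static final ConfigOption<Boolean> SINK_WRITE_LOCAL =
            ConfigOptions.key("sink.write-local").booleanType().defaultValue(false);
}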


ClickHouseCatalog


@Internal
public class ClickHouseCatalog extends AbstractCatalog {

    public ClickHouseCatalog(
            String catalogName,
            @Nullable String defaultDatabase,
            String baseUrl,
            String username,
            String password,
            Map<String, String> properties) {
        super(...);
        //...
    }

    @Override
    public void open() throws CatalogException {
    }

    @Override
    public synchronized void close() throws CatalogException {
    }

    @Override
    public Optional<Factory> getFactory() {

        return Optional.of(...);
    }

    // ------------- databases -------------

    @Override
    public synchronized List<String> listDatabases() throws CatalogException {}

    @Override
    public CatalogDatabase getDatabase(String databaseName)
            throws DatabaseNotExistException, CatalogException { }

    @Override
    public boolean databaseExists(String databaseName) throws CatalogException { }

    // ------------- tables -------------

    @Override
    public synchronized List<String> listTables(String databaseName)
            throws DatabaseNotExistException, CatalogException { }

    @Override
    public CatalogBaseTable getTable(ObjectPath tablePath)
            throws TableNotExistException, CatalogException { }

    @Override
    public boolean tableExists(ObjectPath tablePath) throws CatalogException {}

	// ------- placeholders for other unsupported methods -------
	...
}

Using the ClickHouseCatalog

Table API

// Java
tEnv.registerCatalog(name, new ClickHouseCatalog(...));
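
A slightly fuller sketch of registering and using the catalog from the Table API follows; the connection values are placeholders, and the constructor arguments follow the ClickHouseCatalog definition above.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

import java.util.Collections;

public class ClickHouseCatalogExample {

    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Register the catalog with placeholder connection values.
        tEnv.registerCatalog(
                "ck_catalog",
                new ClickHouseCatalog(
                        "ck_catalog",
                        "default",                // default database
                        "clickhouse://host:8123", // base URL (placeholder)
                        "username",
                        "password",
                        Collections.emptyMap()));

        // Make it the current catalog so ClickHouse tables can be referenced directly.
        tEnv.useCatalog("ck_catalog");
    }
}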

SQL

DROP CATALOG IF EXISTS ck_catalog;
CREATE CATALOG ck_catalog WITH (
    'type' = 'clickhouse',
    'default-database' = '...',
    'username' = '...',
    'password' = '...',
    'url' = '...'
    [, '' = '']
);

USE CATALOG ck_catalog;


SQL CLI yaml configs

catalogs:
  - name: ...
    type: clickhouse
    username: ...
    password: ...
    default_database: ...
    ...

Using the ClickHouse SQL connector


DROP TABLE IF EXISTS t1;
CREATE TABLE t1 (
    `pid` BIGINT,
    `bigint_col` BIGINT,
    `int_col` INTEGER,
    `str_col` STRING,
    `d_col` DOUBLE,
    `list_col` ARRAY<STRING>,
    `map_col` MAP<STRING, BIGINT>,
    PRIMARY KEY (`pid`) NOT ENFORCED
) WITH (
    'connector' = 'clickhouse',  -- connector identifier
    'url' = 'clickhouse://xxxxx:port[, ...]',  -- list of hosts or a single host.
    'username' = '',
    'password' = '',
    'database-name' = 'ckd1',
    'table-name' = 'ckt1',

    -- ----- source options -----
    'lookup.cache.max-rows' = '100',  -- cache capacity threshold; the cache is lazily loaded.
    'lookup.cache.ttl' = '10',  -- time to live for elements in the cache.
    'lookup.max-retries' = '3',  -- retry setting.
    -- ...

    -- ----- sink options -----
    'sink.batch-size' = '1000',  -- batch size threshold.
    'sink.flush-interval' = '1 min',  -- time interval with unit.
    'sink.max-retries' = '3',
    'sink.partition-strategy' = 'balanced',  -- data routing strategy.
    'sink.write-local' = 'false',  -- write into a local table or a distributed table.
    'sink.ignore-delete' = 'true'
    -- ...
);
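
For illustration, a minimal end-to-end sketch of writing through the SQL connector from a Java program follows; the datagen source and the abbreviated column list are only illustrative stand-ins, and the table names are placeholders.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ClickHouseSqlConnectorExample {

    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Illustrative upstream source.
        tEnv.executeSql(
                "CREATE TABLE source_table ("
                        + " pid BIGINT,"
                        + " bigint_col BIGINT,"
                        + " str_col STRING"
                        + ") WITH ('connector' = 'datagen', 'rows-per-second' = '10')");

        // ClickHouse sink table, abbreviated to three columns of the t1 example above.
        tEnv.executeSql(
                "CREATE TABLE ck_sink ("
                        + " pid BIGINT,"
                        + " bigint_col BIGINT,"
                        + " str_col STRING,"
                        + " PRIMARY KEY (pid) NOT ENFORCED"
                        + ") WITH ("
                        + " 'connector' = 'clickhouse',"
                        + " 'url' = 'clickhouse://host:port',"
                        + " 'database-name' = 'ckd1',"
                        + " 'table-name' = 'ckt1',"
                        + " 'sink.batch-size' = '1000',"
                        + " 'sink.flush-interval' = '1 min')");

        // Rows are buffered and flushed according to sink.batch-size / sink.flush-interval.
        tEnv.executeSql("INSERT INTO ck_sink SELECT pid, bigint_col, str_col FROM source_table");
    }
}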


Versions

For the parts of the connector that depend on ClickHouse behavior that changes frequently across versions, we will expose public interfaces so that users can override them.

Flink Catalog Metaspace Structure    | ClickHouse Metaspace Structure
-------------------------------------|-------------------------------
catalog name (defined in Flink only) | n/a
database name                        | database name
table name                           | table name

Flink Type          | ClickHouse Type
--------------------|--------------------------------------------------------
CHAR                | String
VARCHAR             | String / UUID / IP
STRING              | String / Enum
BOOLEAN             | UInt8
BYTES               | FixedString
TINYINT             | Int8
SMALLINT            | Int16 / UInt8
INTEGER             | Int32 / UInt16 / Interval
BIGINT              | Int64 / UInt32
FLOAT               | Float32
DOUBLE              | Float64
DECIMAL             | Decimal / Int128 / Int256 / UInt64 / UInt128 / UInt256
DATE                | Date
TIME                | DateTime
TIMESTAMP           | DateTime
TIMESTAMP_LTZ       | DateTime
INTERVAL_YEAR_MONTH | Int32
INTERVAL_DAY_TIME   | Int64
ARRAY               | Array
MAP                 | Map
ROW                 | -
MULTISET            | -
RAW                 | -
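
As an illustration of how the catalog could apply this mapping when reading ClickHouse table metadata, a minimal sketch follows; the class and method names, and the subset of types handled, are assumptions rather than part of the classes listed in this FLIP.

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

// A minimal sketch of mapping ClickHouse column types to Flink DataTypes, following
// the table above; it covers only a subset of the mapping.
public class ClickHouseTypeMapper {

    public static DataType toFlinkType(String clickHouseType) {
        switch (clickHouseType) {
            case "String":
                return DataTypes.STRING();
            case "Int8":
                return DataTypes.TINYINT();
            case "Int16":
                return DataTypes.SMALLINT();
            case "Int32":
                return DataTypes.INT();
            case "Int64":
                return DataTypes.BIGINT();
            case "Float32":
                return DataTypes.FLOAT();
            case "Float64":
                return DataTypes.DOUBLE();
            case "Date":
                return DataTypes.DATE();
            case "DateTime":
                return DataTypes.TIMESTAMP(0);
            default:
                throw new UnsupportedOperationException(
                        "Unsupported ClickHouse type: " + clickHouseType);
        }
    }
}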

Compatibility, Deprecation, and Migration Plan

  • Introduce the ClickHouse connector for users.
  • It will be a new feature, so we needn't phase out any older behavior.
  • We don't need special migration tools.

Test Plan

We could add unit test cases and integration test cases based on testcontainers.
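
A minimal sketch of such an integration test using the testcontainers ClickHouse module is given below; the chosen image tag, the JDBC driver on the test classpath, and the part that exercises the connector are illustrative assumptions.

import org.junit.jupiter.api.Test;
import org.testcontainers.containers.ClickHouseContainer;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

import static org.junit.jupiter.api.Assertions.assertTrue;

public class ClickHouseConnectorITCase {

    @Test
    public void testWriteAndRead() throws Exception {
        // Image tag is a placeholder; any compatible ClickHouse server image would do.
        try (ClickHouseContainer clickhouse =
                new ClickHouseContainer("clickhouse/clickhouse-server:22.3")) {
            clickhouse.start();

            // Prepare a target table in the containerized ClickHouse instance via JDBC.
            try (Connection conn =
                            DriverManager.getConnection(
                                    clickhouse.getJdbcUrl(),
                                    clickhouse.getUsername(),
                                    clickhouse.getPassword());
                    Statement stmt = conn.createStatement()) {
                stmt.execute(
                        "CREATE TABLE ckt1 (pid Int64, str_col String) ENGINE = MergeTree() ORDER BY pid");
            }

            // Here the test would run a Flink job that writes into ckt1 through the
            // connector and then verify the written rows via JDBC.
            assertTrue(clickhouse.isRunning());
        }
    }
}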

Rejected Alternatives


Related References

  1. BalancedClickhouseDataSource.java
  2. https://clickhouse.com/docs/en/engines/table-engines/
  3. https://clickhouse.com/docs/en/sql-reference/data-types/

