Status
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Currently, Flink can read from and write to ClickHouse through the Flink JDBC connector, but that connector is neither flexible nor easy to use, especially when writing data to ClickHouse with Flink SQL. The ClickHouse-JDBC project implemented a BalancedClickhouseDataSource component that adapts to ClickHouse clusters and has been validated in production; it addresses several of the JDBC connector's flexibility shortcomings.
Therefore, we plan to introduce a ClickHouse DataStream connector, a Flink ClickHouse SQL connector, and a ClickHouse catalog, adapting the functionality of BalancedClickhouseDataSource, so that the integration of Flink and ClickHouse becomes more convenient and easier to use.
Proposed Changes
- Introduce ClickHouse DataStream Connector.
- Introduce ClickHouse SQL Connector.
- Introduce ClickHouse Catalog.
Things to confirm
About ClickHouseDynamicTableSource
It should implement:
- ScanTableSource
- LookupTableSource
- SupportsLimitPushDown: to avoid scanning large amounts of data (see the sketch below)
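For example, the source could record the pushed-down limit and append it to the generated scan query. This is only a sketch: the getQueryStatement helper and the getTableName getter are hypothetical, while the tableSchema and options fields appear in the class skeleton under Related Classes below.

private long limit = -1L;

@Override
public void applyLimit(long limit) {
    this.limit = limit;
}

// Hypothetical helper used when building the scan query.
private String getQueryStatement() {
    String query = "SELECT " + String.join(", ", tableSchema.getFieldNames())
            + " FROM " + options.getTableName();
    return limit >= 0 ? query + " LIMIT " + limit : query;
}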
About ClickHouseDynamicTableSink
It should implement:
- DynamicTableSink
The following scenarios also need to be considered:
- Support writing to both distributed tables and local tables
  - write into a distributed table
    - We do not need to handle load balancing across the ClickHouse cluster ourselves.
    - With asynchronous writes to a distributed table, data may be lost; with the synchronous write mechanism, there may be a certain delay.
    - Writing into a distributed table also puts pressure on the network and I/O load of the ClickHouse cluster. This limitation is independent of how the connector is implemented; it is a limitation of ClickHouse itself.
  - write into local tables
    - The write frequency is controlled by the data batch size and the batch interval, balancing part-merge pressure against data freshness in ClickHouse.
    - BalancedClickhouseDataSource can in theory balance the load across ClickHouse instances through random routing, but it only detects instances via a periodic ping and shields the currently inaccessible ones; it has no failover. Consequently, once we try to write to a failed node, we lose data. We can introduce a retry mechanism with configurable parameters to minimize this risk.
    - Enhance the routing strategies to ClickHouse instances, e.g. round-robin and shard-key hash (see the sketch after this list).
- ClickHouse does not support transactions, so we do not need to consider the consistency of data writing.
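To make the routing idea concrete, here is a minimal sketch of the two strategies mentioned above; the ClickHousePartitioner interface and the class names are illustrative, not part of the proposal:

import java.util.concurrent.atomic.AtomicLong;

/** Decides which ClickHouse instance a record is written to (illustrative). */
interface ClickHousePartitioner {
    int select(Object shardKey, int numInstances);
}

/** Round-robin: spread writes evenly across all reachable instances. */
class RoundRobinPartitioner implements ClickHousePartitioner {
    private final AtomicLong counter = new AtomicLong();

    @Override
    public int select(Object shardKey, int numInstances) {
        return (int) (counter.getAndIncrement() % numInstances);
    }
}

/** Shard-key hash: keep records with the same key on the same instance. */
class HashPartitioner implements ClickHousePartitioner {
    @Override
    public int select(Object shardKey, int numInstances) {
        // Math.floorMod avoids negative indices for negative hash codes.
        return Math.floorMod(shardKey.hashCode(), numInstances);
    }
}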
Related Classes
ClickHouseDynamicTableSource
@Internal
public class ClickHouseDynamicTableSource
        implements ScanTableSource, LookupTableSource, SupportsLimitPushDown {

    private final TableSchema tableSchema;
    private final ClickHouseOptions options;

    @Override
    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext) {
        return null;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return null;
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
        // ...
    }

    @Override
    public DynamicTableSource copy() {
        // ...
    }

    @Override
    public String asSummaryString() {
        return null;
    }

    @Override
    public void applyLimit(long limit) {
        // ...
    }
}
ClickHouseDynamicTableSink
@Internal
public class ClickHouseDynamicTableSink implements DynamicTableSink {

    private final TableSchema tableSchema;
    private final ClickHouseOptions options;

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        return null;
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        return OutputFormatProvider.of(...);
    }

    @Override
    public DynamicTableSink copy() {
        return new ClickHouseDynamicTableSink(...);
    }

    @Override
    public String asSummaryString() {
        return "";
    }
}
ClickHouseOptions
private String url;                      // server url information.
private final String username;
private final String password;
private final String databaseName;
private final String tableName;
private final int batchSize;
private final Duration flushInterval;
private final int maxRetries;
private final boolean writeLocal;        // write into local tables or distributed tables.
private final String partitionStrategy;  // for data routing.
private final String partitionKey;
private final boolean ignoreDelete;
private ClickHouseDialect dialect;
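To show how batchSize and flushInterval interact in the sink, here is a simplified buffering sketch; the ClickHouseBatchBuffer class and its internals are illustrative, not part of the proposal:

import java.util.ArrayList;
import java.util.List;

/** Simplified buffering logic for the sink (illustrative). */
class ClickHouseBatchBuffer {
    private final int batchSize;
    private final List<Object> buffer = new ArrayList<>();

    ClickHouseBatchBuffer(int batchSize) {
        this.batchSize = batchSize;
    }

    // Called per record; flush() is additionally triggered by a timer
    // firing every flushInterval, so slow streams still get written out.
    void addToBatch(Object record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        // Execute one batched INSERT against ClickHouse here, retrying up
        // to maxRetries times on failure (per the options above).
        buffer.clear();
    }
}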
ClickHouseCatalog
@Internal public class ClickHouseCatalog extends AbstractCatalog { public ClickHouseCatalog( String catalogName, @Nullable String defaultDatabase, String baseUrl, String username, String password, Map<String, String> properties) { super(...); //... } @Override public void open() throws CatalogException { } @Override public synchronized void close() throws CatalogException { } @Override public Optional<Factory> getFactory() { return Optional.of(...); } // ------------- databases ------------- @Override public synchronized List<String> listDatabases() throws CatalogException {} @Override public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException { } @Override public boolean databaseExists(String databaseName) throws CatalogException { } // ------------- tables ------------- @Override public synchronized List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException { } @Override public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException { } @Override public boolean tableExists(ObjectPath tablePath) throws CatalogException {} // -------other unsupport methods placeholders. ... }
Using the ClickHouseCatalog
Table API
tEnv.registerCatalog(name, new ClickHouseCatalog(...));
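For illustration, a fuller registration sketch; the constructor arguments follow the proposed ClickHouseCatalog signature above, while the concrete values and the catalog name are made up:

EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);

Catalog ckCatalog = new ClickHouseCatalog(
        "ck_catalog",              // catalog name
        "default",                 // default database
        "clickhouse://host:8123",  // base url
        "username",
        "password",
        Collections.emptyMap());   // additional properties
tEnv.registerCatalog("ck_catalog", ckCatalog);
tEnv.useCatalog("ck_catalog");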
SQL
DROP CATALOG IF EXISTS ck_catalog;
CREATE CATALOG ck_catalog WITH (
    'type' = 'clickhouse',
    'default-database' = '...',
    'username' = '...',
    'password' = '...',
    'url' = '...'
    [, '' = '']  -- additional properties
);
USE CATALOG ck_catalog;
SQL CLI YAML configuration
catalogs:
  - name: ...
    type: clickhouse
    username: ...
    password: ...
    default_database: ...
    ...
Using the ClickHouse SQL connector
DROP TABLE IF EXISTS t1;
CREATE TABLE t1 (
    `pid` BIGINT,
    `bigint_col` BIGINT,
    `int_col` INTEGER,
    `str_col` STRING,
    `d_col` DOUBLE,
    `list_col` ARRAY<STRING>,
    `map_col` MAP<STRING, BIGINT>,
    PRIMARY KEY (`pid`) NOT ENFORCED
) WITH (
    'connector' = 'clickhouse',                -- connector identifier
    'url' = 'clickhouse://xxxxx:port[, ...]',  -- single host or a list of hosts
    'username' = '',
    'password' = '',
    'database-name' = 'ckd1',
    'table-name' = 'ckt1',
    -- ----- source options -----
    'lookup.cache.max-rows' = '100',           -- cache capacity threshold; the cache is lazily loaded
    'lookup.cache.ttl' = '10',                 -- time to live for cached entries
    'lookup.max-retries' = '3',                -- retry setting
    -- ...
    -- ----- sink options -----
    'sink.batch-size' = '1000',                -- batch size threshold
    'sink.flush-interval' = '1 min',           -- flush interval, with time unit
    'sink.max-retries' = '3',
    'sink.partition-strategy' = 'balanced',    -- data routing strategy
    'sink.write-local' = 'false',              -- write into a local table or a distributed table
    'sink.ignore-delete' = 'true'
    -- ...
);
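For illustration, the table above could then be used both as a sink and as a lookup (dimension) table; a sketch from the Table API, where source_stream and its proc_time attribute are made up:

// Write into ClickHouse; rows are buffered and flushed according to
// 'sink.batch-size' and 'sink.flush-interval'.
tEnv.executeSql(
        "INSERT INTO t1 "
                + "SELECT pid, bigint_col, int_col, str_col, d_col, list_col, map_col "
                + "FROM source_stream");

// Enrich a stream by looking rows up in ClickHouse; lookups are
// cached according to the 'lookup.*' options.
tEnv.executeSql(
        "SELECT s.pid, t.str_col "
                + "FROM source_stream AS s "
                + "JOIN t1 FOR SYSTEM_TIME AS OF s.proc_time AS t "
                + "ON s.pid = t.pid");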
Versions
For the parts of ClickHouse that change frequently across versions, we will expose public interfaces so that users can override the default behavior themselves.
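One hypothetical shape for such an interface, building on the ClickHouseDialect field of ClickHouseOptions shown above; both method names are illustrative, not part of the proposal:

import java.io.Serializable;

public interface ClickHouseDialect extends Serializable {

    // Quote an identifier the way the target ClickHouse version expects.
    String quoteIdentifier(String identifier);

    // Build the INSERT statement used by the sink for the given table and fields.
    String getInsertIntoStatement(String tableName, String[] fieldNames);
}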
Flink-ClickHouse Metaspace Mapping
Flink Catalog Metaspace Structure | ClickHouse Metaspace Structure
---|---
catalog name (defined in Flink only) | n/a
database name | database name
table name | table name
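Given this mapping, a ClickHouse table such as ckd1.ckt1 (the database and table names used in the connector example above) would be addressed from Flink with the usual three-part name; for illustration:

// <catalog name>.<database name>.<table name>
tEnv.executeSql("SELECT * FROM ck_catalog.ckd1.ckt1");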
Flink-ClickHouse Data Type Mapping
Flink Type | ClickHouse Type |
---|---
CHAR | String |
VARCHAR | String / UUID / IP |
STRING | String / Enum |
BOOLEAN | UInt8 |
BYTES | FixedString |
TINYINT | Int8 |
SMALLINT | Int16 / UInt8 |
INTEGER | Int32 / UInt16 / Interval |
BIGINT | Int64 / UInt32 |
FLOAT | Float32 |
DOUBLE | Float64 |
DECIMAL | Decimal / Int128 / Int256 / UInt64 / UInt128 / UInt256 |
DATE | Date |
TIME | DateTime |
TIMESTAMP | DateTime |
TIMESTAMP_LTZ | DateTime |
INTERVAL_YEAR_MONTH | Int32 |
INTERVAL_DAY_TIME | Int64 |
ARRAY | Array |
MAP | Map |
ROW | n/a
MULTISET | n/a
RAW | n/a
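A sketch of how this mapping might be realized inside the connector; the helper is illustrative and only covers the unambiguous default cases from the table above:

// Maps a Flink logical type root to a default ClickHouse type (illustrative).
static String toClickHouseType(LogicalTypeRoot typeRoot) {
    switch (typeRoot) {
        case CHAR:
        case VARCHAR:
            return "String";
        case BOOLEAN:
            return "UInt8";
        case TINYINT:
            return "Int8";
        case SMALLINT:
            return "Int16";
        case INTEGER:
            return "Int32";
        case BIGINT:
            return "Int64";
        case FLOAT:
            return "Float32";
        case DOUBLE:
            return "Float64";
        case DATE:
            return "Date";
        case TIMESTAMP_WITHOUT_TIME_ZONE:
            return "DateTime";
        default:
            throw new UnsupportedOperationException("Unsupported type root: " + typeRoot);
    }
}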
Compatibility, Deprecation, and Migration Plan
- Introduce the ClickHouse connector for users.
- It will be a new feature, so we needn't phase out any older behavior.
- We don't need special migration tools.
Test Plan
We could add unit test cases and integration test cases based on Testcontainers.
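A minimal sketch of what such an integration test could look like, assuming the Testcontainers clickhouse module; the test class, image tag, and assertions are illustrative:

import org.junit.jupiter.api.Test;
import org.testcontainers.containers.ClickHouseContainer;

public class ClickHouseConnectorITCase {

    @Test
    public void testSinkWrite() throws Exception {
        try (ClickHouseContainer clickHouse =
                new ClickHouseContainer("clickhouse/clickhouse-server:23.8")) {
            clickHouse.start();

            // Connection details for the connector under test.
            String url = clickHouse.getJdbcUrl();
            String username = clickHouse.getUsername();
            String password = clickHouse.getPassword();

            // ... run a small Flink job that writes through the proposed
            // connector, then read the rows back over JDBC and assert on them.
        }
    }
}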
Rejected Alternatives
Related References
- BalancedClickhouseDataSource.java (from the ClickHouse-JDBC project)
- https://clickhouse.com/docs/en/engines/table-engines/
- https://clickhouse.com/docs/en/sql-reference/data-types/