Status
...
JIRA: https://issues.apache.org/jira/browse/FLINK-XXXX
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Currently, Flink can read from and write to ClickHouse through the Flink JDBC connector, but this is neither flexible nor easy to use, especially when writing data to ClickHouse with Flink SQL. The ClickHouse-JDBC project implemented a BalancedClickhouseDataSource component that adapts to ClickHouse clusters and has been well tested in production environments; it can address some of the flexibility shortcomings of the Flink JDBC connector.
Therefore, we plan to introduce the ClickHouse DataStream Connector and the Flink ClickHouse SQL Connector & Catalog, adopt the latest ClickHouse driver, and adapt the functionality of BalancedClickhouseDataSource, so as to make the integration of Flink and ClickHouse more convenient and easy to use.
⚠️ Part of the code will reference an unofficial flink-clickhouse-connector implementation [3], which is open source under the ASF 2.0 license [4].
Proposed Changes
- Introduce ClickHouse DataStream Connector.
- Introduce ClickHouse SQL Connector.
- Introduce ClickHouse Catalog.
- Introduce ClickHouse Async Sink (FLIP-171) [9].
- Introduce ClickHouse Lookup (low priority; alternatively, implement it after the first stable version is released).
Things to confirm
About ClickHouseDynamicTableSource
It should implement:
- ScanTableSource
- LookupTableSource
- SupportsLimitPushDown: to avoid scanning scenarios with large amounts of data
About ClickHouseDynamicTableSink
The following scenarios also need to be considered:
- Support writing to both distributed tables and local tables
- Writing into a distributed table
- We don't need to care about load balancing across the ClickHouse cluster.
- When writing to a distributed table asynchronously, data may be lost; when using the synchronous write mechanism, there may be a certain delay.
- Writing data into a distributed table puts pressure on the network and I/O load of the ClickHouse cluster. This limitation has nothing to do with how the connector is implemented; it is a limitation of ClickHouse itself.
- Writing into local tables
- The write frequency is controlled by the data batch size and the batch interval, to strike a balance between part-merge pressure and data freshness in ClickHouse.
- BalancedClickhouseDataSource can theoretically ensure load balance across ClickHouse instances through random routing, but it only detects instances through periodic pings, shields currently inaccessible instances, and has no failover. Therefore, once we try to write to a failed node, we lose data. We can introduce a retry mechanism with configurable parameters to minimize this situation.
- Enhance the routing strategies to ClickHouse instances, e.g. round robin, shard-key hash.
- ClickHouse does not support transactions, so we do not need to consider the consistency of data writing.
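To make the routing strategies above concrete, here is a minimal sketch in plain Java. The class and method names are hypothetical (not the proposed connector's classes); it only illustrates "balanced" (round robin) versus shard-key hash routing across a fixed number of shards.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical sketch of two routing strategies for writing into local
 * tables: "balanced" (round robin over all shards) and shard-key hash
 * (the same key always lands on the same shard).
 */
public class ShardRouter {

    private final int shardCount;
    private final AtomicInteger nextShard = new AtomicInteger(0);

    public ShardRouter(int shardCount) {
        this.shardCount = shardCount;
    }

    /** Round robin: spread writes evenly over all shards. */
    public int routeBalanced() {
        // Math.floorMod keeps the index non-negative even after int overflow.
        return Math.floorMod(nextShard.getAndIncrement(), shardCount);
    }

    /** Shard-key hash: deterministic routing by key. */
    public int routeByKey(Object shardKey) {
        return Math.floorMod(shardKey.hashCode(), shardCount);
    }
}
```

A real sink would combine this with failure detection, so that a shard whose write fails can be retried or skipped rather than silently losing data.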
Since client version 0.4.0+, the BalancedClickhouseDataSource [5] described in the previous version of FLIP-202 has been integrated into the URL configuration. Users can use the appropriate URL format to achieve load balancing across ClickHouse instances through random routing.
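As an illustration only (the exact property name and accepted values are an assumption here and should be verified against the clickhouse-jdbc documentation for the driver version in use), a 0.4.0+ JDBC URL can list several endpoints and select a load-balancing policy directly:

```
jdbc:clickhouse://host1:8123,host2:8123,host3:8123/my_db?load_balancing_policy=random
```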
Related Classes
ClickHouseDynamicTableSource
```java
@Internal
public class ClickHouseDynamicTableSource
        implements ScanTableSource, LookupTableSource, SupportsLimitPushDown {

    private final TableSchema tableSchema;
    private final ClickHouseOptions options;

    @Override
    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext) {
        return null;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return null;
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
    }

    @Override
    public DynamicTableSource copy() {
    }

    @Override
    public String asSummaryString() {
        return null;
    }

    @Override
    public void applyLimit(long limit) {
        // ...
    }
}
```
ClickHouseDynamicTableSink
```java
@Internal
public class ClickHouseDynamicTableSink implements DynamicTableSink {

    private final TableSchema tableSchema;
    private final ClickHouseOptions options;

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        return null;
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        return OutputFormatProvider.of(...);
    }

    @Override
    public DynamicTableSink copy() {
        return new ClickHouseDynamicTableSink(...);
    }

    @Override
    public String asSummaryString() {
        return "";
    }
}
```
ClickHouseOptions
```java
private String url;                       // server url information
private final String username;
private final String password;
private final String databaseName;
private final String tableName;
private final int batchSize;
private final Duration flushInterval;
private final int maxRetries;
private final boolean writeLocal;         // write into local tables or distributed tables
private final String partitionStrategy;   // for data routing
private final String partitionKey;
private final boolean ignoreDelete;
private ClickHouseDialect dialect;
```
ClickHouseCatalog
```java
@Internal
public class ClickHouseCatalog extends AbstractCatalog {

    public ClickHouseCatalog(
            String catalogName,
            @Nullable String defaultDatabase,
            String baseUrl,
            String username,
            String password,
            Map<String, String> properties) {
        super(...);
        // ...
    }

    @Override
    public void open() throws CatalogException {
    }

    @Override
    public synchronized void close() throws CatalogException {
    }

    @Override
    public Optional<Factory> getFactory() {
        return Optional.of(...);
    }

    // ------------- databases -------------

    @Override
    public synchronized List<String> listDatabases() throws CatalogException {
    }

    @Override
    public CatalogDatabase getDatabase(String databaseName)
            throws DatabaseNotExistException, CatalogException {
    }

    @Override
    public boolean databaseExists(String databaseName) throws CatalogException {
    }

    // ------------- tables -------------

    @Override
    public synchronized List<String> listTables(String databaseName)
            throws DatabaseNotExistException, CatalogException {
    }

    @Override
    public CatalogBaseTable getTable(ObjectPath tablePath)
            throws TableNotExistException, CatalogException {
    }

    @Override
    public boolean tableExists(ObjectPath tablePath) throws CatalogException {
    }

    // ------------- other unsupported method placeholders -------------
    // ...
}
```
Using the ClickHouseCatalog
Table API
```java
// java
tEnv.registerCatalog(name, new ClickHouseCatalog(...));
```
SQL
```sql
DROP CATALOG IF EXISTS ck_catalog;
CREATE CATALOG ck_catalog WITH (
    'type' = 'clickhouse',
    'default-database' = '...',
    'username' = '...',
    'password' = '...',
    'url' = '...'
    [, '' = '']
);
USE CATALOG ck_catalog;
```
SQL CLI yaml configs
```yaml
catalogs:
  - name: ...
    type: clickhouse
    username: ...
    password: ...
    default_database: ...
```
Using the ClickHouse SQL connector
```sql
DROP TABLE IF EXISTS t1;
CREATE TABLE t1 (
    `pid` BIGINT,
    `bigint_col` BIGINT,
    `int_col` INTEGER,
    `str_col` STRING,
    `d_col` DOUBLE,
    `list_col` ARRAY<STRING>,
    `map_col` MAP<STRING, BIGINT>,
    PRIMARY KEY (`pid`) NOT ENFORCED
) WITH (
    'connector' = 'clickhouse',                -- connector identifier
    'url' = 'clickhouse://xxxxx:port[, ...]',  -- single host or host list
    'username' = '',
    'password' = '',
    'database-name' = 'ckd1',
    'table-name' = 'ckt1',
    -- ---- source options ----
    'lookup.cache.max-rows' = '100',  -- cache capacity threshold; the cache is lazily loaded
    'lookup.cache.ttl' = '10',        -- time to live for elements in the cache
    'lookup.max-retries' = '3',       -- retry setting
    -- ---- sink options ----
    'sink.batch-size' = '1000',       -- batch size threshold
    'sink.flush-interval' = '1 min',  -- time interval with unit
    'sink.max-retries' = '3',
    'sink.partition-strategy' = 'balanced',  -- data routing strategy
    'sink.write-local' = 'false',     -- write into a local table or a distributed table
    'sink.ignore-delete' = 'true'
);
```
WITH Options
Option | Description | Type | Required | Default | Remark |
---|---|---|---|---|---|
url | The ClickHouse JDBC URL, in the format `clickhouse://<host>:<port>`. | String | Y | - | The URL format is `jdbc:clickhouse://<host>:<port>/<yourDatabaseName>`. If the database name is omitted, the default database is used. |
username | The ClickHouse username. | String | Y | - | 'username' and 'password' must both be specified if either of them is specified. |
password | The ClickHouse password. | String | Y | - | |
table-name | The ClickHouse table name. | String | Y | - | |
sink.max-retries | The maximum number of retries when writing records to the database fails. | Int | N | 3 | |
sink.batch-size | The maximum flush size; exceeding it triggers a flush. | Int | N | 100 | If the number of cached records reaches 'sink.batch-size', or the waiting time exceeds 'sink.flush-interval', the cached data is automatically written to the ClickHouse table. |
sink.flush-interval | After this flush interval elapses, asynchronous threads flush the cached data. | Duration | N | 1s | |
sink.ignore-delete | Whether to ignore DELETE statements. | Boolean | N | true | |
sink.parallelism | Defines a custom parallelism for the sink. | Integer | N | - | |
sink.shard-write-local | Whether to write into local tables instead of the distributed table. | Boolean | N | false | |
sink.partition-strategy | The data routing strategy when writing into local tables. | Enum | N | default | Routing strategies include e.g. round robin and shard-key hash. |
sink.partition-key | The partition key used for data routing. | String | N | - | When 'sink.partition-strategy' is 'partition', 'sink.partition-key' is required; it can contain multiple fields, separated by commas (,). |
properties.* | Sets and passes through clickhouse-jdbc configurations. | - | N | - | Allows the user to configure custom parameters for the connection. |
lookup.cache | The caching strategy for this lookup table, either NONE or PARTIAL. | String | N | NONE | |
lookup.partial-cache.expire-after-access | Duration after which a cache entry expires following its last access; beyond this, the oldest rows expire. | Duration | N | NONE | |
lookup.partial-cache.expire-after-write | Duration after which a cache entry expires following its write; beyond this, the oldest rows expire. | Duration | N | NONE | |
lookup.partial-cache.max-rows | The maximum number of rows in the lookup cache; beyond this, the oldest rows expire. | Long | N | NONE | |
lookup.partial-cache.cache-missing-key | Whether to cache missing keys. | Boolean | N | true | |
lookup.max-retries | The maximum number of retries if a database lookup fails. | Integer | N | 3 |
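The interplay of 'sink.batch-size' and 'sink.flush-interval' described above can be sketched in plain Java. The class below is hypothetical (not connector code); it shows the intended semantics: a flush happens when either the batch size is reached or the flush interval has elapsed, whichever comes first.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of the sink's buffering semantics: flush when the
 * buffer reaches the batch size OR the flush interval has elapsed.
 * Time is passed in explicitly to keep the sketch deterministic.
 */
public class BatchBuffer {

    private final int batchSize;          // corresponds to 'sink.batch-size'
    private final long flushIntervalMs;   // corresponds to 'sink.flush-interval'
    private final List<String> buffer = new ArrayList<>();
    private long lastFlushTime;
    private int flushCount = 0;

    public BatchBuffer(int batchSize, long flushIntervalMs, long nowMs) {
        this.batchSize = batchSize;
        this.flushIntervalMs = flushIntervalMs;
        this.lastFlushTime = nowMs;
    }

    /** Buffer a record; flush if either threshold is crossed. */
    public void write(String record, long nowMs) {
        buffer.add(record);
        if (buffer.size() >= batchSize || nowMs - lastFlushTime >= flushIntervalMs) {
            flush(nowMs);
        }
    }

    private void flush(long nowMs) {
        // A real sink would send 'buffer' to ClickHouse here (with retries).
        buffer.clear();
        lastFlushTime = nowMs;
        flushCount++;
    }

    public int getFlushCount() { return flushCount; }

    public int getBufferedCount() { return buffer.size(); }
}
```

A real implementation would run the interval-based flush on a background thread rather than checking the clock only on writes, so that a slow stream still flushes on time.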
Versions
Because ClickHouse releases new versions frequently, we will provide public interfaces for the version-sensitive parts so that users can override them.
Flink-ClickHouse Metaspace Mapping
Flink Catalog Metaspace Structure | ClickHouse Metaspace Structure |
---|---|
catalog name (defined in Flink only) | n/a |
database name | database name |
table name | table name |
Flink-ClickHouse Data Type Mapping
Flink Type | ClickHouse Type |
---|---|
CHAR | String |
VARCHAR | String / UUID / IP |
STRING | String / Enum |
BOOLEAN | UInt8 / Boolean |
BYTES | FixedString |
TINYINT | Int8 |
SMALLINT | Int16 / UInt8 |
INTEGER | Int32 / UInt16 / Interval |
BIGINT | Int64 / UInt32 |
FLOAT | Float32 |
DOUBLE | Float64 |
DECIMAL | Decimal / Int128 / Int256 / UInt64 / UInt128 / UInt256 |
DATE | Date |
TIME | DateTime |
TIMESTAMP | DateTime |
TIMESTAMP_LTZ | DateTime |
INTERVAL_YEAR_MONTH | Int32 |
INTERVAL_DAY_TIME | Int64 |
ARRAY | Array |
MAP | Map |
ROW | - |
MULTISET | - |
RAW | - |
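The primary mappings in the table above can be expressed as a simple lookup. The helper class below is hypothetical (not part of the proposed connector) and lists only the first target type for each Flink type; variant targets such as UUID, Enum, or the unsigned integer types are omitted for brevity.

```java
import java.util.Map;

/**
 * Hypothetical helper: the default Flink -> ClickHouse type mapping
 * from the table above (primary target type per Flink type only).
 */
public class TypeMapping {

    private static final Map<String, String> FLINK_TO_CLICKHOUSE = Map.ofEntries(
            Map.entry("CHAR", "String"),
            Map.entry("VARCHAR", "String"),
            Map.entry("STRING", "String"),
            Map.entry("BOOLEAN", "UInt8"),
            Map.entry("BYTES", "FixedString"),
            Map.entry("TINYINT", "Int8"),
            Map.entry("SMALLINT", "Int16"),
            Map.entry("INTEGER", "Int32"),
            Map.entry("BIGINT", "Int64"),
            Map.entry("FLOAT", "Float32"),
            Map.entry("DOUBLE", "Float64"),
            Map.entry("DECIMAL", "Decimal"),
            Map.entry("DATE", "Date"),
            Map.entry("TIMESTAMP", "DateTime"));

    /** Returns the ClickHouse type name, or throws for unmapped types (ROW, MULTISET, RAW). */
    public static String toClickHouseType(String flinkType) {
        String ch = FLINK_TO_CLICKHOUSE.get(flinkType);
        if (ch == null) {
            throw new IllegalArgumentException("Unsupported Flink type: " + flinkType);
        }
        return ch;
    }
}
```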
Compatibility, Deprecation, and Migration Plan
- Introduce a ClickHouse connector for users.
- It is a new feature, so there is no older behavior to phase out.
- No special migration tools are needed.
Test Plan
We could add unit test cases and integration test cases based on testcontainers.
Rejected Alternatives
Merging flink-clickhouse-connector [3] into FLIP-202:
- It relies on an unofficial JDBC package whose maintenance has currently stopped, which carries certain risks.
- Since the project used to be a personal project, the code before 0.3.2 [6] has potential license issues. The ClickHouse community has since renamed the related packages and fixed the licenses.
- Version 0.4.0+ is a breaking update: the JDBC driver class name was changed to com.clickhouse.jdbc.ClickHouseDriver [7].
- Using API versions lower than 0.4.0 is not recommended [8] because of potential performance problems.
- To support the FLIP-171 [9] async sink, JDBC cannot be used; only HTTP or TCP can be used asynchronously [10].
- It cannot stay in sync with Flink releases; the last adaptation was for Flink 1.16.0.
**** I am not an expert on ClickHouse, I have just used it before. If there are any omissions or mistakes, please send feedback to the dev mailing list. ****
Related References
- [1] https://clickhouse.com/docs/en/engines/table-engines/
- [2] https://clickhouse.com/docs/en/sql-reference/data-types/
- [3] https://github.com/itinycheng/flink-connector-clickhouse
- [4] https://github.com/ClickHouse/clickhouse-java/pull/1089
- [5] https://github.com/ClickHouse/clickhouse-java/issues/949
- [6] https://github.com/ClickHouse/clickhouse-java/issues/768
- [7] https://github.com/dbeaver/dbeaver/issues/15118
- [8] https://github.com/ClickHouse/clickhouse-java/releases/tag/v0.4.0
- [9] https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink