Status
Current state: ["Under Discussion"]
...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Currently, Flink can directly write or read ClickHouse through flink connector JDBC, but it is not flexible and easy to use, especially in the scenario of writing data to ClickHouse by FlinkSQL. The The ClickHouse-JDBC project group implemented a BalancedClickhouseDataSource component that adapts to the ClickHouse cluster, and has also been tested by a good production environment, which can solve well the problem some problems of insufficient flexibility of flex connector JDBC.
Therefore, we plan to introduce the Flink ClickHouse SQL Connector & Catalog, and do the function adaptation of BalancedClickhouseDataSource, so as to make the integration of Flink and ClickHouse more convenient and easy to use.
Things to confirm
About ClickHouseDynamicTableSource
It should implement :
- ScanTableSource:
- LookupTableSource:
SupportsLimitPushDown: To avoid scenarios with large amounts of data
About ClickHouseDynamicTableSink
It should implement :
DynamicTableSink
...
- Support writing distributed and local tables
- write into a distributed table
- We don't need to care about the load balancing of ClickHouse table.
- In the case of asynchronous writing to distributed tables, data may be lost. When using the synchronous write mechanism, there may be a certain delay.
- And Writing data into a distributed table will cause the pressure to the network & io loading of ClickHouse cluster. This limitation has nothing to do with how to implement the connector, which is the limitation of ClickHouse itself.
- write into local tables
- The write frequency is controlled according to the data batch size and batch interval to achieve a balance between part merge pressure and data real-time for ClickHouse.
- The BalancedClickhouseDataSource can ensure theoretically the load balance of each ClickHouse instance through random routing, but it is only detect the instance through periodic Ping, shields the currently inaccessible instances, and has no failover. Therefore, once we try to write to the failed node, we will lose data. We can introduce a retry mechanism that can configure parameters to minimize this situation.
- Enhance the route strategies to ClickHouse instance. e.g. round robin, shard key hash.
- write into a distributed table
- ClickHouse does not support the transaction feature, so we do not need to consider the consistency of data writing
Public Interfaces
Briefly list any new interfaces that will be introduced as part of this proposal or any existing interfaces that will be removed or changed. The purpose of this section is to concisely call out the public contract that will come along with this feature.
A public interface is any change to the following:
- DataStream and DataSet API, including classes related to that, such as StreamExecutionEnvironment
- Classes marked with the @Public annotation
- On-disk binary formats, such as checkpoints/savepoints
- User-facing scripts/command-line tools, i.e. bin/flink, Yarn scripts, Mesos scripts
- Configuration settings
- Exposed monitoring information
Proposed Changes
...
Proposed Changes
- Introduce ClickHouse SQL Connector.
- Introduce ClickHouse Catalog.
Related Classes
ClickHouseDynamicTableSource
Code Block | ||||
---|---|---|---|---|
| ||||
public class ClickHouseDynamicTableSource implements ScanTableSource, LookupTableSource, SupportsLimitPushDown {
private final Scheme tableSchema;
private final ClickHouseOptions options;
@Override
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext) {
return null;
}
@Override
public ChangelogMode getChangelogMode() {
return null;
}
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext){
}
@Override
public DynamicTableSource copy() {
}
@Override
public String asSummaryString() {
return null;
}
@Override
public void applyLimit(long limit) {
...
}
} |
ClickHouseDynamicTableSink
Code Block | ||
---|---|---|
| ||
@Internal
public class ClickHouseDynamicTableSink implements DynamicTableSink {
private final Scheme tableSchema;
private final ClickHouseOptions options;
@Override
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
return null;
}
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
return OutputFormatProvider.of(...);
}
@Override
public DynamicTableSink copy() {
return new ClickHouseDynamicTableSink(...);
}
@Override
public String asSummaryString() {
return "";
}
} |
ClickHouseOptions
Code Block | ||
---|---|---|
| ||
private String url;// server url information.
private final String username;
private final String password;
private final String databaseName;
private final String tableName;
private final int batchSize;
private final Duration flushInterval;
private final int maxRetries;
private final boolean writeLocal; // write into local tables or distributed tables.
private final String partitionStrategy; // for data routing.
private final String partitionKey;
private final boolean ignoreDelete;
private ClickHouseDialect dialect;
|
ClickHouseCatalog
Code Block | ||
---|---|---|
| ||
@Internal
public class ClickHouseCatalog extends AbstractCatalog {
public ClickHouseCatalog(
String catalogName,
@Nullable String defaultDatabase,
String baseUrl,
String username,
String password,
Map<String, String> properties) {
super(...);
//...
}
@Override
public void open() throws CatalogException {
}
@Override
public synchronized void close() throws CatalogException {
}
@Override
public Optional<Factory> getFactory() {
return Optional.of(...);
}
// ------------- databases -------------
@Override
public synchronized List<String> listDatabases() throws CatalogException {}
@Override
public CatalogDatabase getDatabase(String databaseName)
throws DatabaseNotExistException, CatalogException { }
@Override
public boolean databaseExists(String databaseName) throws CatalogException { }
// ------------- tables -------------
@Override
public synchronized List<String> listTables(String databaseName)
throws DatabaseNotExistException, CatalogException { }
@Override
public CatalogBaseTable getTable(ObjectPath tablePath)
throws TableNotExistException, CatalogException { }
@Override
public boolean tableExists(ObjectPath tablePath) throws CatalogException {}
// -------other unsupport methods placeholders.
...
}
|
Using the ClickHousecatalog
Table API
Code Block |
---|
// java
tEnv.registerCatalog(name, new ClickHouseCatalog(...)) |
SQL
Code Block | ||
---|---|---|
| ||
DROP ck_catalog IF EXISTS ck_catalog;
CREATE CATALOG ck_catalog WITH(
'type' = '',
'default-database' = '...',
'username' = '...',
'password' = '...',
'url' = '...'
[, '' = '']
);
USE CATALOG ck_catalog;
|
SQL CLI yaml configs
Code Block |
---|
catalogs:
- name: ...
type: clickhouse
username:...,
password:...,
default_database:...,
... |
Using the ClickHouse SQL connector
Code Block | ||
---|---|---|
| ||
DROP TABLE if exists t1;
CREATE TABLE t1 (
`pid` BIGINT,
`bigint_col` BIGINT,
`int_col` INTEGER,
`str_col` STRING,
`d_col` DOUBLE,
`list_col` ARRAY<STRING>,
`map_col` Map<STRING, BIGINT>,
PRIMARY KEY (`pid`) NOT ENFORCED
) WITH (
'connector' = 'clickhouse', -- ident
'url' = 'clickhouse://xxxxx:port[, ...]', list or single host information.
'username' = '',
'password' = '',
'database-name' = 'ckd1',
'table-name' = 'ckt1',
-----source options-----
'lookup.cache.max-rows' = '100', -- cache capacity threshold, the cache for lazy loading.
'lookup.cache.ttl' = '10', -- time to live setting for the elements located in the cache.
'lookup.max-retries' = '3', -- retry setting.
-- ...
-----sink options------
'sink.batch-size' = '1000', -- batch size threshold
'sink.flush-interval' = '1 min', -- time interval with unit.
'sink.max-retries' = '3',
'sink.partition-strategy' = 'balanced', --data routing strategy
'sink.write-local' = 'false', -- write into a local table or a distributed table.
'sink.ignore-delete' = 'true',
-- ....
); |
Versions
We will provide public interface in the section about frequent version changes of Clickhouse to facilitate users to overwrite.
Flink-ClickHouse Metaspace Mapping
Flink Catalog Metaspace Structure | ClickHouse Metaspace Structure |
catalog name (defined in Flink only) | n/a |
database name | database name |
table name | table name |
Flink-ClickHouse Data Type Mapping
Flink Type | ClickHouse Type |
---|---|
CHAR | String |
VARCHAR | String / UUID / IP |
STRING | String / Enum |
BOOLEAN | UInt8 |
BYTES | FixedString |
TINYINT | Int8 |
SMALLINT | Int16 / UInt8 |
INTEGER | Int32 / UInt16 / Interval |
BIGINT | Int64 / UInt32 |
FLOAT | Float32 |
DOUBLE | Float64 |
DECIMAL | Decimal / Int128 / Int256 / UInt64 / UInt128 / UInt256 |
DATE | Date |
TIME | DateTime |
TIMESTAMP | DateTime |
TIMESTAMP_LTZ | DateTime |
INTERVAL_YEAR_MONTH | Int32 |
INTERVAL_DAY_TIME | Int64 |
ARRAY | Array |
MAP | Map |
ROW | |
MULTISET | |
RAW |
...
Compatibility, Deprecation, and Migration Plan
- Introduce ClickHouse SQL connector for users
- It will be a new feature, so we needn't phase out the older behavior.
- we don't need special migration tools
Test Plan
We could add unit test cases and integration test cases based on testcontainers.
Rejected Alternatives
Related References
- BalancedClickhouseDataSource.java
- https://clickhouse.com/docs/en/engines/table-engines/
- https://clickhouse.com/docs/en/sql-reference/data-types/