Status
Current state: [Under Discussion]
Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)
JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)
Released: <Flink Version>
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Brief Introduction to Doris
Apache Doris is a high-performance, real-time analytical database based on an MPP architecture, known for its speed and ease of use. It returns query results over massive datasets within sub-second response times and supports both high-concurrency point-query scenarios and high-throughput complex analysis scenarios. This makes Apache Doris an ideal tool for report analysis, ad-hoc queries, unified data warehousing, and data lake query acceleration. On Apache Doris, users can build applications such as user behavior analysis, A/B testing platforms, log retrieval and analysis, user profile analysis, and order analysis.
Apache Doris, formerly known as Palo, was initially created to support Baidu's ad reporting business. It was officially open-sourced in 2017 and donated by Baidu to the Apache Software Foundation for incubation in July 2018, where it was operated by members of the incubator project management committee under the guidance of Apache mentors. The Apache Doris community has gathered more than 500 contributors from hundreds of companies across different industries, with more than 100 active contributors per month. In June 2022, Apache Doris graduated from the Apache Incubator as a Top-Level Project.
Apache Doris now has a wide user base in China and around the world; today it runs in production at more than 2,000 companies worldwide. Of the top 50 Chinese Internet companies by market capitalization (or valuation), more than 80% are long-term users of Apache Doris, including Baidu, Meituan, Xiaomi, Jingdong, Bytedance, Tencent, NetEase, Kwai, Weibo, and Ke Holdings. It is also widely used in traditional industries such as finance, energy, manufacturing, and telecommunications.
Why is Flink Doris Connector useful?
Apache Flink is a popular stream processing framework that allows users to analyze and operate on streaming data in real time.
The Flink Doris Connector lets Flink users integrate Flink with Doris seamlessly, so they can perform real-time data analysis and write the results directly to Doris.
At the same time, the Flink Doris Connector lets Flink users efficiently read data from Doris and analyze it together with data from other sources.
Scenarios for using Flink Doris Connector include:
- Data synchronization: the Flink Doris Connector can read data out of Doris and synchronize it to other data sources.
- Correlation analysis: the Flink Doris Connector can run correlated analysis and queries across data in Doris and other data sources.
- Real-time writing: the Flink Doris Connector can write upstream raw data or ETL-cleaned data into Doris.
- CDC database synchronization: Flink CDC is integrated into the Flink Doris Connector, making whole-database synchronization more convenient for users.
Overall, the Flink Doris Connector is a powerful tool for enterprises looking to perform real-time data analysis and to integrate Flink and Doris workflows. By using this connector, enterprises get the best of both worlds: the real-time processing power of Flink combined with the scalability and cost-effectiveness of Doris.
Public Interfaces
The Flink Doris Connector does not introduce any new public interfaces, and no existing interfaces will be removed or changed.
Proposed Changes
Overall design
1. Source
The DorisSource supports two types of reads:
1.1. SCAN
Scan reads Doris data in batches as a bounded stream; it is typically used for data synchronization or for joint analysis with other data sources. The read proceeds as follows:
1. First, the connector assembles the query according to the configured options and sends it to Doris to obtain a query plan.
2. The response contains the tablets involved in the query and the BE nodes where they are located.
3. TaskManagers then read the corresponding tablets concurrently.
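For illustration, a bounded scan source might be declared in FlinkSQL as in the minimal sketch below, which uses the source options listed under Configuration; the table and column names are hypothetical:

```sql
-- Hypothetical table; column pruning and the filter are pushed down to Doris
CREATE TABLE doris_scan_source (
    id BIGINT,
    age INT
) WITH (
    'connector' = 'doris',
    'fenodes' = 'FE_IP:HTTP_PORT',
    'table.identifier' = 'db.tbl',
    'username' = 'root',
    'password' = 'password',
    'doris.read.field' = 'id,age',     -- read only these columns from Doris
    'doris.filter.query' = 'age=18'    -- evaluated on the Doris side
);
```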
1.2. LOOKUP JOIN
When the dimension table lives in Doris, a lookup join is performed against it, with the queries mainly issued via JDBC.
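For illustration, a lookup join might look like the following FlinkSQL sketch. Here orders is assumed to be an existing stream table with a processing-time attribute proc_time, and the Doris dimension table and its columns are hypothetical:

```sql
-- Hypothetical Doris dimension table, queried via JDBC at lookup time
CREATE TABLE doris_dim (
    id BIGINT,
    name STRING
) WITH (
    'connector' = 'doris',
    'fenodes' = 'FE_IP:HTTP_PORT',
    'jdbc-url' = 'jdbc:mysql://FE_IP:9030',
    'table.identifier' = 'db.dim_tbl',
    'username' = 'root',
    'password' = 'password',
    'lookup.jdbc.async' = 'true'    -- enable asynchronous lookup
);

SELECT o.order_id, d.name
FROM orders AS o
JOIN doris_dim FOR SYSTEM_TIME AS OF o.proc_time AS d
    ON o.user_id = d.id;
```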
2. Sink
Writing on the Doris side is mainly done through the Stream Load API. Doris Sink provides two writing modes:
2.1. Streaming writing
When the Sink operator receives data, it initiates a Stream Load request and keeps the HTTP connection open until the checkpoint completes, at which point the data write is finished.
Exactly-Once
Stream Load provides a two-phase commit API; refer to https://github.com/apache/doris/issues/7141.
Combined with Stream Load's two-phase commit, end-to-end data consistency can be achieved on top of Flink's two-phase commit protocol.
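For illustration, the sketch below shows a FlinkSQL sink with 2pc enabled, using the sink options listed under Configuration; the table and column names are hypothetical:

```sql
-- Hypothetical sink table with two-phase commit enabled
CREATE TABLE doris_eos_sink (
    id BIGINT,
    name STRING
) WITH (
    'connector' = 'doris',
    'fenodes' = 'FE_IP:HTTP_PORT',
    'table.identifier' = 'db.tbl',
    'username' = 'root',
    'password' = 'password',
    'sink.enable-2pc' = 'true',         -- pre-commit on checkpoint, commit when the checkpoint completes
    'sink.label-prefix' = 'doris_eos'   -- must be globally unique to guarantee EOS
);
```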
2.2. Batch writing
Streaming writing commits on checkpoints and is strongly bound to them; that is, data visibility equals the checkpoint interval. In some scenarios, however, data latency needs to be decoupled from the checkpoint interval.
Batch writing caches data in the Sink and flushes it to Doris either when thresholds such as the number of records are reached or periodically.
Note: batch writing provides at-least-once semantics and does not guarantee exactly-once. However, it can be combined with Doris' primary-key (Unique) model to achieve effective exactly-once through idempotent writes.
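For illustration, the sketch below enables batch mode in FlinkSQL; flushing is driven by thresholds and an interval rather than by checkpoints (table and column names are hypothetical):

```sql
-- Hypothetical sink table using batch mode (at-least-once)
CREATE TABLE doris_batch_sink (
    id BIGINT,
    name STRING
) WITH (
    'connector' = 'doris',
    'fenodes' = 'FE_IP:HTTP_PORT',
    'table.identifier' = 'db.tbl',
    'username' = 'root',
    'password' = 'password',
    'sink.enable.batch-mode' = 'true',
    'sink.enable-2pc' = 'false',              -- batch mode does not guarantee exactly-once
    'sink.buffer-flush.max-rows' = '50000',   -- flush after this many rows,
    'sink.buffer-flush.max-bytes' = '10MB',   -- or this many bytes,
    'sink.buffer-flush.interval' = '10s'      -- or this interval
);
```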
Configuration
1. General options
Key | Default Value | Required | Comment |
---|---|---|---|
fenodes | -- | Y | Doris FE http address, multiple addresses are supported, separated by commas |
benodes | -- | N | Doris BE http address, multiple addresses are supported, separated by commas. refer to #187 |
jdbc-url | -- | N | jdbc connection information, such as: jdbc:mysql://127.0.0.1:9030 |
table.identifier | -- | Y | Doris table name, such as: db.tbl |
username | -- | Y | username to access Doris |
password | -- | Y | Password to access Doris |
auto-redirect | false | N | Whether to redirect Stream Load requests. When enabled, Stream Load writes go through the FE and BE information is no longer displayed. Enabling this parameter also allows writing to SelectDB Cloud. |
doris.request.retries | 3 | N | Number of retries to send requests to Doris |
doris.request.connect.timeout.ms | 30000 | N | Connection timeout for sending requests to Doris |
doris.request.read.timeout.ms | 30000 | N | Read timeout for sending requests to Doris |
2. Source options
Key | Default Value | Required | Comment |
---|---|---|---|
doris.request.query.timeout.s | 3600 | N | Query timeout for Doris, in seconds; the default is 1 hour, and -1 means no timeout limit |
doris.request.tablet.size | Integer.MAX_VALUE | N | The number of Doris tablets corresponding to one partition. The smaller this value, the more partitions are generated, which increases parallelism on the Flink side but also puts more pressure on Doris. |
doris.batch.size | 1024 | N | The maximum number of rows to read from BE at a time. Increasing this value reduces the number of connections established between Flink and Doris, thereby reducing the extra overhead caused by network latency. |
doris.exec.mem.limit | 2147483648 | N | Memory limit for a single query, in bytes; the default is 2GB |
doris.deserialize.arrow.async | false | N | Whether to support asynchronous conversion of the Arrow format to the RowBatch needed for flink-doris-connector iteration |
doris.deserialize.queue.size | 64 | N | Internal processing queue size for the asynchronous conversion of the Arrow format; effective when doris.deserialize.arrow.async is true |
doris.read.field | -- | N | List of column names to read from the Doris table, separated by commas |
doris.filter.query | -- | N | Expression used to filter the data to read; it is passed through transparently to Doris, which uses it to perform source-side filtering. For example: age=18. |
3. Lookup Join options
Key | Default Value | Required | Comment |
---|---|---|---|
lookup.cache.max-rows | -1 | N | The maximum number of rows in the lookup cache; the default -1 means the cache is disabled |
lookup.cache.ttl | 10s | N | The maximum time-to-live of lookup cache entries; the default is 10s |
lookup.max-retries | 1 | N | The number of retries after a lookup query fails |
lookup.jdbc.async | false | N | Whether to enable asynchronous lookup; the default is false |
lookup.jdbc.read.batch.size | 128 | N | In asynchronous lookup mode, the maximum batch size per query |
lookup.jdbc.read.batch.queue-size | 256 | N | The size of the intermediate buffer queue during asynchronous lookup |
lookup.jdbc.read.thread-size | 3 | N | The number of JDBC threads for lookup in each task |
4. Sink options
Key | Default Value | Required | Comment |
---|---|---|---|
sink.label-prefix | -- | Y | The label prefix used by Stream Load imports. In the 2pc scenario, it must be globally unique to guarantee Flink's EOS semantics. |
sink.properties.* | -- | N | Import parameters for Stream Load. For example: 'sink.properties.column_separator' = ',' sets the column delimiter; 'sink.properties.escape_delimiters' = 'true' treats special characters as delimiters, so '\x01' is converted to the binary 0x01; for JSON-format import, set 'sink.properties.format' = 'json' and 'sink.properties.read_json_by_line' = 'true'. For detailed parameters, refer to the Stream Load documentation. |
sink.enable-delete | true | N | Whether to enable deletion. This option requires the batch-delete feature to be enabled on the Doris table (enabled by default since Doris 0.15), and it only supports the Unique model. |
sink.enable-2pc | true | N | Whether to enable two-phase commit (2pc); the default is true, which ensures exactly-once semantics. For two-phase commit, please refer to here. |
sink.buffer-size | 1MB | N | The size of the write-data cache buffer, in bytes. Modifying it is not recommended; the default is sufficient |
sink.buffer-count | 3 | N | The number of write-data buffers. Modifying it is not recommended; the default is sufficient |
sink.max-retries | 3 | N | The maximum number of retries after a commit failure; the default is 3 |
sink.use-cache | false | N | Whether to use the memory cache for recovery in case of an exception. When enabled, data from the checkpoint period is retained in the cache. |
sink.enable.batch-mode | false | N | Whether to write to Doris in batch mode. When enabled, the write timing does not depend on checkpoints; flushing is controlled by the sink.buffer-flush.max-rows / sink.buffer-flush.max-bytes / sink.buffer-flush.interval parameters. Once enabled, exactly-once semantics is no longer guaranteed; the Unique model can be used to achieve idempotence. |
sink.flush.queue-size | 2 | N | In batch mode, the size of the cache queue. |
sink.buffer-flush.max-rows | 50000 | N | In batch mode, the maximum number of data rows written in a single batch. |
sink.buffer-flush.max-bytes | 10MB | N | In batch mode, the maximum number of bytes written in a single batch. |
sink.buffer-flush.interval | 10s | N | In batch mode, the interval at which the cache is asynchronously flushed |
sink.ignore.update-before | true | N | Whether to ignore update-before events; they are ignored by default. |
Sample Code
1. DataStream API
Doris Source
```java
DorisOptions.Builder builder = DorisOptions.builder()
        .setFenodes("FE_IP:HTTP_PORT")
        .setTableIdentifier("db.table")
        .setUsername("root")
        .setPassword("password");

DorisSource<List<?>> dorisSource = DorisSourceBuilder.<List<?>>builder()
        .setDorisOptions(builder.build())
        .setDorisReadOptions(DorisReadOptions.builder().build())
        .setDeserializer(new SimpleListDeserializationSchema())
        .build();

env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source").print();
```
Doris Sink
```java
// enable checkpoint
env.enableCheckpointing(10000);
// using batch mode for bounded data
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

DorisSink.Builder<String> builder = DorisSink.builder();
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder.setFenodes("FE_IP:HTTP_PORT")
        .setTableIdentifier("db.table")
        .setUsername("root")
        .setPassword("password");

Properties properties = new Properties();
DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("label-doris") // Stream Load label prefix
        .setDeletable(false)
        .setStreamLoadProp(properties);

builder.setDorisReadOptions(DorisReadOptions.builder().build())
        .setDorisExecutionOptions(executionBuilder.build())
        .setSerializer(new SimpleStringSerializer()) // serialize according to string
        .setDorisOptions(dorisBuilder.build());

// mock csv string source
List<Tuple2<String, Integer>> data = new ArrayList<>();
data.add(new Tuple2<>("doris", 1));
DataStreamSource<Tuple2<String, Integer>> source = env.fromCollection(data);
source.map((MapFunction<Tuple2<String, Integer>, String>) t -> t.f0 + "\t" + t.f1)
        .sinkTo(builder.build());
```
2. FlinkSQL
```sql
-- doris source
CREATE TABLE flink_doris_source (
    name STRING,
    age INT,
    price DECIMAL(5,2),
    sale DOUBLE
) WITH (
    'connector' = 'doris',
    'fenodes' = 'FE_IP:HTTP_PORT',
    'table.identifier' = 'database.table',
    'username' = 'root',
    'password' = 'password'
);

-- doris sink
CREATE TABLE flink_doris_sink (
    name STRING,
    age INT,
    price DECIMAL(5,2),
    sale DOUBLE
) WITH (
    'connector' = 'doris',
    'fenodes' = 'FE_IP:HTTP_PORT',
    'table.identifier' = 'db.table',
    'username' = 'root',
    'password' = 'password',
    'sink.label-prefix' = 'doris_label'
);

INSERT INTO flink_doris_sink SELECT name, age, price, sale FROM flink_doris_source;
```
Datatype Mapping
Doris Type | Flink Type |
---|---|
NULL_TYPE | NULL |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DATE | DATE |
DATETIME | TIMESTAMP |
DECIMAL | DECIMAL |
CHAR | STRING |
LARGEINT | STRING |
VARCHAR | STRING |
STRING | STRING |
ARRAY | ARRAY |
MAP | MAP |
STRUCT | ROW |
BITMAP | Unsupported datatype |
HLL | Unsupported datatype |
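For illustration, the sketch below shows a hypothetical Flink DDL whose column types follow the mapping above (the table and columns are made up; the comments name the corresponding Doris types):

```sql
-- Hypothetical table illustrating the datatype mapping
CREATE TABLE doris_typed_source (
    id BIGINT,             -- Doris BIGINT
    created_at TIMESTAMP,  -- Doris DATETIME
    big_id STRING,         -- Doris LARGEINT maps to STRING
    price DECIMAL(10,2),   -- Doris DECIMAL
    tags ARRAY<STRING>     -- Doris ARRAY
) WITH (
    'connector' = 'doris',
    'fenodes' = 'FE_IP:HTTP_PORT',
    'table.identifier' = 'db.typed_tbl',
    'username' = 'root',
    'password' = 'password'
);
```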
Limitations
1. Currently, Doris data cannot be read as an unbounded real-time stream; only bounded (batch) reads are supported.
Compatibility, Deprecation, and Migration Plan
1. Programs that reference the Flink Doris Connector through the DataStream API will be affected (FlinkSQL will not be affected).
The current package path of the Flink Doris Connector is org.apache.doris. After migrating to the Flink community, the path may become org.apache.flink.connector.doris, so users' DataStream jobs may need to be modified.
2. Considering that the Flink Doris Connector is already hosted under the Apache organization, we can simply transfer ownership of the repository and rename it. This avoids the need to recreate the project and re-push the code.
Test Plan
1. Unit test cases to test methods in Flink Doris Connector.
2. The E2E test suite will be added in flink-connector-doris-e2e-tests.
Rejected Alternatives
If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.