...

The Flink Doris Connector does not introduce any new public interfaces, nor does it remove or change any existing ones.

Proposed Changes

Overall design

1. Source

DorisSource supports two types of reads:

1.1. SCAN

Scan reads Doris data as a bounded stream and is typically used for data synchronization or federated analysis with other data sources. The read flow is:
1. First, a query is assembled from the configured options and sent to Doris FE to obtain the query plan.
2. The response returns the Tablets involved in the query and the BE nodes on which they reside.
3. TaskManagers then read the assigned Tablets from the BE nodes concurrently.

1.2. LOOKUP JOIN

For scenarios where the dimension table resides in Doris, a lookup join is performed; the lookup queries are executed mainly through JDBC.
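
A minimal Flink SQL sketch of such a lookup join. The dimension table DDL mirrors the FlinkSQL sample later in this document; fact_stream, its processing-time attribute proc_time, and the column names are illustrative assumptions, not part of the original proposal:

Code Block
languagesql
-- Doris dimension table (illustrative; lookup queries go through JDBC)
CREATE TABLE dim_doris (
    id INT,
    name STRING
    )
    WITH (
      'connector' = 'doris',
      'fenodes' = 'FE_IP:HTTP_PORT',
      'jdbc-url' = 'jdbc:mysql://FE_IP:9030',
      'table.identifier' = 'db.dim_table',
      'username' = 'root',
      'password' = 'password'
);

-- enrich a fact stream with the Doris dimension table via lookup join
-- (fact_stream is assumed to declare a processing-time attribute proc_time)
SELECT f.id, d.name
FROM fact_stream AS f
JOIN dim_doris FOR SYSTEM_TIME AS OF f.proc_time AS d
ON f.id = d.id;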

2. Sink

Writing to Doris is done mainly through the Stream Load API. The Doris Sink provides two writing modes:

2.1. Streaming writing

When the Sink operator receives data, it initiates a Stream Load request and keeps the HTTP connection open until the checkpoint completes, at which point the write is finalized.

Exactly-Once

Stream Load provides a two-phase commit API; see https://github.com/apache/doris/issues/7141.
Combining Stream Load's two-phase commit with Flink's two-phase commit protocol achieves end-to-end data consistency.

Take Kafka to Doris as an example:

(Figure: end-to-end two-phase commit flow from Kafka to Doris)
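
In Flink SQL, exactly-once writing amounts to keeping two-phase commit enabled and choosing a globally unique label prefix. The sketch below uses only options documented in the Sink options table; the connection values are illustrative:

Code Block
languagesql
CREATE TABLE doris_sink_eos (
    name STRING,
    age INT
    )
    WITH (
      'connector' = 'doris',
      'fenodes' = 'FE_IP:HTTP_PORT',
      'table.identifier' = 'db.table',
      'username' = 'root',
      'password' = 'password',
      -- 2pc is enabled by default; shown explicitly for clarity
      'sink.enable-2pc' = 'true',
      -- must be globally unique to guarantee Exactly-Once semantics
      'sink.label-prefix' = 'doris_eos_job'
);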


2.2. Batch writing

Streaming writing commits data on checkpoints and is therefore tightly bound to the checkpoint cycle: data visibility latency equals the checkpoint interval. In some scenarios, however, data latency needs to be decoupled from the checkpoint interval.

Batch writing caches data in the Sink and flushes it to Doris either when thresholds (such as the number of buffered records) are reached or at a periodic interval.

Note: batch writing provides at-least-once semantics and does not guarantee Exactly-Once on its own. It can, however, be combined with a Doris primary key (Unique) table to achieve effectively Exactly-Once results through idempotent writes.
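
A hedged Flink SQL sketch of enabling batch writing, using the batch-mode options documented in the Sink options table below (connection values are illustrative):

Code Block
languagesql
CREATE TABLE doris_sink_batch (
    name STRING,
    age INT
    )
    WITH (
      'connector' = 'doris',
      'fenodes' = 'FE_IP:HTTP_PORT',
      'table.identifier' = 'db.table',
      'username' = 'root',
      'password' = 'password',
      'sink.label-prefix' = 'doris_batch',
      -- decouple flushing from checkpoints
      'sink.enable.batch-mode' = 'true',
      'sink.buffer-flush.max-rows' = '50000',
      'sink.buffer-flush.max-bytes' = '10MB',
      'sink.buffer-flush.interval' = '10s'
);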


Configuration

1. General options

Key | Default Value | Required | Comment
fenodes | -- | Y | Doris FE HTTP address; multiple addresses are supported, separated by commas
benodes | -- | N | Doris BE HTTP address; multiple addresses are supported, separated by commas. Refer to #187
jdbc-url | -- | N | JDBC connection information, e.g. jdbc:mysql://127.0.0.1:9030
table.identifier | -- | Y | Doris table name, e.g. db.tbl
username | -- | Y | Username for accessing Doris
password | -- | Y | Password for accessing Doris
auto-redirect | false | N | Whether to redirect Stream Load requests. When enabled, Stream Load writes go through FE and BE information is no longer exposed. Enabling this parameter also allows writing to SelectDB Cloud.
doris.request.retries | 3 | N | Number of retries when sending requests to Doris
doris.request.connect.timeout.ms | 30000 | N | Connection timeout for sending requests to Doris
doris.request.read.timeout.ms | 30000 | N | Read timeout for sending requests to Doris


2. Source options

Key | Default Value | Required | Comment
doris.request.query.timeout.s | 3600 | N | Timeout for querying Doris; the default is 1 hour, -1 means no timeout limit
doris.request.tablet.size | Integer.MAX_VALUE | N | Number of Doris Tablets corresponding to one Partition. The smaller this value, the more Partitions are generated, which increases parallelism on the Flink side but puts more pressure on Doris.
doris.batch.size | 1024 | N | Maximum number of rows read from BE at a time. Increasing this value reduces the number of connections established between Flink and Doris, and thus the overhead caused by network latency.
doris.exec.mem.limit | 2147483648 | N | Memory limit for a single query, in bytes. The default is 2GB.
doris.deserialize.arrow.async | false | N | Whether to asynchronously convert the Arrow format to the RowBatch needed for flink-doris-connector iteration
doris.deserialize.queue.size | 64 | N | Size of the internal processing queue for asynchronous Arrow conversion; effective when doris.deserialize.arrow.async is true
doris.read.field | -- | N | List of column names to read from the Doris table, separated by commas
doris.filter.query | -- | N | Expression for filtering the data to read; it is passed through to Doris, which performs the source-side filtering. For example, age=18.
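
Column pruning and predicate pushdown can be configured on the source with doris.read.field and doris.filter.query. A brief sketch (connection values and columns are illustrative, mirroring the FlinkSQL sample later in this document):

Code Block
languagesql
CREATE TABLE doris_source_filtered (
    name STRING,
    age INT
    )
    WITH (
      'connector' = 'doris',
      'fenodes' = 'FE_IP:HTTP_PORT',
      'table.identifier' = 'db.table',
      'username' = 'root',
      'password' = 'password',
      -- read only these columns
      'doris.read.field' = 'name,age',
      -- the filter expression is passed through and evaluated by Doris
      'doris.filter.query' = 'age=18'
);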

3. Lookup Join options

Key | Default Value | Required | Comment
lookup.cache.max-rows | -1 | N | The maximum number of rows in the lookup cache; the default of -1 means the cache is not enabled
lookup.cache.ttl | 10s | N | The maximum time-to-live of the lookup cache; the default is 10s
lookup.max-retries | 1 | N | The number of retries after a lookup query fails
lookup.jdbc.async | false | N | Whether to enable asynchronous lookup; the default is false
lookup.jdbc.read.batch.size | 128 | N | The maximum batch size per query in asynchronous lookup
lookup.jdbc.read.batch.queue-size | 256 | N | The size of the intermediate buffer queue in asynchronous lookup
lookup.jdbc.read.thread-size | 3 | N | The number of JDBC lookup threads per task

4. Sink options

Key | Default Value | Required | Comment
sink.label-prefix | -- | Y | Label prefix used by Stream Load imports. In the 2pc scenario, it must be globally unique to guarantee Flink's Exactly-Once semantics.
sink.properties.* | -- | N | Import parameters for Stream Load. For example, 'sink.properties.column_separator' = ',' defines the column delimiter, and 'sink.properties.escape_delimiters' = 'true' allows special characters as delimiters ('\x01' is converted to binary 0x01). For JSON-format import: 'sink.properties.format' = 'json' and 'sink.properties.read_json_by_line' = 'true'. See the Stream Load documentation for the full list of parameters.
sink.enable-delete | true | N | Whether to enable deletes. Requires the Doris table to have the batch-delete feature enabled (enabled by default since Doris 0.15) and only supports the Unique model.
sink.enable-2pc | true | N | Whether to enable two-phase commit (2pc); the default is true, which guarantees Exactly-Once semantics. See the Stream Load two-phase commit documentation for details.
sink.buffer-size | 1MB | N | Size of the write-data buffer, in bytes. The default is usually sufficient; changing it is not recommended.
sink.buffer-count | 3 | N | Number of write-data buffers. The default is usually sufficient; changing it is not recommended.
sink.max-retries | 3 | N | Maximum number of retries after a commit failure; the default is 3
sink.use-cache | false | N | Whether to use the in-memory cache for recovery after an exception. When enabled, the data of the current checkpoint period is retained in the cache.
sink.enable.batch-mode | false | N | Whether to write to Doris in batch mode. When enabled, the write timing no longer depends on checkpoints and is instead controlled by sink.buffer-flush.max-rows, sink.buffer-flush.max-bytes, and sink.buffer-flush.interval. In this mode Exactly-Once semantics are not guaranteed; the Unique model can be used to achieve idempotence.
sink.flush.queue-size | 2 | N | In batch mode, the size of the cache queue.
sink.buffer-flush.max-rows | 50000 | N | In batch mode, the maximum number of rows written in a single batch.
sink.buffer-flush.max-bytes | 10MB | N | In batch mode, the maximum number of bytes written in a single batch.
sink.buffer-flush.interval | 10s | N | In batch mode, the interval for asynchronously flushing the cache.
sink.ignore.update-before | true | N | Whether to ignore update-before events; ignored by default.
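
For example, a JSON-format import can be configured through the sink.properties.* passthrough options listed above (a sketch; connection values are illustrative):

Code Block
languagesql
CREATE TABLE doris_sink_json (
    name STRING,
    age INT
    )
    WITH (
      'connector' = 'doris',
      'fenodes' = 'FE_IP:HTTP_PORT',
      'table.identifier' = 'db.table',
      'username' = 'root',
      'password' = 'password',
      'sink.label-prefix' = 'doris_json',
      -- Stream Load passthrough: write each record as one JSON line
      'sink.properties.format' = 'json',
      'sink.properties.read_json_by_line' = 'true'
);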


Sample Code

1. DataStream API

Doris Source

Code Block
// obtain the streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DorisOptions.Builder builder = DorisOptions.builder()
        .setFenodes("FE_IP:HTTP_PORT")
        .setTableIdentifier("db.table")
        .setUsername("root")
        .setPassword("password");

DorisSource<List<?>> dorisSource = DorisSourceBuilder.<List<?>>builder()
        .setDorisOptions(builder.build())
        .setDorisReadOptions(DorisReadOptions.builder().build())
        .setDeserializer(new SimpleListDeserializationSchema())
        .build();

env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source").print();

Doris Sink

Code Block
// obtain the streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// enable checkpoint
env.enableCheckpointing(10000);
// using batch mode for bounded data
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

DorisSink.Builder<String> builder = DorisSink.builder();
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder.setFenodes("FE_IP:HTTP_PORT")
        .setTableIdentifier("db.table")
        .setUsername("root")
        .setPassword("password");


Properties properties = new Properties();
DorisExecutionOptions.Builder  executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("label-doris") //streamload label prefix
                .setDeletable(false)
                .setStreamLoadProp(properties); 

builder.setDorisReadOptions(DorisReadOptions.builder().build())
        .setDorisExecutionOptions(executionBuilder.build())
        .setSerializer(new SimpleStringSerializer()) //serialize according to string 
        .setDorisOptions(dorisBuilder.build());

//mock csv string source
List<Tuple2<String, Integer>> data = new ArrayList<>();
data.add(new Tuple2<>("doris",1));
DataStreamSource<Tuple2<String, Integer>> source = env.fromCollection(data);
source.map((MapFunction<Tuple2<String, Integer>, String>) t -> t.f0 + "\t" + t.f1)
      .sinkTo(builder.build());



2. FlinkSQL

Code Block
languagesql
-- doris source
CREATE TABLE flink_doris_source (
    name STRING,
    age INT,
    price DECIMAL(5,2),
    sale DOUBLE
    ) 
    WITH (
      'connector' = 'doris',
      'fenodes' = 'FE_IP:HTTP_PORT',
      'table.identifier' = 'database.table',
      'username' = 'root',
      'password' = 'password'
);

 CREATE TABLE flink_doris_sink (
    name STRING,
    age INT,
    price DECIMAL(5,2),
    sale DOUBLE
    ) 
    WITH (
      'connector' = 'doris',
      'fenodes' = 'FE_IP:HTTP_PORT',
      'table.identifier' = 'db.table',
      'username' = 'root',
      'password' = 'password',
      'sink.label-prefix' = 'doris_label'
);

INSERT INTO flink_doris_sink SELECT name, age, price, sale FROM flink_doris_source;
 




Datatype Mapping

Doris Type | Flink Type
NULL_TYPE | NULL
BOOLEAN | BOOLEAN
TINYINT | TINYINT
SMALLINT | SMALLINT
INT | INT
BIGINT | BIGINT
FLOAT | FLOAT
DOUBLE | DOUBLE
DATE | DATE
DATETIME | TIMESTAMP
DECIMAL | DECIMAL
CHAR | STRING
LARGEINT | STRING
VARCHAR | STRING
STRING | STRING
ARRAY | ARRAY
MAP | MAP
STRUCT | ROW
Bitmap | Unsupported datatype
HLL | Unsupported datatype



Limitations

1. Currently, Doris data cannot be read as an unbounded (real-time) stream; only bounded (batch) reads are supported.

Compatibility, Deprecation, and Migration Plan

...

2. Considering that the Flink Doris Connector is already hosted within the Apache organization, we can simply transfer ownership of the repository and rename it (it may need to be renamed to flink-connector-doris, following flink-connector-jdbc, etc.). This avoids recreating the project and re-pushing the code.

Test Plan

1. Unit test cases to test methods in Flink Doris Connector.
2. The E2E test suite will be added in flink-connector-doris-e2e-tests.

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.