...
Stream Load provides a two-phase commit API; refer to https://github.com/apache/doris/issues/7141.
Combined with Stream Load's two-phase commit, end-to-end exactly-once data consistency can be achieved through Flink's two-phase commit mechanism.
Take Kafka to Doris as an example:
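As an illustrative sketch of the Kafka-to-Doris case (the topic name, broker address, schema, and table names below are placeholder assumptions, not part of this proposal), a Flink SQL job can enable 2pc through the sink options; with checkpointing on, the sink pre-commits data at each checkpoint and commits it once the checkpoint completes:

```sql
-- Hypothetical Kafka source; topic and bootstrap servers are placeholders
CREATE TABLE kafka_source (
  name STRING,
  age INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_topic',
  'properties.bootstrap.servers' = 'KAFKA_IP:9092',
  'properties.group.id' = 'doris_group',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- Doris sink with two-phase commit enabled
CREATE TABLE doris_sink (
  name STRING,
  age INT
) WITH (
  'connector' = 'doris',
  'fenodes' = 'FE_IP:HTTP_PORT',
  'table.identifier' = 'db.table',
  'username' = 'root',
  'password' = 'password',
  'sink.enable-2pc' = 'true',
  -- the label prefix must be globally unique to guarantee EOS
  'sink.label-prefix' = 'doris_kafka_label'
);

INSERT INTO doris_sink SELECT name, age FROM kafka_source;
```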
2.2. Batch writing
Streaming writes are committed on Flink checkpoints and are therefore strongly bound to them: data becomes visible only at the checkpoint interval. In some scenarios, however, the latency of data visibility needs to be decoupled from the checkpoint interval.
...
Key | Default Value | Required | Comment |
---|---|---|---|
sink.label-prefix | -- | Y | The label prefix used by Stream load import. In the 2pc scenario, global uniqueness is required to ensure Flink's EOS semantics. |
sink.properties.* | -- | N | Import parameters for Stream Load. For example: 'sink.properties.column_separator' = ',' defines the column delimiter; 'sink.properties.escape_delimiters' = 'true' treats special characters as delimiters, so '\x01' is converted to the binary 0x01; for JSON format import, set 'sink.properties.format' = 'json' and 'sink.properties.read_json_by_line' = 'true'. For detailed parameters, refer to here. |
sink.enable-delete | TRUE | N | Whether to enable delete. This option requires the batch delete feature to be enabled on the Doris table (enabled by default since Doris 0.15) and only supports the Unique model. |
sink.enable-2pc | TRUE | N | Whether to enable two-phase commit (2pc); the default is true, which ensures Exactly-Once semantics. For two-phase commit, refer to here. |
sink.buffer-size | 1MB | N | The size of the write buffer, in bytes. Modifying it is not recommended; the default is sufficient. |
sink.buffer-count | 3 | N | The number of write buffers. Modifying it is not recommended; the default is sufficient. |
sink.max-retries | 3 | N | Maximum number of retries after a commit failure; default 3. |
sink.use-cache | false | N | Whether to use the in-memory cache for recovery in case of an exception. When enabled, data from the checkpoint period is retained in the cache. |
sink.enable.batch-mode | false | N | Whether to write to Doris in batch mode. When enabled, the write timing no longer depends on checkpoints; flushing is controlled by the sink.buffer-flush.max-rows / sink.buffer-flush.max-bytes / sink.buffer-flush.interval parameters. With batch mode enabled, Exactly-Once semantics are not guaranteed; the Unique model can be used to achieve idempotence. |
sink.flush.queue-size | 2 | N | In batch mode, the size of the cache queue. |
sink.buffer-flush.max-rows | 50000 | N | In batch mode, the maximum number of data rows written in a single batch. |
sink.buffer-flush.max-bytes | 10MB | N | In batch mode, the maximum number of bytes written in a single batch. |
sink.buffer-flush.interval | 10s | N | In batch mode, the interval for asynchronously flushing the buffer. |
sink.ignore.update-before | true | N | Whether to ignore the update-before event, ignored by default. |
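To illustrate how these options combine, here is a hedged sketch of a batch-mode sink (table names and thresholds are assumptions chosen for the example). Batch mode is enabled purely through sink options, and 2pc is disabled here since Exactly-Once is not guaranteed in this mode anyway:

```sql
CREATE TABLE doris_batch_sink (
  name STRING,
  age INT
) WITH (
  'connector' = 'doris',
  'fenodes' = 'FE_IP:HTTP_PORT',
  'table.identifier' = 'db.table',
  'username' = 'root',
  'password' = 'password',
  -- write independently of checkpoints
  'sink.enable.batch-mode' = 'true',
  'sink.enable-2pc' = 'false',
  -- flush whenever any of these thresholds is reached
  'sink.buffer-flush.max-rows' = '50000',
  'sink.buffer-flush.max-bytes' = '10485760',
  'sink.buffer-flush.interval' = '10s'
);
```

With these settings, data visibility is governed by the flush thresholds rather than the checkpoint interval; on a Unique model table, retried batches remain idempotent.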
Sample Code
1.DataStream API
2.FlinkSQL
Datatype Mapping
Doris Source
Code Block |
---|
DorisOptions.Builder builder = DorisOptions.builder()
.setFenodes("FE_IP:HTTP_PORT")
.setTableIdentifier("db.table")
.setUsername("root")
.setPassword("password");
DorisSource<List<?>> dorisSource = DorisSourceBuilder.<List<?>>builder()
.setDorisOptions(builder.build())
.setDorisReadOptions(DorisReadOptions.builder().build())
.setDeserializer(new SimpleListDeserializationSchema())
.build();
env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source").print(); |
Doris Sink
Code Block |
---|
// enable checkpoint
env.enableCheckpointing(10000);
// using batch mode for bounded data
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
DorisSink.Builder<String> builder = DorisSink.builder();
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder.setFenodes("FE_IP:HTTP_PORT")
.setTableIdentifier("db.table")
.setUsername("root")
.setPassword("password");
Properties properties = new Properties();
DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("label-doris") //streamload label prefix
.setDeletable(false)
.setStreamLoadProp(properties);
builder.setDorisReadOptions(DorisReadOptions.builder().build())
.setDorisExecutionOptions(executionBuilder.build())
.setSerializer(new SimpleStringSerializer()) //serialize according to string
.setDorisOptions(dorisBuilder.build());
//mock csv string source
List<Tuple2<String, Integer>> data = new ArrayList<>();
data.add(new Tuple2<>("doris",1));
DataStreamSource<Tuple2<String, Integer>> source = env.fromCollection(data);
source.map((MapFunction<Tuple2<String, Integer>, String>) t -> t.f0 + "\t" + t.f1)
.sinkTo(builder.build()); |
2.FlinkSQL
Code Block |
---|
-- doris source
CREATE TABLE flink_doris_source (
name STRING,
age INT,
price DECIMAL(5,2),
sale DOUBLE
)
WITH (
'connector' = 'doris',
'fenodes' = 'FE_IP:HTTP_PORT',
'table.identifier' = 'database.table',
'username' = 'root',
'password' = 'password'
);
CREATE TABLE flink_doris_sink (
name STRING,
age INT,
price DECIMAL(5,2),
sale DOUBLE
)
WITH (
'connector' = 'doris',
'fenodes' = 'FE_IP:HTTP_PORT',
'table.identifier' = 'db.table',
'username' = 'root',
'password' = 'password',
'sink.label-prefix' = 'doris_label'
);
INSERT INTO flink_doris_sink SELECT name, age, price, sale FROM flink_doris_source;
|
Datatype Mapping
Doris Type | Flink Type |
---|---|
NULL_TYPE | NULL |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DATE | DATE |
DATETIME | TIMESTAMP |
DECIMAL | DECIMAL |
CHAR | STRING |
LARGEINT | STRING |
VARCHAR | STRING |
STRING | STRING |
Bitmap | Unsupported datatype |
HLL | Unsupported datatype |
ARRAY | ARRAY |
MAP | MAP |
STRUCT | ROW |
Limitations
1. Currently, the connector cannot read data from Doris in real time; only bounded (batch) reads are supported.
Compatibility, Deprecation, and Migration Plan
...
2. Considering that Flink Doris Connector is already organized in Apache, I think we can just transfer the ownership of the project and rename it (we may need to rename the project to flink-connector-doris; refer to flink-connector-jdbc, etc.). This avoids the need to recreate the project and push the code.
Test Plan
1. Unit test cases to test methods in Flink Doris Connector.
2. The E2E test suite will be added in flink-connector-doris-e2e-tests.
Rejected Alternatives
If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.