...
In this case, we need to disable change tracking for this write: the batch job will not produce changes to downstream streaming jobs (avoiding re-processing).
```sql
INSERT INTO [catalog_name.][db_name.]table_name /*+ OPTIONS('change-tracking' = 'false') */ ...
```
...
DESCRIBE DETAIL TABLE output:
| name | type | description |
|---|---|---|
| name | String | catalog.database.tableName |
| log.kafka.bootstrap.servers | Map | Kafka brokers |
| log.retention | Duration | how long the change log will be kept |
| file.path | String | file path |
| log.kafka.topic | String | Kafka topic |
| watermark | Timestamp | watermark of the latest written data (if the table defines a watermark) |
| file.format | String | file format |
| bucket | Integer | bucket number |
| change-tracking | Boolean | whether this table tracks changes |
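As a usage sketch (the table name is illustrative), the metadata above can be inspected with:

```sql
-- Inspect the storage metadata of a managed table
-- (output columns follow the table above)
DESCRIBE DETAIL TABLE my_catalog.my_db.my_table;
```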
...
We put the necessary configurations in the global configuration so that users do not need to configure each table individually.
However, if a table needs separate settings, users can also configure it through table options.
| Key | Default | Type | Description |
|---|---|---|---|
| table-storage.log.system | kafka | String | Log system. |
| table-storage.log.kafka.properties.bootstrap.servers | (none) | Map | Kafka brokers, e.g. localhost:9092. |
| table-storage.log.retention | (none) | Duration | How long the change log will be kept. The default value comes from the Kafka cluster. |
| table-storage.log.scan.startup.mode | full | String | Specifies the scan startup mode for the log consumer. |
| table-storage.log.pk.consistency | transactional | String | Specifies the log consistency mode for tables with a primary key. |
| table-storage.log.pk.changelog-mode | upsert | String | Specifies the log changelog mode for tables with a primary key. |
| table-storage.log.format | debezium-json | String | Specifies the message format of the log system. |
| table-storage.file.root-path | (none) | String | Root file path. |
| table-storage.file.format | parquet | String | Format name for files. |
| table-storage.bucket | 1 | Integer | Bucket number for files and partition number for Kafka. |
| table-storage.change-tracking | true | Boolean | If users do not need to consume changes from the table, they can disable change tracking. This can reduce resource consumption. |
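As an illustrative sketch (the paths, broker address, and table name are hypothetical), the global defaults above could be set once in `flink-conf.yaml`, while a single table can still override them per query via dynamic options:

```yaml
# flink-conf.yaml (sketch): global defaults for built-in table storage
table-storage.log.system: kafka
table-storage.log.kafka.properties.bootstrap.servers: localhost:9092
table-storage.file.root-path: /tmp/table-store
table-storage.file.format: parquet
table-storage.bucket: 4
```

A per-table override could then look like `INSERT INTO t /*+ OPTIONS('change-tracking' = 'false') */ ...`, as shown earlier.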
...
- Batch reading: The data is visible only after the checkpoint; the latency depends on the checkpoint interval.
- Stream reading:
  - Without primary key: The data is visible only after the checkpoint; the latency depends on the checkpoint interval.
  - With primary key: The data is immediately visible. When the Flink job fails, duplicate changes may occur, but the downstream Flink streaming job deduplicates them by primary key to achieve eventual consistency (see the analysis in the Design chapter below).
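A minimal sketch (plain Python, not Flink code; the operation tags and helper name are illustrative) of why replayed upsert changes keyed by primary key converge to the same state:

```python
# Sketch: applying a possibly-duplicated upsert change stream keyed by
# primary key. Replayed (duplicate) changes overwrite state with the same
# value, so the materialized view converges -- eventual consistency.

def apply_changes(state, changes):
    """Apply (op, key, row) upsert changes to a dict keyed by primary key."""
    for op, key, row in changes:
        if op in ("+I", "+U"):   # insert / update-after
            state[key] = row
        elif op == "-D":         # delete
            state.pop(key, None)
    return state

# A failure replays the tail of the change stream; the result is unchanged.
changes = [("+I", 1, "a"), ("+U", 1, "b"), ("+I", 2, "c")]
replayed = changes + changes[1:]          # duplicates after recovery
assert apply_changes({}, changes) == apply_changes({}, replayed)
```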
...
- LogStore: Stores the latest data and supports second-level streaming incremental consumption; relies on Kafka.
  - To fully support Insert/Update/Delete with an optional primary key definition, automatically select Kafka with the Debezium format or Upsert-Kafka.
- FileStore: Stores the latest data plus historical data and provides batch ad-hoc analysis.
  - To fully support Insert/Update/Delete with an optional primary key definition, we need a flexible storage structure that supports updates and custom merges; an LSM tree is a good choice.
  - To support high-performance analysis, the file format should be columnar.
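To illustrate why an LSM structure supports updates well (a sketch, not the actual FileStore implementation; names and the tombstone convention are hypothetical), merging sorted runs so that the newest record per key wins:

```python
# Sketch of an LSM-style merge: several runs (newest first) are merged so
# that, for each primary key, only the newest record survives. Deletes are
# modeled as tombstones (value None) and dropped from the final view.

def merge_runs(runs):
    """runs[0] is newest. Each run is a dict: key -> value (None = delete)."""
    merged = {}
    for run in reversed(runs):      # oldest first; newer runs overwrite
        merged.update(run)
    return {k: v for k, v in sorted(merged.items()) if v is not None}

l0 = {1: "a2", 3: None}             # newest run: update key 1, delete key 3
l1 = {1: "a1", 2: "b", 3: "c"}      # older run
assert merge_runs([l0, l1]) == {1: "a2", 2: "b"}
```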
...
LogStore
Log storage relies on the log system. By default we use Kafka as the underlying storage, with the Debezium-Avro format or Upsert-Kafka depending on the primary key definition.
...
A bucket in the LogStore is a Kafka partition, which means records are hashed into different Kafka partitions according to the primary key (if defined) or the whole row (without a primary key).
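A sketch of this bucketing rule (plain Python; the helper name and hash function are illustrative, not the actual implementation):

```python
import zlib

# Sketch: assign a row to a bucket (Kafka partition). Hash the primary key
# fields if the table defines a primary key, otherwise hash the whole row.
def bucket_of(row, primary_key_fields, num_buckets):
    fields = primary_key_fields if primary_key_fields else sorted(row)
    key = "|".join(str(row[f]) for f in fields)
    return zlib.crc32(key.encode()) % num_buckets

# Rows with the same primary key always land in the same bucket,
# regardless of the other column values.
a = bucket_of({"id": 7, "v": "x"}, ["id"], 4)
b = bucket_of({"id": 7, "v": "y"}, ["id"], 4)
assert a == b
```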
...