...

In this case (re-processing), we need to disable change tracking for this write, so that the batch job does not produce changes for downstream streaming jobs.

SQL:
INSERT INTO [catalog_name.][db_name.]table_name /*+ OPTIONS('change-tracking' = 'false') */  ...

...

DESCRIBE DETAIL TABLE output:

| name | type | description |
|---|---|---|
| name | String | catalog.database.tableName |
| log.kafka.bootstrap.servers | Map | Kafka brokers |
| log.retention | Duration | How long the change log is kept |
| file.path | String | File path |
| log.kafka.topic | String | Kafka topic |
| watermark | Timestamp | Watermark of the latest written data (if the table defines a watermark) |
| file.format | String | File format |
| bucket | Integer | Bucket number |
| change-tracking | Boolean | Whether this table tracks changes |

...

We put the necessary configuration in the global configuration so that users do not have to configure each table individually.

However, users who need to configure a table separately can do so through table options.
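The precedence between the global configuration and per-table options can be sketched as follows. This is only an illustration in Python, not the proposed implementation: the `resolve_table_options` helper is hypothetical, and it assumes that per-table option keys are the global keys without the `table-storage.` prefix, as the DESCRIBE DETAIL TABLE output suggests.

```python
GLOBAL_PREFIX = "table-storage."


def resolve_table_options(global_conf, table_options):
    """Return the effective options for one table.

    Hypothetical helper: global keys are assumed to carry the
    'table-storage.' prefix, table options are assumed not to.
    """
    effective = {
        key[len(GLOBAL_PREFIX):]: value
        for key, value in global_conf.items()
        if key.startswith(GLOBAL_PREFIX)
    }
    # Options set on the table itself win over the global configuration.
    effective.update(table_options)
    return effective


global_conf = {
    "table-storage.log.system": "kafka",
    "table-storage.file.format": "parquet",
    "table-storage.bucket": "1",
    "table-storage.change-tracking": "true",
}
# Per-table overrides, e.g. for a table that nobody consumes as a stream.
table_options = {"bucket": "4", "change-tracking": "false"}

effective = resolve_table_options(global_conf, table_options)
```

Here the table keeps the global log system and file format but uses its own bucket number and disables change tracking.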

| Key | Default | Type | Description |
|---|---|---|---|
| table-storage.log.system | kafka | String | Log system. |
| table-storage.log.kafka.properties.bootstrap.servers | (none) | Map | Kafka brokers, e.g. localhost:9092 |
| table-storage.log.retention | (none) | Duration | How long the change log is kept. The default value comes from the log system cluster. |
| table-storage.log.scan.startup.mode | full | String | Specifies the scan startup mode for the log consumer. full: performs a snapshot on the table upon first startup, then continues to read the latest changes. latest: starts from the latest changes. from-timestamp: starts from a user-supplied timestamp. |
| table-storage.log.pk.consistency | transactional | String | Specifies the log consistency mode for tables with a primary key. transactional: only the data after the checkpoint can be seen by readers; the latency depends on the checkpoint interval. eventual: data is visible immediately; readers may see some intermediate states, but the right results are produced eventually. |
| table-storage.log.pk.changelog-mode | upsert | String | Specifies the log changelog mode for tables with a primary key. upsert: the log system does not store update_before changes; the job consuming the log automatically adds a normalize node, relying on state to generate the required update_before. all: the log system stores all changes including update_before; this requires (1) transactional consistency and (2) a sink query that produces changes with update_before. If it does not, we can report an error in the MVP, and later automatically add a normalize node before the sink to generate update_before. |
| table-storage.log.format | debezium-json | String | Specifies the message format of the log system. |
| table-storage.file.root-path | (none) | String | Root file path. |
| table-storage.file.format | parquet | String | Format name for files. |
| table-storage.bucket | 1 | Integer | Number of buckets for files and number of partitions for Kafka. |
| table-storage.change-tracking | true | Boolean | If users do not need to consume changes from the table, they can disable change tracking. This can reduce resource consumption. |
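The upsert changelog mode relies on the consuming job to regenerate update_before from state. A minimal Python sketch of such a normalization step is given here under stated assumptions: the `normalize` function and the `(kind, key, value)` record shape are hypothetical, and this is a simplified model rather than Flink's actual normalize operator. Dropping an update whose value is unchanged is a choice of this sketch. The output uses Flink's short row-kind names (+I, -U, +U, -D).

```python
def normalize(upsert_changes):
    """Turn an upsert stream into a full changelog using per-key state.

    upsert_changes: iterable of (kind, key, value) with kind in
    {"upsert", "delete"}. Returns a list of (row_kind, key, value).
    """
    state = {}  # last seen value per primary key
    out = []
    for kind, key, value in upsert_changes:
        previous = state.get(key)
        if kind == "upsert":
            if key not in state:
                out.append(("+I", key, value))
            elif previous != value:
                # The log did not store update_before: rebuild it from state.
                out.append(("-U", key, previous))
                out.append(("+U", key, value))
            # An identical value is treated as a duplicate: no output.
            state[key] = value
        elif kind == "delete" and key in state:
            out.append(("-D", key, previous))
            del state[key]
    return out


changelog = normalize([
    ("upsert", 1, "a"),
    ("upsert", 1, "b"),
    ("upsert", 1, "b"),  # duplicate, e.g. replayed after a failure
    ("delete", 1, None),
])
```

The cost of this mode is the per-key state held by the consumer; the all mode avoids that state by storing update_before in the log itself.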

...

  • Batch reading: The data is visible only after the checkpoint; the latency depends on the checkpoint interval.
  • Stream reading:
    • Without primary key: The data is visible only after the checkpoint; the latency depends on the checkpoint interval.
    • With primary key: The data is immediately visible.
      • When the streaming job fails, duplicate changes may occur, but the downstream Flink streaming job deduplicates them by primary key to achieve eventual consistency.
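To illustrate why replayed changes do not break the final result for a table with a primary key, here is a minimal Python sketch of a consumer that materializes an upsert changelog by key. It is a simplified model, not Flink's operator: the `apply_changes` function and the `(kind, key, value)` record shape are hypothetical. Because each change overwrites or deletes the row for its key, re-applying a replayed run of changes leaves the final state unchanged, although intermediate states may temporarily go back to older values.

```python
def apply_changes(changes):
    """Materialize an upsert changelog into a dict keyed by primary key."""
    table = {}
    for kind, key, value in changes:
        if kind == "upsert":
            table[key] = value    # insert or overwrite the row for this key
        elif kind == "delete":
            table.pop(key, None)  # deleting a missing key is a no-op
    return table


clean = [
    ("upsert", 1, "a"),
    ("upsert", 2, "b"),
    ("upsert", 1, "c"),
    ("delete", 2, None),
]

# Assumed failure scenario: the writer fails after emitting the first three
# changes, restarts from a checkpoint taken after the first change, and
# emits the second and third changes again before continuing.
with_duplicates = clean[:3] + clean[1:]

final_clean = apply_changes(clean)
final_with_duplicates = apply_changes(with_duplicates)
```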

(See the analysis in the Design chapter below.)

...

  • LogStore: Stores the latest data and supports streaming incremental consumption with second-level latency; relies on Kafka.
    • For full support of Insert/Update/Delete and an optional primary key definition, automatically selects Kafka with the Debezium format or Upsert-Kafka.
  • FileStore: Stores the latest data plus historical data and serves batch ad-hoc analysis.
    • For full support of Insert/Update/Delete and an optional primary key definition, we need a flexible storage structure that supports updates and custom merges; an LSM tree is a good choice.
    • To support high-performance analysis, the file format should be columnar.

...

LogStore

Log storage relies on a log system. By default, we use Kafka as the underlying storage.

...

A bucket in LogStore is a Kafka partition: each record is hashed to a Kafka partition by its primary key (if one is defined) or by the whole row (if there is no primary key).
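As a rough illustration of this bucket assignment, the following Python sketch hashes the primary key fields when a primary key is defined and the whole row otherwise. The `assign_bucket` function and the use of CRC32 are assumptions made for the example only; the hash function actually used by the log system is not specified here.

```python
import zlib


def assign_bucket(row, num_buckets, primary_key=None):
    """Pick a bucket (Kafka partition) for a row.

    row:         dict of column name -> value
    primary_key: list of key column names, or None if the table has no key
    """
    columns = primary_key if primary_key else sorted(row)
    # Build a deterministic byte representation of the hashed fields.
    payload = "|".join(f"{col}={row[col]!r}" for col in columns).encode("utf-8")
    # CRC32 is stable across processes, unlike Python's built-in hash().
    return zlib.crc32(payload) % num_buckets


order_v1 = {"id": 42, "status": "created"}
order_v2 = {"id": 42, "status": "paid"}

# With a primary key, every change to the same key lands in the same bucket.
same_bucket = assign_bucket(order_v1, 4, ["id"]) == assign_bucket(order_v2, 4, ["id"])

# Without a primary key, the whole row decides the bucket.
bucket_no_key = assign_bucket(order_v1, 4)
```

Keeping all changes for one key in one partition preserves their relative order for downstream consumers, since Kafka only guarantees ordering within a partition.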

...