Status

...

Discussion thread: ...

Vote thread: ...

JIRA: FLINK-25152

Release: 1.15


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Name | Type | Description
name | String | catalog.database.tableName
log.system | String | the log system
log.kafka.bootstrap.servers | Map | Kafka brokers
log.retention | Duration | how long the change log will be kept
file.path | String | file path
log.kafka.topic | String | Kafka topic
file.format | String | file format
bucket | Integer | bucket number
change-tracking | Boolean | whether this table tracks changes
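
As a loose illustration (not from the FLIP itself), the per-table metadata above can be pictured as plain key/value properties once a managed table has been created; every concrete value below is an assumption:

    import java.util.Map;

    // Hypothetical resolved options of one managed table (illustrative values only).
    Map<String, String> tableOptions = Map.of(
            "name", "my_catalog.my_db.word_count",
            "log.system", "kafka",
            "log.kafka.bootstrap.servers", "localhost:9092",
            "log.retention", "7 d",
            "file.path", "/tmp/table-storage/my_catalog.my_db/word_count",
            "log.kafka.topic", "my_catalog.my_db.word_count",
            "file.format", "parquet",
            "bucket", "4",
            "change-tracking", "true");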

...

Key | Default | Type | Description
table-storage.log.system | kafka | String | Log system. Only Kafka is supported in the MVP for now.
table-storage.log.kafka.bootstrap.servers | (none) | Map | Kafka brokers, e.g. localhost:9092
table-storage.log.retention | (none) | Duration | How long the change log will be kept. The default value is taken from the log system cluster.

table-storage.log.consistency | transactional | String | Specifies the log consistency mode for the table.
  • transactional: data can be seen by readers only after the checkpoint; the latency depends on the checkpoint interval.
  • eventual: immediate data visibility; you may see some intermediate states, but eventually the right results will be produced. Only works for tables with a primary key.
table-storage.log.changelog-mode | auto | String | Specifies the log changelog mode for a table with a primary key.
  • auto: upsert for tables with a primary key, all for tables without a primary key.
  • upsert: the log system does not store UPDATE_BEFORE changes; the consuming job will automatically add a normalize node, relying on state to generate the required UPDATE_BEFORE.
  • all: the log system stores all changes, including UPDATE_BEFORE.
table-storage.log.key.format | json | String | Specifies the key message format of the log system for tables with a primary key.
table-storage.log.format | debezium-json | String | Specifies the message format of the log system.

table-storage.file.path | (none) | String | Root file path.
table-storage.file.format | parquet | String | Format name for files.
table-storage.bucket | 1 | Integer | Bucket number for files and partition number for Kafka.
table-storage.change-tracking | true | Boolean | If users do not need to consume changes from the table, they can disable Change Tracking. This can reduce resource consumption.
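
A hedged sketch of how these session-level options could be set programmatically; the TableEnvironment configuration API shown here is standard Flink, while the table name, values, and paths are invented for illustration:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class ManagedTableSessionConfig {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inStreamingMode().build());

            // Session-level defaults for all managed tables, using the keys above.
            tEnv.getConfig().getConfiguration()
                    .setString("table-storage.log.system", "kafka");
            tEnv.getConfig().getConfiguration()
                    .setString("table-storage.log.kafka.bootstrap.servers", "localhost:9092");
            tEnv.getConfig().getConfiguration()
                    .setString("table-storage.file.path", "/tmp/table-storage");
            tEnv.getConfig().getConfiguration()
                    .setString("table-storage.bucket", "4");

            // A managed table: no 'connector' option, so the built-in storage is used.
            tEnv.executeSql(
                    "CREATE TABLE word_count (" +
                    "  word STRING," +
                    "  cnt BIGINT," +
                    "  PRIMARY KEY (word) NOT ENFORCED" +
                    ")");
        }
    }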

If users need to configure a table separately, they can also configure it through options without the "table-storage." prefix, for example:

...
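
The concrete example is elided above; continuing the hypothetical tEnv from the earlier sketch, a per-table override (note the keys carry no "table-storage." prefix) might look roughly like this, with the table name, columns, and values all assumed:

    // Hypothetical managed table that overrides some session defaults per table.
    tEnv.executeSql(
            "CREATE TABLE clicks (" +
            "  user_id BIGINT," +
            "  url STRING," +
            "  PRIMARY KEY (user_id) NOT ENFORCED" +
            ") WITH (" +
            "  'log.retention' = '7 d'," +
            "  'file.format' = 'parquet'," +
            "  'bucket' = '8'" +
            ")");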

In addition to the session options, which can be configured individually for each table by removing the prefix, there are some options that can only be configured per table; these are the options that affect reading and writing:

Key | Default | Type | Description
log.scan | full | String | Specifies the scan startup mode for the log consumer.
  • full: Performs a snapshot on the table upon first startup, and continues to read the latest changes. (Using HybridSource, the switch between snapshot and changes is exactly-once consistent, because we store the offset of the corresponding log into the snapshot when writing data.)
  • latest: Start from the latest.
  • from-timestamp: Start from a user-supplied timestamp.
log.scan.timestamp-mills | (none) | Long | Optional timestamp used in case of "from-timestamp" scan mode.
change-tracking | true | Boolean | If users do not need to consume changes from the table, they can disable Change Tracking. This can reduce resource consumption.
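
To make the read-time options concrete, here is a hedged sketch that supplies them through Flink's dynamic table options (OPTIONS hint); whether managed tables accept these keys via hints is an assumption here, and 'table.dynamic-table-options.enabled' may need to be set to true:

    // Hypothetical query: start reading the change log from a user-supplied timestamp.
    tEnv.executeSql(
            "SELECT word, cnt FROM word_count " +
            "/*+ OPTIONS('log.scan' = 'from-timestamp', " +
            "            'log.scan.timestamp-mills' = '1640995200000') */");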

Bucket

The record is hashed into different buckets according to the primary key (if present) or the whole row (if there is no primary key):

...
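
The hashing idea can be pictured with a tiny sketch; this only illustrates the bucketing described above and is not the actual implementation (class and method names are invented):

    import java.util.Objects;

    // Illustrative only: assign a record to a bucket by hashing the primary key,
    // or the whole row when the table has no primary key.
    public class BucketAssigner {
        private final int numBuckets; // the table's 'bucket' option

        public BucketAssigner(int numBuckets) {
            this.numBuckets = numBuckets;
        }

        public int bucketOf(Object[] primaryKey, Object[] row) {
            Object[] hashInput = (primaryKey != null) ? primaryKey : row;
            int hash = Objects.hash(hashInput);      // hash of the chosen fields
            return Math.floorMod(hash, numBuckets);  // non-negative bucket index
        }
    }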

  • Key: 
    • Without primary key: the key is null.
    • With primary key: the key is in json format by default. This is controlled by 'log.key.format'.
  • Value: debezium-json is used by default to store the record, with or without a declared primary key. This is controlled by 'log.format' (see the example below).
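
For illustration only, a single update on a primary-key table could be stored in the log roughly as the following key and value messages; the concrete fields and values are assumptions, and only the json key and debezium-json envelope shape come from the defaults above:

    key:   {"word": "hello"}
    value: {"before": {"word": "hello", "cnt": 1}, "after": {"word": "hello", "cnt": 2}, "op": "u"}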

...