Status
...
Page properties | |
---|---|
|
...
...
JIRA: <TODO>
...
|
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
name | type | description |
---|---|---|
name | String | catalog.database.tableName |
log.system | String | The log system |
log.kafka.bootstrap.servers | Map | Kafka brokers |
log.retention | Duration | How long the change log is kept |
file.path | String | File path |
log.kafka.topic | String | Kafka topic |
file.format | String | File format |
bucket | Integer | Bucket number |
change-tracking | Boolean | Whether this table tracks changes |
...
DESCRIBE TABLE EXTENDED without a partition definition also outputs the columns above, except partition.
Configuration
Session Options
In every table environment, `TableEnvironment.getConfig` offers options for configuring the current session.
We put the necessary configurations in the global session configuration so that users do not need to configure each table individually. If users need to configure a table separately, they can also do so through table options.
Key | Default | Type | Description |
---|---|---|---|
table-storage.log.system | kafka | String | Log system. Only Kafka is supported in the MVP. |
table-storage.log.kafka.bootstrap.servers | (none) | String | Kafka brokers, e.g. localhost:9092. |
table-storage.log.retention | (none) | Duration | How long the change log is kept. The default value comes from the log system cluster. |
table-storage.log.consistency | transactional | String | Specifies the log consistency mode for table. |
table-storage.log.pk.consistency | transactional | String | Specifies the log consistency mode for table with primary key. |
table-storage.log.pk.changelog-mode | upsert | String | Specifies the log changelog mode for table with primary key. |
table-storage.log.pk.key-format | json | String | Specifies the key message format of log system with primary key. |
table-storage.log.format | debezium-json | String | Specifies the message format of log system. |
table-storage.file.path | (none) | String | Root file path. |
table-storage.file.format | parquet | String | Format name for file. |
table-storage.bucket | 1 | Integer | Bucket number for file and partition number for Kafka. |
If users need to configure a table separately, they can also configure it through options without the "table-storage." prefix, for example:
CREATE TABLE T (...) WITH ('log.consistency'='eventual');
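The relationship between session options and per-table options can be sketched in plain Java. This is only an illustration: the class `TableOptionResolverSketch` and its `resolve` method are hypothetical, not part of Flink, and the rule that a per-table option overrides the session default is an assumption drawn from the description above. The bucket value `4` is an arbitrary example.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper, not a Flink API: merges session defaults with per-table options. */
final class TableOptionResolverSketch {
    static final String SESSION_PREFIX = "table-storage.";

    /**
     * Session keys lose the "table-storage." prefix and act as defaults.
     * Options given in the table's WITH clause are assumed to take precedence.
     */
    static Map<String, String> resolve(Map<String, String> session, Map<String, String> table) {
        Map<String, String> resolved = new HashMap<>();
        for (Map.Entry<String, String> e : session.entrySet()) {
            if (e.getKey().startsWith(SESSION_PREFIX)) {
                resolved.put(e.getKey().substring(SESSION_PREFIX.length()), e.getValue());
            }
        }
        resolved.putAll(table); // per-table options override session defaults (assumption)
        return resolved;
    }

    public static void main(String[] args) {
        Map<String, String> session = Map.of(
                "table-storage.log.system", "kafka",
                "table-storage.bucket", "1");
        Map<String, String> table = Map.of("bucket", "4");
        // The table keeps log.system from the session and uses its own bucket value.
        System.out.println(resolve(session, table));
    }
}
```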
Table Options
In addition to the session options, which can be configured individually for each table by removing the prefix, there are also some options that can only be configured per table. These are the options that affect reading and writing:
Key | Default | Type | Description |
---|---|---|---|
log.scan | full | String | Specifies the scan startup mode for log consumer. |
log.scan.timestamp-mills | (none) | Long | Optional timestamp used in case of "from-timestamp" scan mode. |
change-tracking | true | Boolean | If users do not need to consume changes from the table, they can disable Change Tracking. This can reduce resource consumption. |
...
- Key:
- Without primary key: key is null.
- With primary key: key is json format by default. This is controlled by 'log.pk.key-format'.
- Value: debezium-json is used by default to store records, whether or not a primary key is declared. This is controlled by 'log.format'.
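To make the key and value layout above concrete, here is a minimal sketch in plain Java. It assumes flat rows with string or numeric fields and hand-builds the JSON without escaping. The class `LogRecordSketch` and its methods are hypothetical names for illustration, and the envelope shape (`before`, `after`, `op`) follows the debezium-json convention for an inserted row rather than anything specified in this proposal.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical sketch of the log record layout; names are illustrative only. */
final class LogRecordSketch {

    /** Minimal JSON writer for flat rows with string or numeric values (no escaping). */
    static String toJson(Map<String, Object> row) {
        if (row == null) {
            return "null";
        }
        StringJoiner json = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, Object> e : row.entrySet()) {
            Object v = e.getValue();
            String value = (v == null || v instanceof Number) ? String.valueOf(v) : "\"" + v + "\"";
            json.add("\"" + e.getKey() + "\":" + value);
        }
        return json.toString();
    }

    /** Key: null without a primary key, otherwise the primary-key fields as JSON. */
    static String key(Map<String, Object> row, List<String> primaryKey) {
        if (primaryKey.isEmpty()) {
            return null;
        }
        Map<String, Object> keyFields = new LinkedHashMap<>();
        for (String field : primaryKey) {
            keyFields.put(field, row.get(field));
        }
        return toJson(keyFields);
    }

    /** Value: a debezium-json style envelope for an inserted row. */
    static String insertValue(Map<String, Object> row) {
        return "{\"before\":null,\"after\":" + toJson(row) + ",\"op\":\"c\"}";
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", 1);
        row.put("name", "a");
        System.out.println(key(row, List.of()));     // no primary key declared
        System.out.println(key(row, List.of("id"))); // primary key (id)
        System.out.println(insertValue(row));
    }
}
```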
...
- Declaring the primary key in the table definition
- 'log.pk.consistency' = 'eventual'
- 'log.pk.changelog-mode' = 'upsert' – this is the default mode for table with primary key
When using upsert mode, a normalize node is generated in the downstream consuming job. It generates UPDATE_BEFORE messages for old data based on the primary key, so duplicate data is corrected to an eventually consistent state.
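The effect of such a normalize node can be illustrated with a small, simplified sketch. It is not Flink's operator: `NormalizeSketch` is a hypothetical class that keeps the last value per primary key in memory, handles only upserts with non-null values (no deletes), and always emits an UPDATE_BEFORE/UPDATE_AFTER pair for a key it has already seen.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model of a normalize node; not Flink's implementation. */
final class NormalizeSketch {
    // Last seen value per primary key (a stand-in for keyed state).
    private final Map<String, String> lastValueByKey = new HashMap<>();

    /** Turns one upsert into changelog rows, adding -U for the previous value if any. */
    List<String> onUpsert(String key, String value) {
        List<String> out = new ArrayList<>();
        String old = lastValueByKey.put(key, value);
        if (old == null) {
            out.add("+I(" + key + "," + value + ")");
        } else {
            out.add("-U(" + key + "," + old + ")"); // UPDATE_BEFORE derived from state
            out.add("+U(" + key + "," + value + ")");
        }
        return out;
    }

    public static void main(String[] args) {
        NormalizeSketch node = new NormalizeSketch();
        System.out.println(node.onUpsert("k1", "a")); // [+I(k1,a)]
        System.out.println(node.onUpsert("k1", "b")); // [-U(k1,a), +U(k1,b)]
    }
}
```

Because the old value comes from state keyed by the primary key, a replayed or duplicated upsert is retracted and re-applied, which is how the downstream result converges.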
...
If users want to see all changes of this table, or want to remove the downstream normalize node, they can configure:
- 'log.pk.changelog-mode' = 'all'
This requires
- 'log.pk.consistency' = 'transactional'
- The sink query produces changes with UPDATE_BEFORE. If not, we can:
- Throw an unsupported exception in the MVP
- In the future, automatically add the normalize node before the sink to generate the required UPDATE_BEFORE messages
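The two requirements above could be checked at planning time roughly as follows. This is a sketch under stated assumptions: `ChangelogModeValidatorSketch` is a hypothetical class, the option keys and defaults are taken from this page, and the choice of exception types is illustrative (the proposal only says an unsupported exception is thrown in the MVP).

```java
import java.util.Map;

/** Hypothetical validation sketch for 'log.pk.changelog-mode' = 'all'; not a Flink API. */
final class ChangelogModeValidatorSketch {

    static void validate(Map<String, String> tableOptions, boolean sinkProducesUpdateBefore) {
        String mode = tableOptions.getOrDefault("log.pk.changelog-mode", "upsert");
        if (!"all".equals(mode)) {
            return; // upsert mode has no extra requirements
        }
        String consistency = tableOptions.getOrDefault("log.pk.consistency", "transactional");
        if (!"transactional".equals(consistency)) {
            throw new IllegalArgumentException(
                    "'log.pk.changelog-mode' = 'all' requires 'log.pk.consistency' = 'transactional'");
        }
        if (!sinkProducesUpdateBefore) {
            // MVP behaviour described above: reject instead of adding a normalize node.
            throw new UnsupportedOperationException(
                    "Sink query does not produce UPDATE_BEFORE; not supported in the MVP");
        }
    }

    public static void main(String[] args) {
        validate(Map.of("log.pk.changelog-mode", "all"), true); // passes silently
        try {
            validate(Map.of("log.pk.changelog-mode", "all"), false);
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```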
...