Status

...



Discussion thread

...

...

JIRA: <TODO>

...

Vote thread

JIRA: FLINK-25152

Release: 1.15


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Code Block
languagesql
titleSQL
DESCRIBE TABLE EXTENDED [catalog_name.][db_name.]table_name [PARTITION partition_spec]

DESCRIBE TABLE EXTENDED output:

name                         type       description
name                         String     catalog.database.tableName
log.system                   String     the log system
log.kafka.bootstrap.servers  Map        Kafka brokers
log.retention                Duration   how long the change log will be kept
file.path                    String     file path
log.kafka.topic              String     Kafka topic
watermark                    Timestamp  watermark of the latest written data (if the table defines a watermark)
file.format                  String     format for files
bucket                       Integer    bucket number
change-tracking              Boolean    whether this table tracks changes

...

name           type       description
partition      String     partition spec
file.path      String     path of this partition
last-modified  Timestamp  last modified time
num-files      Integer    number of files

DESCRIBE TABLE EXTENDED without a partition definition outputs the above columns too, except partition.
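For illustration, the statements below show both forms against a hypothetical managed table `my_catalog.my_db.T` with a hypothetical partition column `dt`:

Code Block
languagesql
titleSQL
-- hypothetical catalog, database, and partition values, for illustration only
DESCRIBE TABLE EXTENDED my_catalog.my_db.T PARTITION (dt = '2022-01-01');

-- without a partition spec, the same columns are shown except partition
DESCRIBE TABLE EXTENDED my_catalog.my_db.T;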

Configuration

Session Options

In every table environment, the TableConfig returned by `TableEnvironment.getConfig` offers options for configuring the current session.

We put necessary configurations in the global session configuration to avoid the need for users to configure each individual table.

If users need to configure a table individually, they can also do so through table options.

Key: table-storage.log.system
Default: kafka
Type: String
Description: Log system. Now only Kafka in the MVP.

Key: table-storage.log.kafka.bootstrap.servers
Default: (none)
Type: Map
Description: Kafka brokers. E.g.: localhost:9092

Key: table-storage.log.retention
Default: (none)
Type: Duration
Description: How long the change log will be kept. The default value is from the log system cluster.

Key: table-storage.log.consistency
Default: transactional
Type: String
Description: Specifies the log consistency mode for the table.

  • transactional: only the data after the checkpoint can be seen by readers; the latency depends on the checkpoint interval
  • eventual: immediate data visibility; you may see some intermediate states, but eventually the right results will be produced; only works for tables with a primary key

Key: table-storage.log.changelog-mode
Default: auto
Type: String
Description: Specifies the log changelog mode for the table.

  • auto: upsert for tables with a primary key, all for tables without a primary key
  • upsert: the log system does not store UPDATE_BEFORE changes; the consuming job will automatically add a normalize node, relying on state to generate the required UPDATE_BEFORE messages
  • all: the log system stores all changes, including UPDATE_BEFORE

Key: table-storage.log.key.format
Default: json
Type: String
Description: Specifies the key message format of the log system for tables with a primary key.

Key: table-storage.log.format
Default: debezium-json
Type: String
Description: Specifies the message format of the log system.

Key: table-storage.file.root-path
Default: (none)
Type: String
Description: Root file path.

Key: table-storage.file.format
Default: parquet
Type: String
Description: Format name for files.

Key: table-storage.bucket
Default: 1
Type: Integer
Description: Bucket number for files and partition number for Kafka.
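As a sketch, such session-level defaults could be set once in the SQL client (all values are illustrative); every managed table created afterwards then inherits them:

Code Block
languagesql
titleSQL
-- illustrative values only
SET 'table-storage.log.system' = 'kafka';
SET 'table-storage.log.kafka.bootstrap.servers' = 'localhost:9092';
SET 'table-storage.file.root-path' = '/tmp/table-storage';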

If users need to configure a table separately, they can also configure it through options without the "table-storage." prefix, for example:

Code Block
languagesql
titleSQL
CREATE TABLE T (...) WITH ('log.consistency'='eventual');

Table Options

In addition to session options, which can be configured for an individual table by removing the prefix, there are also some options that can only be configured per table; these are the options that affect reading and writing:

Key: log.scan
Default: full
Type: String
Description: Specifies the scan startup mode for the log consumer.

  • full: Performs a snapshot on the table upon first startup, and continues to read the latest changes. (Using HybridSource, the switching between snapshot and changes is exactly-once consistent, because we store the offset of the corresponding log into the snapshot when writing data.)
  • latest: Start from the latest.
  • from-timestamp: Start from a user-supplied timestamp.

Key: log.scan.timestamp-mills
Default: (none)
Type: Long
Description: Optional timestamp used in case of the "from-timestamp" scan mode.

Key: table-storage.change-tracking
Default: true
Type: Boolean
Description: If users do not need to consume changes from the table, they can disable change tracking. This can reduce resource consumption.
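As a sketch, a scan mode could also be chosen per query via Flink's dynamic table options hint (assuming a managed table `T` and that dynamic table options are enabled in the session):

Code Block
languagesql
titleSQL
-- illustrative: read from a user-supplied timestamp instead of the default 'full' mode
SELECT * FROM T /*+ OPTIONS('log.scan' = 'from-timestamp', 'log.scan.timestamp-mills' = '1640966400000') */;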

...

We need an interface to discover the managed table factory implementation when there is no "connector=..." option for a managed table:

Code Block
languagejava
titleInterface
/**
 * Base interface for configuring a managed dynamic table connector. The managed table factory is
 * used when there is no {@link FactoryUtil#CONNECTOR} option.
 */
@Internal
public interface ManagedTableFactory extends DynamicTableFactory {

    @Override
    default String factoryIdentifier() {
        return "";
    }

    /**
     * Enrich options from catalog and session information.
     *
     * @return new options of this table.
     */
    Map<String, String> enrichOptions(Context context);

    /** Notifies the listener that a table creation occurred. */
    void onCreateTable(Context context);

    /** Notifies the listener that a table drop occurred. */
    void onDropTable(Context context);
}
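For illustration, a managed table is simply declared without any 'connector' option; table creation is then routed through the factory's enrichOptions/onCreateTable hooks (table name and schema below are hypothetical):

Code Block
languagesql
titleSQL
-- no 'connector' option: handled by the managed table factory
CREATE TABLE word_count (
    word STRING PRIMARY KEY NOT ENFORCED,
    cnt  BIGINT
);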

...

  • Key: 
    • Without primary key: key is null.
    • With primary key: key is in json format by default. This is controlled by 'log.key.format'.
  • Value: use debezium-json to store the record, with or without a declared primary key, by default. This is controlled by 'log.format'.

...

  • Declaring the primary key in the table definition
  • 'log.consistency' = 'eventual'
  • 'log.changelog-mode' = 'upsert' – this is the default mode for tables with a primary key
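A minimal sketch of such a table definition (table name and schema are hypothetical):

Code Block
languagesql
titleSQL
-- eventual consistency requires a declared primary key
CREATE TABLE user_balance (
    user_id BIGINT PRIMARY KEY NOT ENFORCED,
    balance DECIMAL(18, 2)
) WITH (
    'log.consistency' = 'eventual'
);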

When using upsert mode, a normalize node is generated in the downstream consuming job; it will generate UPDATE_BEFORE messages for old data based on the primary key, meaning that duplicate data will be corrected to an eventually consistent state.

...

If the user wants to see all changes of this table or remove the downstream normalize node, he/she can configure:

  • 'log.changelog-mode' = 'all'

This requires

  • 'log.consistency' = 'transactional'
  • The sink query produces changes with UPDATE_BEFORE. If not, we can:
    • Throw an unsupported exception in the MVP
    • In the future, automatically add a normalize node before the sink to generate the required UPDATE_BEFORE messages
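Under those requirements, a table exposing all changes could be declared as follows (table name and schema are hypothetical):

Code Block
languagesql
titleSQL
-- stores all changes including UPDATE_BEFORE; requires transactional consistency
CREATE TABLE orders (
    order_id BIGINT PRIMARY KEY NOT ENFORCED,
    amount   DECIMAL(18, 2)
) WITH (
    'log.consistency' = 'transactional',
    'log.changelog-mode' = 'all'
);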

...