




Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


When a table is created, the corresponding underlying physical storage is created as well. This keeps things simple: the underlying technical details are masked and abstracted away, with no annoying options.

Limitation: When a partitioned table has a primary key, the primary key must contain all of the partition fields.
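The limitation above can be expressed as a small validation step. The following is a minimal sketch, assuming column names are compared as plain strings; the class `PartitionKeyCheck` and its methods are hypothetical names for illustration and are not part of the proposal.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper (not part of the proposal): checks the primary-key limitation. */
final class PartitionKeyCheck {

    /** Returns the partition fields that are missing from the primary key (empty if none). */
    static Set<String> missingPartitionFields(List<String> primaryKey, List<String> partitionFields) {
        Set<String> missing = new LinkedHashSet<>(partitionFields);
        missing.removeAll(primaryKey);
        return missing;
    }

    /** Throws if a partitioned table declares a primary key that omits a partition field. */
    static void validate(List<String> primaryKey, List<String> partitionFields) {
        if (primaryKey.isEmpty() || partitionFields.isEmpty()) {
            // The limitation only applies to partitioned tables that have a primary key.
            return;
        }
        Set<String> missing = missingPartitionFields(primaryKey, partitionFields);
        if (!missing.isEmpty()) {
            throw new IllegalArgumentException(
                    "Primary key " + primaryKey + " must contain partition fields " + missing);
        }
    }
}
```

For example, a table partitioned by `dt` with primary key `(dt, user_id)` passes the check, while primary key `(user_id)` alone is rejected.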


Code Block
DROP [TEMPORARY] TABLE [IF EXISTS] [catalog_name.][db_name.]table_name


Code Block
DESCRIBE TABLE EXTENDED [catalog_name.][db_name.]table_name [PARTITION partition_spec]








  • log.system (String): the log system
  • Kafka brokers
  • How long the change log will be kept
  • File path
  • Kafka topic
  • Watermark of the latest written data (if the table defines a watermark)
  • File format
  • Bucket number
  • Whether this table tracks changes

  • Partition spec
  • Path of this partition
  • Last modified time
  • Number of files

DESCRIBE TABLE EXTENDED without a partition definition outputs the columns above as well, except the partition-related ones.


Session Options

In every table environment, `TableEnvironment.getConfig` offers options for configuring the current session.

We put necessary configurations in the global session configuration to avoid the need for users to configure each individual table.

If users need to configure a table separately, they can also do so through table options.






  • table-storage.log.system (default: kafka, type: String): Log system. Only Kafka is supported in the MVP.
  • Kafka brokers, e.g. localhost:9092.
  • How long the change log will be kept. The default value comes from the log system cluster.
  • Specifies the log consistency mode for the table.
    • transactional: only the data after the checkpoint can be seen by readers; the latency depends on the checkpoint interval.
    • eventual: data is visible immediately. Readers may see some intermediate states, but eventually the right results will be produced. This only works for tables with a primary key.
  • Specifies the log changelog mode for the table.
    • auto: upsert for tables with a primary key, all for tables without a primary key.
    • upsert: the log system does not store the UPDATE_BEFORE changes. The job consuming the log automatically adds the normalized node, relying on state to generate the required update_before.
    • all: the log system stores all changes, including UPDATE_BEFORE.
  • table-storage.log.key.format (default: json, type: String): Specifies the key message format of the log system for tables with a primary key.
  • table-storage.log.format (default: debezium-json, type: String): Specifies the message format of the log system.
  • Root file path.
  • Format name for file.
  • Bucket number for files and partition number for Kafka.



If users need to configure a table separately, they can also configure it through options without the "table-storage." prefix, for example:

Code Block
CREATE TABLE T (...) WITH ('log.consistency'='eventual');
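The relationship between session options and per-table options can be sketched as follows. This is a minimal illustration, not the proposal's implementation: the class `OptionResolver` is a hypothetical name, and the rule that a per-table value takes precedence over the session value is an assumption made for the example.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper (not part of the proposal): merges session and table options. */
final class OptionResolver {

    static final String SESSION_PREFIX = "table-storage.";

    /**
     * Strips the "table-storage." prefix from session options and overlays the options
     * given in the table's WITH clause. Precedence of table options is an assumption.
     */
    static Map<String, String> resolve(
            Map<String, String> sessionOptions, Map<String, String> tableOptions) {
        Map<String, String> resolved = new HashMap<>();
        for (Map.Entry<String, String> entry : sessionOptions.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(SESSION_PREFIX)) {
                resolved.put(key.substring(SESSION_PREFIX.length()), entry.getValue());
            }
        }
        // Per-table values override the session-wide defaults.
        resolved.putAll(tableOptions);
        return resolved;
    }
}
```

With session options `table-storage.bucket = 10` and `table-storage.log.consistency = transactional`, a table created with `'log.consistency'='eventual'` would resolve to `bucket = 10` and `log.consistency = eventual` under this sketch.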

Table Options

Session options can be configured individually for each table by removing the prefix. In addition, some options can only be configured per table. These are the options that affect reading and writing:






  • log.scan (default: full): Specifies the scan startup mode for the log consumer.
    • full: Performs a snapshot of the table upon first startup, then continues to read the latest changes. (With HybridSource, the switch from snapshot to changes is exactly-once consistent, because the offset of the corresponding log is stored in the snapshot when data is written.)
    • latest: Starts from the latest changes.
    • from-timestamp: Starts from a user-supplied timestamp.
  • log.scan.timestamp-mills (default: none, type: Long): Optional timestamp used with the "from-timestamp" scan mode.
  • change-tracking (default: true, type: Boolean): If users do not need to consume changes from the table, they can disable change tracking. This can reduce resource consumption.


Records are hashed into different buckets according to the primary key (if one is defined) or the whole row (if there is no primary key).

Bucket is for distributed reading and writing.
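The hashing rule can be sketched as below. This is only an illustration under stated assumptions: rows are modelled as `List<Object>`, and Java's built-in `hashCode` stands in for whatever stable hash function the storage would actually use. `BucketAssigner` is a hypothetical name.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper (not part of the proposal): maps a record to a bucket. */
final class BucketAssigner {

    /**
     * Hashes the primary-key fields if the table has a primary key, otherwise the whole row.
     * Using Object#hashCode is an assumption made for this sketch.
     */
    static int bucketOf(List<Object> row, int[] primaryKeyIndexes, int numBuckets) {
        int hash;
        if (primaryKeyIndexes.length == 0) {
            hash = row.hashCode();
        } else {
            Object[] key = new Object[primaryKeyIndexes.length];
            for (int i = 0; i < primaryKeyIndexes.length; i++) {
                key[i] = row.get(primaryKeyIndexes[i]);
            }
            hash = Arrays.hashCode(key);
        }
        // floorMod keeps the result in [0, numBuckets) even for negative hashes.
        return Math.floorMod(hash, numBuckets);
    }
}
```

Two versions of a row that share the same primary key always land in the same bucket, which is what allows a single writer per bucket to apply updates in order.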

Buckets and parallelism are related as follows:

  • writing: A single bucket can only be written by a single parallel task, but one parallel task can write to multiple buckets. So the maximum effective parallelism of the sink cannot exceed the bucket number.
  • reading: In general, a single bucket can only be read by a single parallel task. (Optimization: if a bucket is too large, we can consider supporting concurrent batch reading of a single bucket, which requires cutting out appropriate splits according to the max and min values of the files. The LSM supports range reading.)
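The writing rule above implies that extra sink tasks stay idle once the parallelism exceeds the bucket number. A minimal sketch follows, assuming a simple round-robin assignment of buckets to tasks; the assignment strategy and the name `BucketParallelism` are assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of the proposal): assigns buckets to parallel writer tasks. */
final class BucketParallelism {

    /** Each bucket has exactly one owner; round-robin assignment is an assumption. */
    static int ownerOf(int bucket, int parallelism) {
        return bucket % parallelism;
    }

    /** Lists the buckets owned by each of the parallel tasks. */
    static List<List<Integer>> assign(int numBuckets, int parallelism) {
        List<List<Integer>> bucketsPerTask = new ArrayList<>();
        for (int task = 0; task < parallelism; task++) {
            bucketsPerTask.add(new ArrayList<>());
        }
        for (int bucket = 0; bucket < numBuckets; bucket++) {
            bucketsPerTask.get(ownerOf(bucket, parallelism)).add(bucket);
        }
        return bucketsPerTask;
    }

    /** Number of tasks that actually receive at least one bucket. */
    static int effectiveParallelism(int numBuckets, int parallelism) {
        return Math.min(numBuckets, parallelism);
    }
}
```

With 3 buckets and a parallelism of 5, only three tasks own a bucket; with 10 buckets and a parallelism of 4, every task writes to two or three buckets.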

More buckets:

  • Pros: better scalability (more distributed parallel tasks)
  • Cons: higher operation and maintenance costs

The default value of the bucket is 1, so that it can be used out of the box on the local machine.

It can be set by:

Code Block
SET table-storage.bucket = 10;

If users want to change the bucket number, they need to delete the table and create a new table.


In the past, many users encountered the problem that the sink produced no output because they had not enabled checkpointing.

For the built-in dynamic table: the planner will throw an exception if checkpointing is not enabled. (Later we can add a public connector interface for this, since Filesystem, Hive, Iceberg and Hudi need it too.)

Concurrent Write

Only a single stream writer is allowed to write data to a Dynamic table. But re-processing is allowed, so while the stream job is running, there may be another job generating a snapshot.

Write contention is handled with a distributed optimistic lock mechanism. For an active partition, INSERT OVERWRITE and ALTER TABLE … COMPACT delete files, which may conflict with the streaming job. In that case the command may fail and the user will be asked to retry. The optimism lies in the deletion of files: instead of locking at the beginning, the commit fails if a file to be deleted is found missing at commit time.
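The optimistic check can be illustrated with an in-memory model. This is a sketch under assumptions, not the proposed implementation: the set of live file names stands in for the table's file listing, and `OptimisticFileSet` is a hypothetical name.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical in-memory model (not part of the proposal) of optimistic file deletion. */
final class OptimisticFileSet {

    private final Set<String> liveFiles = new HashSet<>();

    /**
     * Applies a commit that deletes and adds files. Nothing is locked while the job plans
     * its work; the commit is rejected only if a file it wants to delete is already gone.
     */
    synchronized boolean commit(Set<String> toDelete, Set<String> toAdd) {
        if (!liveFiles.containsAll(toDelete)) {
            return false; // another job removed a file first: the caller has to retry
        }
        liveFiles.removeAll(toDelete);
        liveFiles.addAll(toAdd);
        return true;
    }

    /** Returns a copy of the currently live files. */
    synchronized Set<String> liveFiles() {
        return new HashSet<>(liveFiles);
    }
}
```

If a compaction plans to replace files `a` and `b` with `c`, but an overwrite removes `a` before the compaction commits, the compaction's commit returns `false` and has to be retried against the new file listing.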

For HDFS, path renaming is used for concurrent writes. If the renaming fails, the writer knows that the snapshotId was preempted by another job, at which point it can recheck and generate a new snapshot.
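The rename-based commit can be sketched with the local file system standing in for HDFS. Everything here is an assumption made for illustration: the `snapshot-<id>` file naming, the class `RenameCommitter`, and the use of `java.nio.file.Files.move`, which by default fails if the target exists. On a local file system that existence check is not guaranteed to be atomic, so this only demonstrates the control flow.

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical sketch (not part of the proposal): commit a snapshot by renaming a temp file. */
final class RenameCommitter {

    private final Path snapshotDir;

    RenameCommitter(Path snapshotDir) {
        this.snapshotDir = snapshotDir;
    }

    /** Tries to claim candidateId; on conflict moves on to the next id. Returns the claimed id. */
    long commit(String snapshotContent, long candidateId) throws IOException {
        Path tmp = Files.createTempFile(snapshotDir, ".tmp-", ".snapshot");
        Files.writeString(tmp, snapshotContent);
        long id = candidateId;
        while (true) {
            Path target = snapshotDir.resolve("snapshot-" + id);
            try {
                // Without REPLACE_EXISTING the move fails if the target already exists.
                Files.move(tmp, target);
                return id;
            } catch (FileAlreadyExistsException e) {
                // The id was preempted by another job. A real writer would recheck for
                // conflicts here before generating the next snapshot.
                id++;
            }
        }
    }
}
```

Two writers that both try to commit snapshot 1 end up with snapshots 1 and 2: the second rename fails, and that writer retries with the next id.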

But for object file systems, unlike HDFS, renaming does not work, so we need a catalog lock to resolve commit conflicts:

Code Block
import java.io.Serializable;
import java.util.concurrent.Callable;

/**
 * An interface that allows source and sink to use a global lock for transaction-related operations.
 */
public interface CatalogLock extends AutoCloseable {

    /** Run with catalog lock. The caller should tell catalog the database and table name. */
    <T> T runWithLock(String database, String table, Callable<T> callable) throws Exception;

    /** Factory to create {@link CatalogLock}. */
    interface Factory extends Serializable {
        CatalogLock create();
    }
}

Currently, only HiveCatalog can provide this catalog lock.

We also need an interface through which the catalog can set the lock on source and sink:

Code Block
/**
 * Source and sink implement this interface if they require {@link CatalogLock}. This is marked as
 * internal. If we need lock to be more general, we can put lock factory into {@link
 * DynamicTableFactory.Context}.
 */
public interface RequireCatalogLock {

    void setLockFactory(CatalogLock.Factory lockFactory);
}
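To show how the two interfaces could fit together, here is a minimal sketch. The interfaces are restated from the text so that the example compiles on its own. `InMemoryCatalogLock` and `LockingSink` are hypothetical classes invented for this illustration. The in-memory lock only works within one JVM, whereas a real catalog lock has to be global across jobs.

```java
import java.io.Serializable;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Restated from the text above so that the sketch is self-contained.
interface CatalogLock extends AutoCloseable {
    <T> T runWithLock(String database, String table, Callable<T> callable) throws Exception;

    interface Factory extends Serializable {
        CatalogLock create();
    }
}

interface RequireCatalogLock {
    void setLockFactory(CatalogLock.Factory lockFactory);
}

/** Hypothetical single-JVM lock keyed by "database.table" (assumption for illustration). */
final class InMemoryCatalogLock implements CatalogLock {

    private static final ConcurrentHashMap<String, ReentrantLock> LOCKS = new ConcurrentHashMap<>();

    @Override
    public <T> T runWithLock(String database, String table, Callable<T> callable) throws Exception {
        ReentrantLock lock = LOCKS.computeIfAbsent(database + "." + table, k -> new ReentrantLock());
        lock.lock();
        try {
            return callable.call();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void close() {
        // Nothing to release in this in-memory sketch.
    }
}

/** Hypothetical sink that receives the lock factory and commits under the lock. */
final class LockingSink implements RequireCatalogLock {

    private CatalogLock.Factory lockFactory;
    private long latestSnapshotId = 0;

    @Override
    public void setLockFactory(CatalogLock.Factory lockFactory) {
        this.lockFactory = lockFactory;
    }

    /** Generates the next snapshot id while holding the catalog lock. */
    long commit(String database, String table) throws Exception {
        try (CatalogLock lock = lockFactory.create()) {
            return lock.<Long>runWithLock(database, table, () -> ++latestSnapshotId);
        }
    }
}
```

In this sketch the catalog's role is played by whoever calls `setLockFactory(InMemoryCatalogLock::new)` on the sink before the job starts.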


Log Retention

Log storage is generally expensive, so the log cannot keep the full history of the data. We provide a parameter to configure the log retention time: "log.retention".

Thanks to the FileStore's data preservation, the expired data is still stored in the FileStore. By default (log.scan is full), the user's stream consumption fetches all data.

So users can set a shorter log retention to reduce the cost of the log system if they don't need the log.scan from-timestamp mode.

Data Retention

Data never expires automatically.

If there is a need for data retention, the user can choose one of the following options:

  • In the SQL that queries the storage, users filter the data themselves.
  • Define a time partition, so that users can delete expired partitions themselves. (DROP PARTITION ...)
  • In a future version, we will support the "DELETE FROM" statement, so that users can delete expired data according to conditions.

Interfaces for Table

A catalog that supports the built-in dynamic table needs to implement the method in the Catalog (the GenericInMemoryCatalog and HiveCatalog will implement this method):


We need an interface to discover the managed table factory implementation for managed tables, i.e. tables without a "connector=..." option:

Code Block
import java.util.Map;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;

/**
 * Base interface for configuring a managed dynamic table connector. The managed table factory is
 * used when there is no {@link FactoryUtil#CONNECTOR} option.
 */
public interface ManagedTableFactory extends DynamicTableFactory {

    @Override
    default String factoryIdentifier() {
        return "";
    }

    /**
     * Enrich options from catalog and session information.
     *
     * @return new options of this table.
     */
    Map<String, String> enrichOptions(Context context);

    /** Notifies the listener that a table creation occurred. */
    void onCreateTable(Context context);

    /** Notifies the listener that a table drop occurred. */
    void onDropTable(Context context);
}


  • Key:
    • Without primary key: key is null.
    • With primary key: key is in json format by default. This is controlled by 'log.key.format'.
  • Value: debezium-json is used by default to store records, whether or not a primary key is declared. This is controlled by 'log.format'.


  • Declaring the primary key in the table definition
  • 'log.consistency' = 'eventual'
  • 'log.changelog-mode' = 'upsert' (this is the default mode for tables with a primary key)

When upsert mode is used, a normalized node is generated in the downstream consuming job. It generates update_before messages for old data based on the primary key, which means that duplicate data is corrected to an eventually consistent state.
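The behaviour of such a normalized node can be sketched with a keyed map as state. This is a simplified model under assumptions, not the planner's operator: keys and values are plain strings, changes are rendered as `+I`, `-U`, `+U` and `-D` strings, and `UpsertNormalizer` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch (not part of the proposal) of a normalize step over an upsert log. */
final class UpsertNormalizer {

    /** Last seen value per primary key; stands in for the operator's keyed state. */
    private final Map<String, String> state = new HashMap<>();

    /** Turns an upsert message into a full changelog, including the update_before row. */
    List<String> upsert(String key, String value) {
        List<String> out = new ArrayList<>();
        String old = state.put(key, value);
        if (old == null) {
            out.add("+I(" + key + "," + value + ")");
        } else if (!old.equals(value)) {
            out.add("-U(" + key + "," + old + ")"); // update_before generated from state
            out.add("+U(" + key + "," + value + ")");
        }
        // A duplicate of the current value produces no output in this sketch.
        return out;
    }

    /** Turns a delete message into a retraction of the last seen value. */
    List<String> delete(String key) {
        String old = state.remove(key);
        return old == null ? List.of() : List.of("-D(" + key + "," + old + ")");
    }
}
```

Replaying the same upsert twice leaves the downstream result unchanged, which is how duplicates written under eventual consistency converge to the right state.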


If users want to see all changes of this table or remove the downstream normalized node, they can configure:

  • 'log.changelog-mode' = 'all'

This requires:

  • 'log.consistency' = 'transactional'
  • The sink query produces changes with UPDATE_BEFORE. If not, we can:
    • Throw an unsupported exception in the MVP
    • In the future, automatically add the normalize node before the sink to generate the required UPDATE_BEFORE messages
