Status

...

Page properties


Discussion thread

...

...

...

Vote thread

JIRA: FLINK-25152

Release: 1.15


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

  • It is a built-in storage for Flink SQL
    • Resolves usability issues
    • Flink DDL is no longer just a mapping, but actually creates these tables
    • Masks and abstracts the underlying technical details, with no annoying options
  • Supports subsecond streaming write and consumption
    • It could be backed by a service-oriented message queue (like Kafka)
  • High-throughput scan capability
    • A filesystem with columnar formats would be an ideal choice, as used by Iceberg and Hudi.
  • More importantly, to lower the cognitive bar, the storage needs to automatically handle various Insert/Update/Delete inputs and table definitions
    • Receives any type of changelog and any data type
    • A table can have a primary key or no primary key

...

When a table is created, the corresponding underlying physical storage is created as well. This keeps things simple: the underlying technical details are masked and abstracted away, with no annoying options.

Limitation: When a partitioned table has a primary key, the primary key must contain all partition fields.
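The limitation above could be checked at table creation time roughly as follows. This is a minimal sketch in plain Java; the class `PrimaryKeyValidator` and its methods are hypothetical names used only for illustration and are not part of the proposed interfaces.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper (not part of the proposal) illustrating the primary key limitation. */
final class PrimaryKeyValidator {

    private PrimaryKeyValidator() {}

    /** Returns the partition fields that the primary key does not contain (empty if none). */
    static Set<String> missingPartitionFields(List<String> primaryKey, List<String> partitionFields) {
        Set<String> missing = new LinkedHashSet<>(partitionFields);
        missing.removeAll(primaryKey);
        return missing;
    }

    /** Throws if a table with a primary key does not include every partition field in it. */
    static void validate(List<String> primaryKey, List<String> partitionFields) {
        if (primaryKey.isEmpty()) {
            return; // no primary key: the limitation does not apply
        }
        Set<String> missing = missingPartitionFields(primaryKey, partitionFields);
        if (!missing.isEmpty()) {
            throw new IllegalArgumentException(
                    "Primary key " + primaryKey + " must contain partition fields " + missing);
        }
    }

    public static void main(String[] args) {
        // Accepted: the primary key (dt, user_id) covers the partition field dt.
        validate(List.of("dt", "user_id"), List.of("dt"));
        // Accepted: a table without a primary key is not restricted.
        validate(List.of(), List.of("dt"));
        System.out.println("both table definitions are valid");
    }
}
```

A definition with primary key (user_id) and partition field dt would be rejected by `validate` in this sketch.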

DROP

Code Block
languagesql
titleSQL
DROP [TEMPORARY] TABLE [IF EXISTS] [catalog_name.][db_name.]table_name

...

Compacts the table for high-performance queries by launching a job that rewrites files. It is a synchronous operation.

READING

Code Block
languagesql
titleSQL
-- unbounded streaming reading (Read changes)
SET 'execution.runtime-mode' = 'streaming';
INSERT INTO ... SELECT ... FROM [catalog_name.][db_name.]table_name;

-- bounded reading (Read a snapshot)
SET 'execution.runtime-mode' = 'batch';
INSERT INTO ... SELECT ... FROM [catalog_name.][db_name.]table_name;

...

Code Block
languagesql
titleSQL
DESCRIBE TABLE EXTENDED [catalog_name.][db_name.]table_name [PARTITION partition_spec]

DESCRIBE TABLE EXTENDED output:

name

type

description

name

String

catalog.database.tableName

log.system

String

the log system

log.kafka.bootstrap.servers

Map

Kafka brokers

log.retention

Duration

how long changes log will be kept

file.path

String

File path

log.kafka.topic

String

topic of Kafka

watermark

Timestamp

Watermark of the latest written data (If table defines watermark)

file.format

String

format for file

bucket

Integer

bucket number

change-tracking

Boolean

whether this table tracks changes


DESCRIBE TABLE EXTENDED … PARTITION output:

name

type

description

partition

String

partition spec

file.path

String

path of this partition

last-modified

Timestamp

last modified time

num-files

Integer

file number

DESCRIBE TABLE EXTENDED without a partition definition also outputs the columns above, except partition.

Configuration

Session Options

In every table environment, `TableEnvironment.getConfig` offers options for configuring the current session.

We put necessary configurations in the global session configuration to avoid the need for users to configure each individual table.

...

Key

Default

Type

Description

table-storage.log.system

kafka

String

Log system. Now only Kafka in the MVP.

table-storage.log.kafka.properties.bootstrap.servers

(none)

String

Kafka brokers. eg: localhost:9092

table-storage.log.retention

(none)

Duration

It means how long changes log will be kept. The default value is from the log system cluster.

table-storage.log.consistency

transactional

String

Specifies the log consistency mode for the table.

  • transactional: only the data after the checkpoint can be seen by readers; the latency depends on the checkpoint interval
  • eventual: immediate data visibility; you may see some intermediate states, but eventually the right results will be produced. Only works for tables with a primary key

table-storage.log.changelog-mode

auto

String

Specifies the log changelog mode for the table.

  • auto: upsert for tables with a primary key, all for tables without a primary key.
  • upsert: the log system does not store the UPDATE_BEFORE changes; the job consuming the log will automatically add the normalized node, relying on state to generate the required update_before.
  • all: the log system stores all changes including UPDATE_BEFORE

table-storage.log.key.format

json

String

Specifies the key message format of the log system with primary key.

table-storage.log.format

debezium-json

String

Specifies the message format of the log system.

table-storage.file.root-path

(none)

String

Root file path.

table-storage.file.format

parquet

String

Format name for file.

table-storage.bucket

1

Integer

Bucket number for file and Partition number for Kafka.

table-storage.change-tracking

true

Boolean

If users do not need to consume changes from the table, they can disable Change Tracking. This can reduce resource consumption.


If users need to configure a table separately, they can also configure it through options without the "table-storage." prefix, for example:

Code Block
languagesql
titleSQL
CREATE TABLE T (...) WITH ('log.consistency'='eventual');
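
One plausible reading of this rule, sketched in plain Java: session options carrying the "table-storage." prefix are stripped of the prefix and used as defaults, and options given in the WITH clause override them. The class `TableOptionResolver` is a hypothetical name, and the precedence of table-level options over session options is an assumption of this sketch rather than something the text above states explicitly.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper (not part of the proposal) that merges session and table options. */
final class TableOptionResolver {

    static final String SESSION_PREFIX = "table-storage.";

    private TableOptionResolver() {}

    /**
     * Builds the effective options of one table. Session options with the prefix become
     * defaults (prefix removed); options declared on the table itself win on conflict.
     */
    static Map<String, String> resolve(
            Map<String, String> sessionOptions, Map<String, String> tableOptions) {
        Map<String, String> effective = new HashMap<>();
        for (Map.Entry<String, String> entry : sessionOptions.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(SESSION_PREFIX)) {
                effective.put(key.substring(SESSION_PREFIX.length()), entry.getValue());
            }
        }
        effective.putAll(tableOptions); // assumption: per-table settings take precedence
        return effective;
    }

    public static void main(String[] args) {
        Map<String, String> session = new HashMap<>();
        session.put("table-storage.log.consistency", "transactional");
        session.put("table-storage.bucket", "10");

        Map<String, String> table = new HashMap<>();
        table.put("log.consistency", "eventual"); // from the WITH clause

        System.out.println(resolve(session, table));
    }
}
```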

Table Options

In addition to the session options, which can be configured individually for each table by removing the prefix, there are also some options that can only be configured per table. These are the options that affect reading and writing:

Key

Default

Type

Description

log.scan

full

String

Specifies the scan startup mode for log consumer.

  • full: Performs a snapshot on the table upon first startup, and continue to read the latest changes. (Using HybridSource, the switching between snapshot and changes is exactly-once consistency because we store the offset of the corresponding log to snapshot when writing data)
  • latest: Start from the latest.
  • from-timestamp: Start from user-supplied timestamp.

log.scan.timestamp-mills

(none)

Long

Optional timestamp used in case of "from-timestamp" scan mode.

change-tracking

true

Boolean

If users do not need to consume changes from the table, they can disable Change Tracking. This can reduce resource consumption.

Bucket

Records are hashed into different buckets according to the primary key (if there is one) or the whole row (without a primary key).

Bucket is for distributed reading and writing.

Bucket and Parallelism are corresponding:

  • writing: A single bucket can only be written by a single parallelism. But one parallelism can write to multiple buckets. So the max working parallelism of the sink will not be bigger than the bucket number.
  • reading: In general, a single bucket can only be read by a single parallelism. (Optimization: if the bucket is too large, we can consider supporting concurrent batch reading of a single bucket, which requires cutting out appropriate splits according to the max value and min value of files. The LSM supports range reading.)
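
The relationship between records, buckets, and parallel tasks described above can be sketched as follows. This is an illustrative sketch only: the class `BucketAssigner`, the use of `Objects.hashCode`, and the round-robin mapping of buckets to subtasks are assumptions, not the hashing or assignment scheme of the actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Hypothetical helper (not part of the proposal) illustrating bucket assignment. */
final class BucketAssigner {

    private BucketAssigner() {}

    /**
     * Maps a record to a bucket. The key is the primary key if the table has one,
     * otherwise the whole row. floorMod keeps the result in [0, numBuckets).
     */
    static int bucketFor(Object keyOrRow, int numBuckets) {
        return Math.floorMod(Objects.hashCode(keyOrRow), numBuckets);
    }

    /**
     * Lists the buckets owned by one subtask. Every bucket has exactly one owner, while
     * one subtask may own several buckets. Subtasks beyond the bucket count own nothing,
     * which is why the useful sink parallelism is capped by the bucket number.
     */
    static List<Integer> bucketsOfSubtask(int subtask, int parallelism, int numBuckets) {
        List<Integer> owned = new ArrayList<>();
        for (int bucket = subtask; bucket < numBuckets; bucket += parallelism) {
            owned.add(bucket);
        }
        return owned;
    }

    public static void main(String[] args) {
        System.out.println("bucket of key user-42: " + bucketFor("user-42", 10));
        System.out.println("subtask 0 of 2 owns: " + bucketsOfSubtask(0, 2, 5));
        System.out.println("subtask 3 of 4 owns: " + bucketsOfSubtask(3, 4, 2));
    }
}
```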

More buckets:

  • Pros: better scalability (more distributed parallelism)
  • Cons: more operation and maintenance costs

The default value of the bucket is 1, so that it can be used out of the box on the local machine.

It can be set by:

Code Block
languagesql
titleSQL
SET table-storage.bucket = 10;
CREATE TABLE ...

If users want to change the bucket number, they need to delete the table and create a new table.

Checkpoint

In the past, many users ran into the problem that the sink produced no output because they had not enabled checkpointing.

For the built-in dynamic table: The planner will throw an exception if checkpointing is not turned on. (Later we can add a public connector interface for this; connectors including Filesystem, Hive, Iceberg, and Hudi need it as well.)

Concurrent Write

Only a single stream writer is allowed to write data to a Dynamic table. But re-processing is allowed, so while the stream job is running, there may be another job generating a snapshot.

Write contention is handled with a distributed optimistic lock mechanism. For an active partition, INSERT OVERWRITE and ALTER TABLE … COMPACT delete files and may therefore conflict with the streaming job, in which case the command may fail and the user is asked to retry. The optimism lies in how file deletion is handled: instead of taking a lock at the beginning, the commit fails if a file to be deleted is found missing at commit time.

For HDFS, path renaming is used for concurrent writes. If the rename fails, the writer knows that the snapshotId was preempted by another job, at which point it can recheck and generate a new snapshot.
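
The rename-based commit can be illustrated with a small sketch. It uses `java.nio.file` on a local directory purely as a stand-in for HDFS; the class `RenameCommitSketch`, the `snapshot-<id>` file naming, and the retry loop are assumptions made for illustration. Note that on a local file system the existence check performed by `Files.move` is not guaranteed to be atomic, whereas the scheme described above relies on the atomic rename semantics of HDFS.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical sketch (not part of the proposal) of an optimistic, rename-based commit. */
final class RenameCommitSketch {

    private RenameCommitSketch() {}

    /** Tries to publish a snapshot under the given id; returns false if the id is taken. */
    static boolean tryCommit(Path snapshotDir, long snapshotId, String content) {
        try {
            // Write the snapshot to a temporary file first, so readers never see partial data.
            Path tmp = Files.createTempFile(snapshotDir, "pending-", ".tmp");
            Files.write(tmp, content.getBytes(StandardCharsets.UTF_8));
            Path target = snapshotDir.resolve("snapshot-" + snapshotId);
            try {
                // Without REPLACE_EXISTING the move fails when the target already exists.
                Files.move(tmp, target);
                return true;
            } catch (FileAlreadyExistsException preempted) {
                Files.deleteIfExists(tmp); // another job took this snapshotId
                return false;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Retries with the next id until a commit succeeds and returns the id that was used. */
    static long commit(Path snapshotDir, long firstCandidateId, String content) {
        long id = firstCandidateId;
        while (!tryCommit(snapshotDir, id, content)) {
            id++; // a real writer would recheck for conflicts before generating a new snapshot
        }
        return id;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("snapshots");
        System.out.println("job A committed snapshot " + commit(dir, 1L, "written by job A"));
        System.out.println("job B committed snapshot " + commit(dir, 1L, "written by job B"));
    }
}
```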

For object file systems, unlike HDFS, renaming does not work, so we need a catalog lock to resolve commit conflicts:

Code Block
languagejava
titleInterface
/**
 * An interface that allows source and sink to use a global lock for some transaction-related things.
 */
@Internal
public interface CatalogLock extends AutoCloseable {

    /** Run with catalog lock. The caller should tell catalog the database and table name. */
    <T> T runWithLock(String database, String table, Callable<T> callable) throws Exception;

    /** Factory to create {@link CatalogLock}. */
    interface Factory extends Serializable {
        CatalogLock create();
    }
}

Currently, only HiveCatalog can provide this catalog lock.
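
To make the contract of `runWithLock` concrete, here is a minimal single-JVM sketch. The `CatalogLock` interface is repeated from the block above (without the annotation) only so that the example compiles on its own; `InMemoryCatalogLock` is a hypothetical class for illustration and is not how HiveCatalog provides the lock. A real catalog lock has to work across processes.

```java
import java.io.Serializable;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

/** Repeated from the proposal so that this sketch is self-contained. */
interface CatalogLock extends AutoCloseable {

    <T> T runWithLock(String database, String table, Callable<T> callable) throws Exception;

    interface Factory extends Serializable {
        CatalogLock create();
    }
}

/** Hypothetical in-memory lock: serializes callers per table within one JVM only. */
final class InMemoryCatalogLock implements CatalogLock {

    // One lock per "database.table", shared by all instances in this JVM.
    private static final ConcurrentHashMap<String, ReentrantLock> LOCKS = new ConcurrentHashMap<>();

    @Override
    public <T> T runWithLock(String database, String table, Callable<T> callable) throws Exception {
        ReentrantLock lock = LOCKS.computeIfAbsent(database + "." + table, k -> new ReentrantLock());
        lock.lock();
        try {
            return callable.call(); // e.g. check for conflicts, then commit the snapshot
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void close() {
        // nothing to release in this sketch
    }

    static CatalogLock.Factory factory() {
        return InMemoryCatalogLock::new;
    }

    public static void main(String[] args) throws Exception {
        try (CatalogLock lock = factory().create()) {
            String result = lock.runWithLock("db", "orders", () -> "committed under lock");
            System.out.println(result);
        }
    }
}
```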

We also need an interface for the catalog to set the lock on the source and sink:

Code Block
languagejava
titleInterface
/**
 * Source and sink implement this interface if they require {@link CatalogLock}. This is marked as
 * internal. If we need lock to be more general, we can put lock factory into {@link
 * DynamicTableFactory.Context}.
 */
@Internal
public interface RequireCatalogLock {

    void setLockFactory(CatalogLock.Factory lockFactory);
}

Retention

Log Retention

The cost of the log is generally high, so the log cannot keep the full history of the data. We provide a parameter to configure the log retention time: "log.retention".

Thanks to the FileStore's data preservation, data that has expired from the log is still stored in the FileStore. By default (log.scan is full), a user's stream consumption fetches all data.

So users can set a shorter log retention to reduce the cost of the log system, as long as they do not need the from-timestamp mode of log.scan.

Data Retention

Data never expires automatically.

If there is a need for data retention, the user can choose one of the following options:

  • In the SQL that queries the storage, users filter the data themselves
  • Define a time partition; users can then delete expired partitions themselves (DROP PARTITION ...)
  • In a future version, we will support the "DELETE FROM" statement, so users can delete expired data according to conditions.

Interfaces for Table

A catalog that supports built-in dynamic table needs to implement the method in the Catalog (The GenericInMemoryCatalog and HiveCatalog will implement this method):

Code Block
languagejava
titleInterface
/**
 * If this returns true, a table without a specified connector will be translated to a Flink managed table.
 * See {@link CatalogBaseTable.TableKind#MANAGED}
 */
default boolean supportsManagedTable() {
    return false;
}


We need an interface to discover the managed table factory implementation for managed table:

Code Block
languagejava
titleInterface
/**
 * Base interface for configuring a managed dynamic table connector. The managed table factory is
 * used when there is no {@link FactoryUtil#CONNECTOR} option.
 */
@Internal
public interface ManagedTableFactory extends DynamicTableFactory {

    @Override
    default String factoryIdentifier() {
        return "";
    }

    /**
     * Enrich options from catalog and session information.
     *
     * @return new options of this table.
     */
    Map<String, String> enrichOptions(Context context);

    /** Notifies the listener that a table creation occurred. */
    void onCreateTable(Context context);

    /** Notifies the listener that a table drop occurred. */
    void onDropTable(Context context);
}

Table API for creating managed table:

Code Block
languagejava
titleInterface
@PublicEvolving
public class TableDescriptor {

    /** Creates a new {@link Builder} for a managed dynamic table. */
    public static Builder forManaged() {
        return new Builder();
    }

    ...
}

Proposed Design

Conceptually, Flink built-in tables consist of two parts, LogStore and FileStore. The LogStore would serve the need of message systems, while FileStore will play the role of file systems with columnar formats. At each point in time, LogStore and FileStore will store exactly the same data for the latest written data (LogStore has TTL), but with different physical layouts. If one remembers the concept of dynamic table in Flink SQL, this is exactly what we want to build here. 

...

  • Key: 
    • Without primary key: key is null.
    • With primary key: key is json format by default. This is controlled by 'log.key.format'.
  • Value: Use debezium-json to store value record with or without declaration primary key by default. This is controlled by 'log.format'.

Consistency & Visibility

By default, data is only visible after the checkpoint, which means that the logStore has transactional consistency.

...

  • Declaring the primary key in the table definition
  • 'log.consistency' = 'eventual'
  • 'log.changelog-mode' = 'upsert' – this is the default mode for table with primary key

When using upsert mode, a normalized node is generated in the downstream consuming job. It generates update_before messages for old data based on the primary key, so duplicate data is corrected to an eventually consistent state.

Changelog mode

By default, for a table with a primary key, the records in the log system only contain INSERT, UPDATE_AFTER, and DELETE, with no UPDATE_BEFORE. A normalized node is generated in the downstream consuming job; it stores all key-value pairs in order to produce UPDATE_BEFORE messages.
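
What such a normalized node does can be sketched in a few lines of plain Java. The class `UpsertNormalizer`, its in-memory map standing in for Flink state, and the convention that a null value denotes a DELETE are all assumptions of this sketch, not the planner's actual operator.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch (not part of the proposal) of a node that restores UPDATE_BEFORE. */
final class UpsertNormalizer {

    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    static final class Change {
        final Kind kind;
        final String key;
        final String value;

        Change(Kind kind, String key, String value) {
            this.kind = kind;
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return kind + "(" + key + "," + value + ")";
        }
    }

    // Last known value per primary key; this is the state the downstream job must keep.
    private final Map<String, String> state = new HashMap<>();

    /** Expands one upsert record into full changes; a null value is treated as a delete. */
    List<Change> normalize(String key, String value) {
        List<Change> out = new ArrayList<>();
        String previous = state.get(key);
        if (value == null) {
            if (previous != null) {
                state.remove(key);
                out.add(new Change(Kind.DELETE, key, previous));
            }
        } else if (previous == null) {
            state.put(key, value);
            out.add(new Change(Kind.INSERT, key, value));
        } else if (!previous.equals(value)) {
            state.put(key, value);
            out.add(new Change(Kind.UPDATE_BEFORE, key, previous));
            out.add(new Change(Kind.UPDATE_AFTER, key, value));
        }
        // An identical repeated value emits nothing, which is how duplicates are absorbed here.
        return out;
    }

    public static void main(String[] args) {
        UpsertNormalizer normalizer = new UpsertNormalizer();
        System.out.println(normalizer.normalize("k1", "a"));
        System.out.println(normalizer.normalize("k1", "b"));
        System.out.println(normalizer.normalize("k1", null));
    }
}
```

The map in this sketch grows with the number of distinct keys, which is the state cost the text attributes to the normalized node.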

If the user wants to see all changes of this table, or to remove the downstream normalized node, they can configure:

  • 'log.changelog-mode' = 'all'

This requires

  • 'log.consistency' = 'transactional'
  • The sink query produces changes with UPDATE_BEFORE. If not, we can:
    • Throw an unsupported exception in the MVP
    • In the future, automatically add the normalize node before the sink to generate the required UPDATE_BEFORE messages

Optimize Upsert mode

Many users complain about upsert-kafka, where the normalized nodes in downstream consumption jobs generate a lot of state and risk state expiration.

Unlike upsert-kafka, the upsert mode preserves the complete delete message and avoids normalization for the following downstream operators:

  • Upsert sink: Upsert sink only requires upsert inputs without UPDATE_BEFORE.
  • Join: Join for unique inputs will store records by unique key. It can work without UPDATE_BEFORE.

FileStore

Overview

As a storage system supporting real-time ad-hoc analysis:

...