...

  • It’s built-in storage for Flink SQL
    • Improves usability issues
    • Flink DDL is no longer just a mapping, but a real creation of these tables
    • Masks and abstracts the underlying technical details, no annoying options
  • Supports sub-second streaming write & consumption
    • It could be backed by a service-oriented message queue (like Kafka)
  • High-throughput scan capability
    • A filesystem with columnar formats would be an ideal choice, just as Iceberg/Hudi do
  • More importantly, in order to lower the cognitive barrier, the storage needs to automatically handle various Insert/Update/Delete inputs and table definitions (see the SQL sketch after this list)
    • Receives any type of changelog and any type of datatype
    • Tables can have a primary key or no primary key
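As a sketch of how this looks in DDL (table and column names here are hypothetical; the key point is that no 'connector' option is given, so the DDL creates a real, managed table):

Code Block
languagesql
titleSQL
-- managed table with a primary key: accepts INSERT/UPDATE/DELETE changelogs
CREATE TABLE user_counts (
    user_id BIGINT,
    cnt BIGINT,
    PRIMARY KEY (user_id) NOT ENFORCED
);

-- managed table without a primary key
CREATE TABLE user_events (
    user_id BIGINT,
    event STRING,
    event_time TIMESTAMP(3)
);

-- a streaming pipeline writes its changelog directly into the managed table
INSERT INTO user_counts SELECT user_id, COUNT(*) FROM user_events GROUP BY user_id;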

...

Compact the table for high-performance queries. This launches a job to rewrite files. It is a synchronous operation.
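A sketch of what triggering compaction could look like, based on the ALTER TABLE … COMPACT statement referenced in the Concurrent Write section below (the table name and the per-partition variant are illustrative assumptions):

Code Block
languagesql
titleSQL
-- synchronously launch a job that rewrites the files of a managed table
ALTER TABLE managed_table COMPACT;

-- assumed per-partition variant for partitioned tables
ALTER TABLE managed_table PARTITION (dt = '2022-01-01') COMPACT;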

READING

Code Block
languagesql
titleSQL
-- unbounded streaming reading (Read changes)
SET 'execution.runtime-mode' = 'streaming';
INSERT INTO ... SELECT ... FROM [catalog_name.][db_name.]table_name;

-- bounded reading (Read a snapshot)
SET 'execution.runtime-mode' = 'batch';
INSERT INTO ... SELECT ... FROM [catalog_name.][db_name.]table_name;

...

Code Block
languagesql
titleSQL
DESCRIBE TABLE EXTENDED [catalog_name.][db_name.]table_name [PARTITION partition_spec]
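For example, with hypothetical catalog, database, and table names:

Code Block
languagesql
titleSQL
-- table-level details
DESCRIBE TABLE EXTENDED my_catalog.my_db.word_count;

-- details of a single partition
DESCRIBE TABLE EXTENDED my_catalog.my_db.word_count PARTITION (dt = '2022-01-01');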

...

name | type | description
name | String | catalog.database.tableName
log.kafka.bootstrap.servers | Map | Kafka brokers
log.retention | Duration | how long the changelog will be kept
file.path | String | file path
log.kafka.topic | String | Kafka topic
watermark | Timestamp | watermark of the latest written data (if the table defines a watermark)
file.format | String | format for files
bucket | Integer | bucket number
change-tracking | Boolean | whether this table tracks changes


DESCRIBE TABLE EXTENDED … PARTITION output:

name | type | description
partition | String | partition spec
file.path | String | path of this partition
last-modified | Timestamp | last modified time
num-files | Integer | file number


DESCRIBE TABLE EXTENDED without a partition definition outputs the above columns too, except partition.

...

We put necessary configurations in the global configuration to avoid the need for users to configure each individual table.

...

If users need to configure a table separately, they can also configure it through table options (see the sketch after the options table below).

Key | Default | Type | Description
table-storage.log.system | kafka | String | Log system.
table-storage.log.kafka.properties.bootstrap.servers | (none) | Map | Kafka brokers, e.g. localhost:9092.
table-storage.log.retention | (none) | Duration | How long the changelog will be kept. The default value comes from the log system cluster.
table-storage.log.scan | full | String | Scan startup mode for the log consumer. "full": performs a snapshot on the table upon first startup and continues to read the latest changes (using HybridSource; the switch between snapshot and changes is exactly-once consistent because we store the offset of the corresponding log in the snapshot when writing data). "latest": start from the latest. "from-timestamp": start from a user-supplied timestamp.
table-storage.log.pk.consistency | transactional | String | Log consistency mode for tables with a primary key. "transactional": only the data after the checkpoint can be seen by readers; the latency depends on the checkpoint interval. "eventual": immediate data visibility; you may see some intermediate states, but eventually the right results will be produced.
table-storage.log.pk.changelog-mode | upsert | String | Log changelog mode for tables with a primary key. "upsert": the log system does not store UPDATE_BEFORE changes; the consuming job automatically adds a normalize node, relying on state to generate the required UPDATE_BEFORE. "all": the log system stores all changes, including UPDATE_BEFORE.
table-storage.log.pk.key-format | json | String | Key message format of the log system for tables with a primary key.
table-storage.log.format | debezium-json | String | Message format of the log system.
table-storage.file.root-path | (none) | String | Root file path.
table-storage.file.format | parquet | String | Format name for files.
table-storage.bucket | 1 | Integer | Bucket number for files and partition number for Kafka.
table-storage.change-tracking | true | Boolean | If users do not need to consume changes from the table, they can disable change tracking to reduce resource consumption.
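For illustration, a minimal sketch of both styles, assuming session options can be set via SET in the SQL client and that per-table options simply drop the 'table-storage.' prefix (the table name is hypothetical):

Code Block
languagesql
titleSQL
-- session-level defaults from the options above
SET 'table-storage.log.retention' = '7 d';
SET 'table-storage.file.format' = 'parquet';

-- per-table override through table options (prefix dropped)
CREATE TABLE word_count (
    word STRING,
    cnt BIGINT,
    PRIMARY KEY (word) NOT ENFORCED
) WITH (
    'log.retention' = '1 d'
);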

...

If users want to change the bucket number, they need to delete the table and create a new table.
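A sketch of what that amounts to (names are hypothetical, and the per-table option name 'bucket' is an assumption mirroring 'table-storage.bucket' above):

Code Block
languagesql
titleSQL
DROP TABLE word_count;

CREATE TABLE word_count (
    word STRING,
    cnt BIGINT,
    PRIMARY KEY (word) NOT ENFORCED
) WITH (
    'bucket' = '4'
);

-- then backfill the new table, e.g. with a batch INSERT INTO ... SELECT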


Checkpoint

In the past, many users encountered the problem that the sink did not produce output because checkpointing was not enabled.

For the built-in dynamic table, the planner will throw an exception if checkpointing is not enabled. (Later we can add a public connector interface, as Filesystem, Hive, Iceberg, and Hudi all need it.)
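For example, one way to enable checkpointing from the SQL client before writing to a built-in dynamic table:

Code Block
languagesql
titleSQL
SET 'execution.checkpointing.interval' = '1 min';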

Interfaces for Table

A catalog that supports the built-in dynamic table needs to implement the following method in Catalog (the GenericInMemoryCatalog and HiveCatalog will implement this method):

Code Block
languagejava
titleInterface
/**
 * If this returns true, a Table without a specified connector will be translated to the Flink managed table.
 * See {@link CatalogBaseTable.TableKind#MANAGED}
 */
default boolean supportsManagedTable() {
    return false;
}

We need an interface to discover the managed table factory implementation when there is no "connector=..." option:

Code Block
languagejava
titleInterface
/**
 * Base interface for configuring a managed dynamic table connector. The managed table factory is
 * used when there is no {@link FactoryUtil#CONNECTOR} option.
 */
@Internal
public interface ManagedTableFactory extends DynamicTableFactory {

    @Override
    default String factoryIdentifier() {
        return "";
    }

    /**
     * Enrich options from catalog and session information.
     *
     * @return new options of this table.
     */
    Map<String, String> enrichOptions(Context context);

    /** Notifies the listener that a table creation occurred. */
    void onCreateTable(Context context);

    /** Notifies the listener that a table drop occurred. */
    void onDropTable(Context context);
}

Table API for creating a managed table:

Code Block
languagejava
titleInterface
@PublicEvolving
public class TableDescriptor {

    /** Creates a new {@link Builder} for a managed dynamic table. */
    public static Builder forManaged() {
        return new Builder();
    }

    ...
}

Concurrent Write

Only a single stream writer is allowed to write data to a Dynamic table. But re-processing is allowed, so while the stream job is running, there may be another job generating a snapshot.

Write contention uses a distributed optimistic lock mechanism. For an active partition, INSERT OVERWRITE and ALTER TABLE … COMPACT will delete files, which may conflict with the streaming job; this means that the command may fail and the user will be asked to retry. The optimism is reflected in the deletion of files: if a file to be deleted is found to be missing at commit time, the commit fails, instead of taking a lock at the beginning.

For HDFS, path renaming is used for concurrent writes. If the renaming fails, the writer knows that the snapshot id was preempted by another job, at which point it can recheck and generate a new snapshot.

But for object file systems, unlike HDFS, renaming does not work, so we need a catalog lock to resolve commit conflicts:

Code Block
languagejava
titleInterface
/**
 * An interface that allows source and sink to use global lock to some transaction-related things.
 */
@Internal
public interface CatalogLock extends AutoCloseable {

    /** Run with catalog lock. The caller should tell catalog the database and table name. */
    <T> T runWithLock(String database, String table, Callable<T> callable) throws Exception;

    /** Factory to create {@link CatalogLock}. */
    interface Factory extends Serializable {
        CatalogLock create();
    }
}

Currently, only HiveCatalog can provide this catalog lock.

And we need an interface to set the lock on source & sink by catalog:

Code Block
languagejava
titleInterface
/**
 * Source and sink implement this interface if they require {@link CatalogLock}. This is marked as
 * internal. If we need lock to be more general, we can put lock factory into {@link
 * DynamicTableFactory.Context}.
 */
@Internal
public interface RequireCatalogLock {

    void setLockFactory(CatalogLock.Factory lockFactory);
}


Proposed Design

Conceptually, Flink built-in tables consist of two parts, the LogStore and the FileStore. The LogStore serves the needs of message systems, while the FileStore plays the role of a file system with columnar formats. At each point in time, the LogStore and the FileStore store exactly the same data for the latest written data (the LogStore has a TTL), but with different physical layouts. If one remembers the concept of dynamic tables in Flink SQL, this is exactly what we want to build here.

...

  • Key:
    • Without primary key: the key is null.
    • With primary key: the key is in JSON format. This is controlled by 'log.pk.key-format'.
  • Value: Use debezium-json to store the value record, with or without a declared primary key. This is controlled by 'log.format'.

Consistency & Visibility

By default, data is only visible after the checkpoint, which means that the LogStore has transactional consistency.
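If lower latency matters more than transactional visibility for a primary-key table, the session option from the options table above can switch the mode; a sketch, assuming it is set via SET in the SQL client:

Code Block
languagesql
titleSQL
-- default: data becomes visible only after each checkpoint
SET 'table-storage.log.pk.consistency' = 'transactional';

-- alternative: immediate visibility, possibly exposing intermediate states
SET 'table-storage.log.pk.consistency' = 'eventual';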

...