...
- It’s a built-in storage for Flink SQL
  - Improves usability
  - Flink DDL is no longer just a mapping but actually creates these tables
  - Masks & abstracts the underlying technical details, with no annoying options
- Supports subsecond streaming write & consumption
  - It could be backed by a service-oriented message queue (like Kafka)
- High-throughput scan capability
  - A filesystem with columnar formats would be an ideal choice, just as Iceberg/Hudi do.
- More importantly, to lower the cognitive bar, the storage needs to automatically handle various Insert/Update/Delete inputs and table definitions
  - Receives any type of changelog and any data type
  - Tables can have a primary key or no primary key
...
Compacts the table for high-performance queries by launching a job that rewrites files. It is a synchronous operation.
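As a rough illustration of what such a compaction job has to decide, here is a minimal sketch that groups small data files into rewrite batches of at most a target size. The `DataFile` and `CompactionPlanner` names and the size-based grouping policy are assumptions for illustration only; this proposal does not specify how files are selected.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical description of a data file: its path and size in bytes. */
final class DataFile {
    final String path;
    final long sizeBytes;

    DataFile(String path, long sizeBytes) {
        this.path = path;
        this.sizeBytes = sizeBytes;
    }
}

/** Hypothetical planner: packs files smaller than the target size into rewrite groups. */
final class CompactionPlanner {
    private final long targetSizeBytes;

    CompactionPlanner(long targetSizeBytes) {
        this.targetSizeBytes = targetSizeBytes;
    }

    /** Returns groups of small files; each group would be rewritten into one larger file. */
    List<List<DataFile>> plan(List<DataFile> files) {
        List<List<DataFile>> groups = new ArrayList<>();
        List<DataFile> current = new ArrayList<>();
        long currentSize = 0;
        for (DataFile f : files) {
            if (f.sizeBytes >= targetSizeBytes) {
                continue; // already large enough, leave it untouched
            }
            if (!current.isEmpty() && currentSize + f.sizeBytes > targetSizeBytes) {
                addIfWorthRewriting(groups, current);
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(f);
            currentSize += f.sizeBytes;
        }
        addIfWorthRewriting(groups, current);
        return groups;
    }

    private static void addIfWorthRewriting(List<List<DataFile>> groups, List<DataFile> group) {
        // Rewriting a single file on its own does not reduce the number of files.
        if (group.size() > 1) {
            groups.add(group);
        }
    }
}
```

Because the rewrite deletes the old files, a real compaction would also have to go through the optimistic commit described under Concurrent Write.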
READING
```sql
-- unbounded streaming reading (Read changes)
SET 'execution.runtime-mode' = 'streaming';
INSERT INTO ... SELECT ... FROM [catalog_name.][db_name.]table_name;

-- bounded reading (Read a snapshot)
SET 'execution.runtime-mode' = 'batch';
INSERT INTO ... SELECT ... FROM [catalog_name.][db_name.]table_name;
```
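The difference between the two read modes can be modelled in plain Java: a bounded read folds the changelog into the latest snapshot, while an unbounded read forwards each change downstream as it arrives. The `Kind` enum loosely mirrors Flink's `RowKind` (+I, -U, +U, -D); `Change` and `ReadModes` are hypothetical names, and this is a conceptual sketch rather than how the storage reads data.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical change kinds, modelled after Flink's RowKind. */
enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

/** One change to a keyed row. */
final class Change {
    final Kind kind;
    final String key;
    final String value;

    Change(Kind kind, String key, String value) {
        this.kind = kind;
        this.key = key;
        this.value = value;
    }
}

final class ReadModes {
    /** Bounded (batch) read: apply all changes and return the resulting snapshot. */
    static Map<String, String> readSnapshot(List<Change> changelog) {
        Map<String, String> snapshot = new LinkedHashMap<>();
        for (Change c : changelog) {
            switch (c.kind) {
                case INSERT:
                case UPDATE_AFTER:
                    snapshot.put(c.key, c.value);
                    break;
                case UPDATE_BEFORE:
                case DELETE:
                    snapshot.remove(c.key);
                    break;
            }
        }
        return snapshot;
    }

    /** Unbounded (streaming) read: forward every change, in order, to the consumer. */
    static void readChanges(List<Change> changelog, Consumer<Change> downstream) {
        // In a real streaming job this input never ends; a finite list stands in for it here.
        for (Change c : changelog) {
            downstream.accept(c);
        }
    }
}
```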
...
```sql
DESCRIBE TABLE EXTENDED [catalog_name.][db_name.]table_name [PARTITION partition_spec]
```
...
name | type | description |
---|---|---|
name | String | catalog.database.tableName |
log.kafka.bootstrap.servers | Map | Kafka brokers |
log.retention | Duration | How long the changelog is kept |
file.path | String | File path |
log.kafka.topic | String | Kafka topic |
watermark | Timestamp | Watermark of the latest written data (if the table defines a watermark) |
file.format | String | File format |
bucket | Integer | Number of buckets |
change-tracking | Boolean | Whether this table tracks changes |
DESCRIBE TABLE EXTENDED … PARTITION output:

name | type | description |
---|---|---|
partition | String | Partition spec |
file.path | String | Path of this partition |
last-modified | Timestamp | Last modified time |
num-files | Integer | Number of files |
Without a partition spec, DESCRIBE TABLE EXTENDED also outputs the columns above, except partition.
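To make the partition-level columns concrete, here is a sketch that derives `num-files` and `last-modified` for one partition. It assumes, which this proposal does not state, that each partition is a directory holding its data files directly; `PartitionStats` is a hypothetical helper.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;
import java.util.stream.Stream;

/** Hypothetical helper computing DESCRIBE-style statistics for one partition directory. */
final class PartitionStats {
    final int numFiles;
    final FileTime lastModified; // null when the partition contains no files

    private PartitionStats(int numFiles, FileTime lastModified) {
        this.numFiles = numFiles;
        this.lastModified = lastModified;
    }

    static PartitionStats of(Path partitionDir) {
        int count = 0;
        FileTime latest = null;
        try (Stream<Path> entries = Files.list(partitionDir)) {
            for (Path p : (Iterable<Path>) entries::iterator) {
                if (!Files.isRegularFile(p)) {
                    continue; // only data files count, sub-directories are skipped
                }
                count++;
                FileTime t = Files.getLastModifiedTime(p);
                if (latest == null || t.compareTo(latest) > 0) {
                    latest = t;
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new PartitionStats(count, latest);
    }
}
```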
...
We put the necessary configuration in the global configuration so that users do not need to configure each table individually.
...
Users who need to configure a table individually can also do so through table options.
Key | Default | Type | Description |
---|---|---|---|
table-storage.log.system | kafka | String | Log system. |
table-storage.log.kafka.properties.bootstrap.servers | (none) | Map | Kafka brokers, e.g. localhost:9092. |
table-storage.log.retention | (none) | Duration | How long the changelog is kept. Defaults to the retention configured in the log system cluster. |
table-storage.log.scan | full | String | Specifies the scan startup mode for the log consumer. |
table-storage.log.pk.consistency | transactional | String | Specifies the log consistency mode for tables with a primary key. |
table-storage.log.pk.changelog-mode | upsert | String | Specifies the log changelog mode for tables with a primary key. |
table-storage.log.pk.key-format | json | String | Specifies the key message format of the log system for tables with a primary key. |
table-storage.log.format | debezium-json | String | Specifies the message format of the log system. |
table-storage.file.root-path | (none) | String | Root file path. |
table-storage.file.format | parquet | String | Format name for files. |
table-storage.bucket | 1 | Integer | Number of buckets for files, and number of partitions for Kafka. |
table-storage.change-tracking | true | Boolean | If users do not need to consume changes from the table, they can disable change tracking to reduce resource consumption. |
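The lookup order described above (per-table option first, then the global `table-storage.`-prefixed entry, then the built-in default) can be sketched as follows. We assume here that table options use the same keys without the `table-storage.` prefix, as the DESCRIBE output suggests; `OptionResolver` is a hypothetical class, and only the keys and defaults come from the table above.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical resolver: a table option such as "bucket" overrides the global
 * "table-storage.bucket" entry, which in turn overrides the built-in default.
 */
final class OptionResolver {
    static final String GLOBAL_PREFIX = "table-storage.";

    private final Map<String, String> globalConf;
    private final Map<String, String> defaults;

    OptionResolver(Map<String, String> globalConf, Map<String, String> defaults) {
        this.globalConf = globalConf;
        this.defaults = defaults;
    }

    /** Returns the effective value for an un-prefixed key, or null if nothing is configured. */
    String resolve(String key, Map<String, String> tableOptions) {
        if (tableOptions.containsKey(key)) {
            return tableOptions.get(key);
        }
        String globalKey = GLOBAL_PREFIX + key;
        if (globalConf.containsKey(globalKey)) {
            return globalConf.get(globalKey);
        }
        return defaults.get(key);
    }

    /** Defaults from the option table; keys whose default is "(none)" are omitted. */
    static Map<String, String> builtInDefaults() {
        Map<String, String> d = new HashMap<>();
        d.put("log.system", "kafka");
        d.put("log.scan", "full");
        d.put("log.pk.consistency", "transactional");
        d.put("log.pk.changelog-mode", "upsert");
        d.put("log.pk.key-format", "json");
        d.put("log.format", "debezium-json");
        d.put("file.format", "parquet");
        d.put("bucket", "1");
        d.put("change-tracking", "true");
        return d;
    }
}
```

For example, with `table-storage.bucket = 4` in the global configuration, a table without its own `bucket` option gets 4 buckets, and a table declaring `bucket = 8` gets 8.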
...
To change the bucket number, users must drop the table and create a new one.
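Why the bucket number cannot be changed in place becomes clearer from how records are typically routed to buckets. The sketch below assumes hash-modulo routing on the primary key (this proposal does not specify the hash function), and `BucketAssigner` is a hypothetical name. Since one bucket index selects both the file bucket and the Kafka partition, changing `numBuckets` moves most existing keys to a different bucket, and data written under the old count would no longer line up.

```java
import java.util.Objects;

/** Hypothetical router: the same index selects the file bucket and the Kafka partition. */
final class BucketAssigner {
    private final int numBuckets;

    BucketAssigner(int numBuckets) {
        if (numBuckets <= 0) {
            throw new IllegalArgumentException("bucket must be positive");
        }
        this.numBuckets = numBuckets;
    }

    int bucketOf(Object primaryKey) {
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(Objects.hashCode(primaryKey), numBuckets);
    }
}
```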
Checkpoint
In the past, many users ran into sinks that produced no output because they had not enabled checkpointing.
For the built-in dynamic table, the planner will throw an exception if checkpointing is not enabled. (Later we can add a public connector interface for this, since connectors such as Filesystem, Hive, Iceberg, and Hudi need it too.)
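A minimal sketch of such a planner-side check for a streaming job, assuming it only inspects the job configuration. `execution.checkpointing.interval` is Flink's configuration key for the checkpoint interval; the `CheckpointValidator` class, its simplified duration parsing, and the error message are hypothetical.

```java
import java.time.Duration;
import java.util.Map;

/** Hypothetical validation run when a streaming query writes to a built-in dynamic table. */
final class CheckpointValidator {
    static final String INTERVAL_KEY = "execution.checkpointing.interval";

    /** Throws if checkpointing is disabled, since the sink would otherwise never commit data. */
    static void validate(Map<String, String> jobConf) {
        String interval = jobConf.get(INTERVAL_KEY);
        if (interval == null || parseMillis(interval) <= 0) {
            throw new IllegalStateException(
                    "Writing to a built-in dynamic table requires checkpointing; please set '"
                            + INTERVAL_KEY + "'.");
        }
    }

    /** Simplified parsing: accepts "500ms", "10s", or a bare number of milliseconds only. */
    private static long parseMillis(String value) {
        String v = value.trim().toLowerCase();
        if (v.endsWith("ms")) {
            return Long.parseLong(v.substring(0, v.length() - 2).trim());
        }
        if (v.endsWith("s")) {
            return Duration.ofSeconds(Long.parseLong(v.substring(0, v.length() - 1).trim())).toMillis();
        }
        return Long.parseLong(v);
    }
}
```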
Interfaces for Table
A catalog that supports built-in dynamic tables needs to implement GenericCatalog. For now, GenericInMemoryCatalog and HiveCatalog will implement this interface:
```java
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.catalog.Catalog;

/**
 * This generic catalog can store any object created by Flink DDL or Table API, and it is only
 * responsible for storing their metadata. It acts as a database, holding relevant meta information.
 *
 * <p>This interface is distinguished from external Catalog. ExternalCatalog may have its own
 * managed table, which may conflict with Flink managed table. When implementing this interface, if
 * there is no specified connector, it should be interpreted as Flink managed table.
 */
@PublicEvolving
public interface GenericCatalog extends Catalog {}
```
Concurrent Write
Only a single streaming writer is allowed to write data to a dynamic table. Reprocessing is still allowed, however, so while the streaming job is running, another job may be generating a snapshot.
Write contention is resolved with a distributed optimistic locking mechanism. For an active partition, INSERT OVERWRITE and ALTER TABLE … COMPACT delete files, so they may conflict with the streaming job; in that case the command may fail and the user will be asked to retry. The locking is optimistic in how file deletions are handled: no lock is taken up front, and if a file to be deleted is found to be missing at commit time, the commit fails.
On HDFS, path renaming is used to handle concurrent writes: if the rename fails, the writer knows that the snapshot ID was claimed by another job, so it can recheck and generate a new snapshot.
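The optimistic commit can be sketched with `java.nio.file`. Assumptions, all ours: snapshots are published as files named `snapshot-<id>` in one directory; each snapshot is first written to a temporary file and then moved into place without `REPLACE_EXISTING`, so the move fails if another job already claimed that ID; and files scheduled for deletion are only checked, not deleted. On a local file system this only approximates HDFS rename semantics; `SnapshotCommitter` is a hypothetical name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Stream;

/** Hypothetical optimistic committer for a table's snapshot directory. */
final class SnapshotCommitter {
    private static final String PREFIX = "snapshot-";
    private final Path snapshotDir;

    SnapshotCommitter(Path snapshotDir) {
        this.snapshotDir = snapshotDir;
    }

    /** Returns the committed snapshot ID; moves on to a higher ID when one is preempted. */
    long commit(String manifest, List<Path> filesToDelete, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            // Optimistic check: a file we plan to delete vanished, so another job rewrote it.
            for (Path f : filesToDelete) {
                if (!Files.exists(f)) {
                    throw new IllegalStateException("Conflict: " + f + " was already removed, please retry");
                }
            }
            try {
                long id = latestId() + 1;
                Path tmp = Files.createTempFile(snapshotDir, "tmp-", ".snapshot");
                Files.write(tmp, manifest.getBytes(StandardCharsets.UTF_8));
                try {
                    Files.move(tmp, snapshotDir.resolve(PREFIX + id)); // no REPLACE_EXISTING
                    return id;
                } catch (FileAlreadyExistsException preempted) {
                    Files.delete(tmp); // another job took this ID: recheck and try again
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        throw new IllegalStateException("Gave up after " + maxAttempts + " attempts");
    }

    private long latestId() throws IOException {
        long max = 0;
        try (Stream<Path> entries = Files.list(snapshotDir)) {
            for (Path p : (Iterable<Path>) entries::iterator) {
                String name = p.getFileName().toString();
                if (name.startsWith(PREFIX)) {
                    max = Math.max(max, Long.parseLong(name.substring(PREFIX.length())));
                }
            }
        }
        return max;
    }
}
```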
For object file systems other than HDFS, however, renaming does not work for this purpose, so we need a catalog lock to resolve commit conflicts:
```java
import java.io.Serializable;
import java.util.concurrent.Callable;

import org.apache.flink.annotation.Internal;

/**
 * An interface that allows sources and sinks to use a global lock for transaction-related
 * operations.
 */
@Internal
public interface CatalogLock extends AutoCloseable {

    /** Run with catalog lock. The caller should tell catalog the database and table name. */
    <T> T runWithLock(String database, String table, Callable<T> callable) throws Exception;

    /** Factory to create {@link CatalogLock}. */
    interface Factory extends Serializable {
        CatalogLock create();
    }
}
```
Currently, only HiveCatalog can provide this catalog lock.
We also need an interface through which the catalog can pass the lock to sources and sinks:
```java
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.factories.DynamicTableFactory;

/**
 * Source and sink implement this interface if they require {@link CatalogLock}. This is marked as
 * internal. If we need lock to be more general, we can put lock factory into {@link
 * DynamicTableFactory.Context}.
 */
@Internal
public interface RequireCatalogLock {
    void setLockFactory(CatalogLock.Factory lockFactory);
}
```
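To show how the two interfaces fit together, here is a self-contained sketch with simplified local copies of `CatalogLock` and `RequireCatalogLock` (without the Flink annotations), a hypothetical `InMemoryCatalogLock` backed by `ReentrantLock`, and a hypothetical `CommittingSink`. The in-memory lock only serializes committers inside one JVM; HiveCatalog would presumably rely on a metastore-level lock so that separate jobs are serialized.

```java
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

/** Simplified copy of the proposed interface, without Flink annotations. */
interface CatalogLock extends AutoCloseable {
    <T> T runWithLock(String database, String table, Callable<T> callable) throws Exception;

    interface Factory extends Serializable {
        CatalogLock create();
    }
}

/** Simplified copy of the proposed interface. */
interface RequireCatalogLock {
    void setLockFactory(CatalogLock.Factory lockFactory);
}

/** Hypothetical lock that serializes committers within a single JVM only. */
final class InMemoryCatalogLock implements CatalogLock {
    private static final Map<String, ReentrantLock> LOCKS = new ConcurrentHashMap<>();

    @Override
    public <T> T runWithLock(String database, String table, Callable<T> callable) throws Exception {
        ReentrantLock lock = LOCKS.computeIfAbsent(database + "." + table, k -> new ReentrantLock());
        lock.lock();
        try {
            return callable.call();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void close() {}
}

/** Hypothetical sink that receives a lock factory from the catalog and commits under the lock. */
final class CommittingSink implements RequireCatalogLock {
    private CatalogLock.Factory lockFactory;
    private long lastSnapshotId = 0;

    @Override
    public void setLockFactory(CatalogLock.Factory lockFactory) {
        this.lockFactory = lockFactory;
    }

    long commit(String database, String table) throws Exception {
        try (CatalogLock lock = lockFactory.create()) {
            // The read-and-increment of the snapshot ID is what the lock protects on an object store.
            Long id = lock.runWithLock(database, table, () -> ++lastSnapshotId);
            return id;
        }
    }
}
```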
Interfaces for Table
A catalog that supports built-in dynamic tables needs to implement the following method of Catalog (GenericInMemoryCatalog and HiveCatalog will implement it):
```java
/**
 * If this returns true, a table without a specified connector will be translated to a Flink
 * managed table. See {@link CatalogBaseTable.TableKind#MANAGED}.
 */
default boolean supportsManagedTable() {
    return false;
}
```
We need an interface to discover the managed table factory implementation when there is no "connector=..." option:
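```java
import java.util.Map;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;

/**
 * Base interface for configuring a managed dynamic table connector. The managed table factory is
 * used when there is no {@link FactoryUtil#CONNECTOR} option.
 */
@Internal
public interface ManagedTableFactory extends DynamicTableFactory {

    @Override
    default String factoryIdentifier() {
        return "";
    }

    /**
     * Enrich options from catalog and session information.
     *
     * @return new options of this table.
     */
    Map<String, String> enrichOptions(Context context);

    /** Notifies the listener that a table creation occurred. */
    void onCreateTable(Context context);

    /** Notifies the listener that a table drop occurred. */
    void onDropTable(Context context);
}
```
Table API for creating a managed table:
```java
@PublicEvolving
public class TableDescriptor {

    /** Creates a new {@link Builder} for a managed dynamic table. */
    public static Builder forManaged() {
        return new Builder();
    }

    // ...
}
```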
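The discovery rule can be sketched as a small decision function: if the table has no `connector` option and the catalog reports `supportsManagedTable()`, the managed table factory (identifier `""`) is chosen; otherwise the factory is looked up by the `connector` value. `FactoryRouter` and its string-keyed registry are hypothetical stand-ins for Flink's factory discovery, and failing when the catalog does not support managed tables is our assumption.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for factory discovery; factories are represented by their names. */
final class FactoryRouter {
    static final String CONNECTOR = "connector";
    static final String MANAGED_IDENTIFIER = ""; // what ManagedTableFactory#factoryIdentifier() returns

    private final Map<String, String> factoriesByIdentifier = new HashMap<>();

    void register(String identifier, String factoryName) {
        factoriesByIdentifier.put(identifier, factoryName);
    }

    String select(Map<String, String> tableOptions, boolean catalogSupportsManagedTable) {
        String identifier = tableOptions.get(CONNECTOR);
        if (identifier == null) {
            if (!catalogSupportsManagedTable) {
                throw new IllegalArgumentException(
                        "No 'connector' option and the catalog does not support managed tables");
            }
            identifier = MANAGED_IDENTIFIER;
        }
        String factory = factoriesByIdentifier.get(identifier);
        if (factory == null) {
            throw new IllegalArgumentException("No factory found for identifier '" + identifier + "'");
        }
        return factory;
    }
}
```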
Proposed Design
Conceptually, Flink built-in tables consist of two parts: LogStore and FileStore. LogStore serves the needs of a message system, while FileStore plays the role of a file system with columnar formats. At any point in time, LogStore and FileStore store exactly the same data for the latest writes (LogStore has a TTL), but with different physical layouts. This is exactly the concept of a dynamic table in Flink SQL, and it is what we want to build here.
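A toy model of this dual layout: every write goes both to an append-only log (the LogStore role, trimmed by a TTL) and to a keyed table (the FileStore role, with a map standing in for columnar files). While the log still retains all writes, both stores yield the same latest state. `DualStore` and the millisecond TTL are hypothetical simplifications.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of LogStore and FileStore holding the same data in different layouts. */
final class DualStore {
    private static final class LogEntry {
        final String key;
        final String value;
        final long timestampMillis;

        LogEntry(String key, String value, long timestampMillis) {
            this.key = key;
            this.value = value;
            this.timestampMillis = timestampMillis;
        }
    }

    private final Deque<LogEntry> log = new ArrayDeque<>();    // LogStore role: ordered changes
    private final Map<String, String> files = new HashMap<>(); // FileStore role: latest state
    private final long ttlMillis;

    DualStore(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    void write(String key, String value, long nowMillis) {
        log.addLast(new LogEntry(key, value, nowMillis));
        files.put(key, value);
        // Expire log entries older than the TTL; the FileStore keeps everything.
        while (!log.isEmpty() && log.peekFirst().timestampMillis < nowMillis - ttlMillis) {
            log.pollFirst();
        }
    }

    /** Latest state rebuilt by replaying the retained log entries. */
    Map<String, String> latestFromLog() {
        Map<String, String> state = new HashMap<>();
        for (LogEntry e : log) {
            state.put(e.key, e.value);
        }
        return state;
    }

    Map<String, String> latestFromFiles() {
        return new HashMap<>(files);
    }

    int retainedLogEntries() {
        return log.size();
    }
}
```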
...
- Key:
  - Without primary key: the key is null.
  - With primary key: the key is in JSON format. This is controlled by 'log.pk.key-format'.
- Value: debezium-json is used to store the value record, whether or not a primary key is declared. This is controlled by 'log.format'.
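The key/value layout can be illustrated with a hand-rolled encoder. Assumptions: rows are flat maps of column name to value, only strings and numbers are handled, and the value envelope follows the `before`/`after`/`op` shape that, to our understanding, Flink's `debezium-json` format writes (inserts and update-afters as `"op":"c"`, deletes and update-befores as `"op":"d"`). `LogRecordEncoder` is hypothetical; a real implementation would reuse Flink's JSON formats instead of building strings by hand.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical encoder for log message keys and values. */
final class LogRecordEncoder {
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    private final List<String> primaryKey; // empty when the table has no primary key

    LogRecordEncoder(List<String> primaryKey) {
        this.primaryKey = primaryKey;
    }

    /** Message key: null without a primary key, otherwise a JSON object of the key columns. */
    String encodeKey(Map<String, Object> row) {
        if (primaryKey.isEmpty()) {
            return null;
        }
        Map<String, Object> keyFields = new LinkedHashMap<>();
        for (String col : primaryKey) {
            keyFields.put(col, row.get(col));
        }
        return toJson(keyFields);
    }

    /** Message value: a debezium-json style envelope, used with or without a primary key. */
    String encodeValue(Kind kind, Map<String, Object> row) {
        boolean isAddition = kind == Kind.INSERT || kind == Kind.UPDATE_AFTER;
        String rowJson = toJson(row);
        return "{\"before\":" + (isAddition ? "null" : rowJson)
                + ",\"after\":" + (isAddition ? rowJson : "null")
                + ",\"op\":\"" + (isAddition ? "c" : "d") + "\"}";
    }

    private static String toJson(Map<String, Object> fields) {
        StringBuilder sb = new StringBuilder("{");
        for (Map.Entry<String, Object> e : fields.entrySet()) {
            if (sb.length() > 1) {
                sb.append(',');
            }
            sb.append(quote(e.getKey())).append(':');
            Object v = e.getValue();
            if (v == null) {
                sb.append("null");
            } else if (v instanceof Number) {
                sb.append(v);
            } else {
                sb.append(quote(v.toString()));
            }
        }
        return sb.append('}').toString();
    }

    private static String quote(String s) {
        // Minimal escaping of backslashes and quotes; control characters are not handled.
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }
}
```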
Consistency & Visibility
By default, data becomes visible only after a checkpoint completes, which means that LogStore has transactional consistency.
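Transactional visibility can be modelled as a writer that seals its records per checkpoint and publishes them only when that checkpoint completes. The method names loosely echo Flink's `snapshotState` and `notifyCheckpointComplete` hooks, but `TransactionalLog` is a hypothetical class that implements neither interface. With Kafka as the log system, the equivalent would presumably be transactional producers committed on checkpoint completion and consumers reading with `isolation.level=read_committed`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical log whose records become visible only after their checkpoint completes. */
final class TransactionalLog {
    private final List<String> committed = new ArrayList<>();            // visible to readers
    private final TreeMap<Long, List<String>> pending = new TreeMap<>(); // sealed, per checkpoint
    private List<String> current = new ArrayList<>();                   // written since last barrier

    void write(String record) {
        current.add(record);
    }

    /** Called when a checkpoint barrier arrives: seal the records written so far. */
    void snapshotState(long checkpointId) {
        pending.put(checkpointId, current);
        current = new ArrayList<>();
    }

    /** Called once the checkpoint is complete: publish everything up to and including it. */
    void notifyCheckpointComplete(long checkpointId) {
        Map<Long, List<String>> done = pending.headMap(checkpointId, true);
        for (List<String> records : done.values()) {
            committed.addAll(records);
        }
        done.clear(); // removes the published checkpoints from the pending map
    }

    /** What a read_committed consumer would see. */
    List<String> visible() {
        return Collections.unmodifiableList(committed);
    }
}
```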
...