Status
...
...
JIRA: <TODO>
...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Streaming analytics is one of the most important use cases of SQL. Users rely on the streaming nature of Flink to deliver sub-second end-to-end latency. A typical architecture for such a use case looks like the following:
...
- It is hard to choose the most suitable external systems when the requirements include streaming pipelines, offline batch jobs, ad-hoc queries and even some point lookup queries.
- Even once you have made your choice, it increases operation and maintenance complexity. Users at least need to coordinate between the log system and the file system of each table, which is error-prone.
Proposal
If you have experience with Flink SQL, you might be familiar with one of its basic concepts: the dynamic table. In short, a dynamic table is a logical concept with two different physical representations: changelog and table. Right now, by relying on SQL connectors, users can define a table that acts as one of the physical representations, but not both.
...
- It’s a built-in storage for Flink SQL
- Improves usability
- Flink DDL is no longer just a mapping, but a real creation of these tables
- Masks & abstracts the underlying technical details, no annoying options
- Supports sub-second streaming write & consumption
- It could be backed by a service-oriented message queue (like Kafka)
- High throughput scan capability
- A filesystem with columnar formats would be an ideal choice, just as Iceberg and Hudi do.
- More importantly, in order to lower the cognitive bar, the storage needs to automatically handle various Insert/Update/Delete inputs and table definitions
- Accepts any type of changelog and any data type
- Table can have primary key or no primary key
Public Interfaces
Example
With a built-in Flink dynamic table, users can focus on their business logic:
Code Block | ||||
---|---|---|---|---|
| ||||
-- Just business fields, primary key is not mandatory
CREATE TABLE intermediate_table (
  order_id BIGINT,
  auction_id BIGINT,
  category_id BIGINT,
  trans_amount BIGINT,
  create_time TIMESTAMP,
  dt STRING
) PARTITIONED BY (dt);
-- Insert into
INSERT INTO intermediate_table
SELECT
  A.order_id,
  A.auction_id,
  B.category_id,
  A.trans_amount,
  A.create_time,
  DATE_FORMAT(create_time, 'yyyy-MM-dd')
FROM orders A LEFT JOIN category_dim B
ON A.auction_id = B.auction_id;
-- Query: Streaming Pipeline
INSERT INTO ... SELECT ... FROM intermediate_table;
-- Query: Batch ad-hoc query
SELECT * FROM intermediate_table WHERE ...; |
SQL Statements
CREATE
Code Block | ||||
---|---|---|---|---|
| ||||
CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
(
{ <physical_column_definition> | <computed_column_definition> }[ , ...n]
[ <watermark_definition> ]
[ <table_constraint> ][ , ...n]
)
[COMMENT table_comment]
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
[WITH ('change-tracking' = 'false')]
<physical_column_definition>:
column_name column_type [ <column_constraint> ] [COMMENT column_comment]
<column_constraint>:
[CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED
<table_constraint>:
[CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED
<computed_column_definition>:
column_name AS computed_column_expression [COMMENT column_comment]
<watermark_definition>:
WATERMARK FOR rowtime_column_name AS watermark_strategy_expression |
When a table is created, the corresponding underlying physical storage is created as well. This keeps things simple: the underlying technical details are masked and abstracted, with no annoying options.
Limitation: When a partitioned table has a primary key, the primary key must contain the partition fields.
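The limitation above can be checked at DDL time. The following is a minimal sketch that assumes the planner sees the primary key and the partition keys as plain lists of column names. `PartitionKeyCheck` and `validate` are hypothetical names, not part of Flink's API.

```java
import java.util.List;

/** Hypothetical DDL check: a partitioned table's primary key must contain every partition field. */
final class PartitionKeyCheck {

    static void validate(List<String> primaryKey, List<String> partitionKeys) {
        // A table without a primary key, or without partitions, has nothing to check.
        if (primaryKey.isEmpty() || partitionKeys.isEmpty()) {
            return;
        }
        for (String field : partitionKeys) {
            if (!primaryKey.contains(field)) {
                throw new IllegalArgumentException(
                        "Primary key " + primaryKey + " must contain partition field '" + field + "'");
            }
        }
    }
}
```

For example, `PRIMARY KEY (order_id, dt)` together with `PARTITIONED BY (dt)` passes, while `PRIMARY KEY (order_id)` is rejected. Presumably the rule exists so that every version of a key lands in the same partition.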
DROP
Code Block | ||||
---|---|---|---|---|
| ||||
DROP [TEMPORARY] TABLE [IF EXISTS] [catalog_name.][db_name.]table_name |
When dropping a table, the corresponding underlying physical storage will be deleted.
(If the user does not drop the table through Flink, the underlying physical storage may never be deleted. The user can look up the corresponding storage addresses through DESCRIBE and clean them up manually.)
COMPACT
Code Block | ||||
---|---|---|---|---|
| ||||
ALTER TABLE [catalog_name.][db_name.]table_name [PARTITION partition_spec] COMPACT |
Compacts the table for high-performance queries by launching a job that rewrites files. This is a synchronous operation.
...
CHANGE TRACKING
Code Block | ||||
---|---|---|---|---|
| ||||
ALTER TABLE [catalog_name.][db_name.]table_name [PARTITION partition_spec] SET 'change-tracking' = 'false' |
Table
If users do not need to consume changes from the table, they can disable change tracking. This can reduce resource consumption.
Partition
Turning off change tracking for a specific partition means that records written to it no longer produce changes, so downstream streaming consumers will not see them.
Changes made by batch jobs are tracked by default. But sometimes, for example when revising an old partition of the whole pipeline, the state of the downstream streaming job may have expired long ago, and what we need is a batch pipeline.
In this case, we need to turn off change tracking for this partition, so that the batch job does not produce changes for downstream streaming jobs.
READING
Code Block | ||||
---|---|---|---|---|
| ||||
-- unbounded streaming reading (Read changes)
SET 'execution.runtime-mode' = 'streaming';
INSERT INTO ... SELECT ... FROM [catalog_name.][db_name.]table_name;
-- bounded reading (Read a snapshot)
SET 'execution.runtime-mode' = 'batch';
INSERT INTO ... SELECT ... FROM [catalog_name.][db_name.]table_name; |
The table supports both stream reading (read changes) and high-performance batch reading.
INSERT
Code Block | ||||
---|---|---|---|---|
| ||||
-- unbounded insert, OVERWRITE is not supported
INSERT INTO [catalog_name.][db_name.]table_name
  [PARTITION part_spec] [column_list] select_statement;
-- bounded insert
INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name
  [PARTITION part_spec] [column_list] select_statement;
part_spec:
  (part_col_name1=val1 [, part_col_name2=val2, ...])
column_list:
  (col_name1 [, column_name2, ...]) |
Users can write any type of changelog with any SQL.
Changes made by batch jobs are tracked by default. But sometimes, for example when revising an old partition of the whole pipeline, the state of the downstream streaming job may have expired long ago, and what we need is a batch pipeline.
In this case, we need to turn off change tracking for this write, so that the batch job does not produce changes for downstream streaming jobs (re-processing):
Code Block | ||||
---|---|---|---|---|
| ||||
INSERT INTO [catalog_name.][db_name.]table_name /*+ OPTIONS('change-tracking' = 'false') */ ... |
DESCRIBE
Code Block | ||||
---|---|---|---|---|
| ||||
DESCRIBE TABLE EXTENDED [catalog_name.][db_name.]table_name [PARTITION partition_spec] |
DESCRIBE TABLE EXTENDED output:
name | type | description |
name | String | catalog.database.tableName |
log.system | String | the log system |
log.kafka.bootstrap.servers | Map | Kafka brokers |
log.retention | Duration | how long the change log is kept |
file.path | String | file path |
log.kafka.topic | String | Kafka topic |
watermark | Timestamp | watermark of the latest written data (if the table defines a watermark) |
file.format | String | file format |
bucket | Integer | number of buckets |
change-tracking | Boolean | whether this table tracks changes |
DESCRIBE TABLE EXTENDED … PARTITION output:
name | type | description |
partition | String | partition spec |
file.path | String | path of this partition |
last-modified | Timestamp | last modified time |
compacted | Boolean | whether the partition is compacted |
change-tracking | Boolean | whether this partition tracks changes |
num-files | Integer | number of files |
Without a partition definition, DESCRIBE TABLE EXTENDED also outputs the columns above, except partition.
Configuration
Session Options
In every table environment, the `TableConfig` returned by `TableEnvironment.getConfig` offers options for configuring the current session.
We put the necessary configurations in the global session configuration so that users do not need to configure each individual table.
Key | Default | Type | Description |
table-storage.log.system | kafka | String | Log system. Only Kafka is supported in the MVP. |
table-storage.log.kafka.bootstrap.servers | (none) | String | Kafka brokers, e.g. localhost:9092. |
table-storage.log.retention | (none) | Duration | How long the change log is kept. The default value comes from the log system cluster. |
table-storage.file.path | (none) | String | Root file path. |
table-storage.file.format | parquet | String | Format name for files. |
table-storage.bucket | 1 | Integer | Bucket number for files and partition number for Kafka. |
Bucket
Records are hashed into different buckets according to the primary key (if any) or the whole row (if there is no primary key):
Bucket is for distributed reading and writing.
Buckets and parallelism correspond as follows:
- writing: A single bucket can only be written by a single parallel instance, but one parallel instance can write to multiple buckets. So the maximum effective parallelism of the sink cannot exceed the bucket number.
- reading: In general, a single bucket can only be read by a single parallel instance. (Optimization: if a bucket is too large, we can consider supporting concurrent batch reading of a single bucket, which requires cutting appropriate splits according to the min and max values of the files. The LSM supports range reading.)
More buckets:
- Pros: better scalability (more distributed parallel instances)
- Cons: higher operation and maintenance costs
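The bucket rules above can be sketched as follows. This sketch assumes a `List.hashCode`-based hash and a `bucket % parallelism` owner mapping. Neither the hash function nor the mapping is specified by this proposal, and `BucketAssigner` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical bucket assignment; the hash function and bucket-to-subtask mapping are assumptions. */
final class BucketAssigner {

    /** Hashes the primary key if present, otherwise the whole row, into [0, numBuckets). */
    static int bucket(List<Object> row, int[] primaryKeyIndexes, int numBuckets) {
        List<Object> hashed = new ArrayList<>();
        if (primaryKeyIndexes.length == 0) {
            hashed.addAll(row); // no primary key: hash the whole row
        } else {
            for (int i : primaryKeyIndexes) {
                hashed.add(row.get(i));
            }
        }
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(hashed.hashCode(), numBuckets);
    }

    /** Each bucket is owned by exactly one writer subtask; a subtask may own several buckets. */
    static int writerSubtask(int bucket, int parallelism) {
        return bucket % parallelism;
    }

    /** Subtasks beyond the bucket count receive no buckets, so they stay idle. */
    static int effectiveParallelism(int numBuckets, int parallelism) {
        return Math.min(numBuckets, parallelism);
    }
}
```

Because all versions of a primary key hash to the same bucket, a single writer sees every change for that key. That is what lets each bucket be maintained as one LSM.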
The default value of the bucket is 1, so that it can be used out of the box on the local machine.
It can be set by:
Code Block | ||||
---|---|---|---|---|
| ||||
SET 'table-storage.bucket' = '10';
CREATE TABLE ... |
If users want to change the bucket number, they need to drop the table and create a new one.
Consistency & Visibility
...
The visibility of reading:
- Batch reading: Data is visible only after a checkpoint; the latency depends on the checkpoint interval.
- Stream reading:
- Without primary key: Data is visible only after a checkpoint; the latency depends on the checkpoint interval.
- With primary key: Data is immediately visible.
- When the streaming job fails, duplicate changes may occur, but the downstream Flink streaming job deduplicates them by primary key to achieve eventual consistency.
(see below analysis in Design chapter)
Checkpoint
In the past, many users ran into the problem that the sink produced no output because they had not enabled checkpointing.
To improve the usability of the built-in dynamic table, the planner sets the checkpoint interval to 1 min if checkpointing is not enabled and it detects a sink to a built-in dynamic table.
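The defaulting rule above amounts to a small configuration rewrite. This sketch models the job configuration as a string map. `execution.checkpointing.interval` is Flink's real option key, where an absent value means checkpointing is off. `CheckpointDefaults` and the `hasBuiltInTableSink` flag are hypothetical stand-ins for what the planner would detect.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical planner hook: defaults the checkpoint interval when writing to a built-in dynamic table. */
final class CheckpointDefaults {

    // Real Flink option key; an absent value means checkpointing is disabled.
    static final String CHECKPOINT_INTERVAL = "execution.checkpointing.interval";
    static final String DEFAULT_INTERVAL = "1 min";

    static Map<String, String> apply(Map<String, String> jobConfig, boolean hasBuiltInTableSink) {
        Map<String, String> result = new HashMap<>(jobConfig);
        // Only fill the gap: an interval the user configured explicitly is never overridden.
        if (hasBuiltInTableSink && !result.containsKey(CHECKPOINT_INTERVAL)) {
            result.put(CHECKPOINT_INTERVAL, DEFAULT_INTERVAL);
        }
        return result;
    }
}
```

The fail-fast alternative, throwing an exception when checkpointing is off, would replace the `put` with a `throw`.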
Proposed Design
Conceptually, Flink built-in tables consist of two parts, LogStore and FileStore. The LogStore serves the needs of a message system, while the FileStore plays the role of a file system with columnar formats. At each point in time, LogStore and FileStore store exactly the same data for the latest written data (LogStore has a TTL), but with different physical layouts. If one remembers the concept of the dynamic table in Flink SQL, this is exactly what we want to build here.
LogStore data has a shorter time-to-live, and FileStore ensures that historical data can be queried:
- LogStore: Stores the latest data, supports second-level incremental streaming consumption, relies on Kafka
- For full support of Insert/Update/Delete and optional primary key definition, automatically selects Kafka with Debezium format or Upsert-Kafka
- FileStore: Stores the latest data + historical data, provides batch ad-hoc analysis
- For full support of Insert/Update/Delete and optional primary key definition, we need a flexible storage structure that supports updates and custom merges; an LSM is a good choice
- To support high-performance analysis, it should use a columnar file format
...
LogStore
Log storage relies on Kafka. We use Kafka with Debezium-Avro and Upsert-Kafka as the underlying storage.
- DDL with Primary Key: Use Upsert-Kafka + Avro format
- DDL without Primary Key: Use Kafka with Debezium-Avro format
A bucket in LogStore is a Kafka partition: records are hashed into different Kafka partitions according to the primary key (if any) or the whole row (if there is no primary key).
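The choice between the two Kafka setups above can be sketched as a translation into connector options. The keys `connector`, `topic`, `properties.bootstrap.servers`, `format`, `key.format` and `value.format`, and the identifiers `upsert-kafka`, `kafka`, `avro` and `debezium-avro-confluent`, exist in Flink. Mapping "Debezium-Avro" to `debezium-avro-confluent` is an assumption of this sketch, which also omits the schema registry URL that format requires. `LogStoreOptions` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical translation of a built-in table into Kafka connector options for its LogStore. */
final class LogStoreOptions {

    static Map<String, String> forTable(boolean hasPrimaryKey, String topic, String bootstrapServers) {
        Map<String, String> options = new HashMap<>();
        options.put("topic", topic);
        options.put("properties.bootstrap.servers", bootstrapServers);
        if (hasPrimaryKey) {
            // Primary key: changes are expressed as upserts and tombstones keyed by the primary key.
            options.put("connector", "upsert-kafka");
            options.put("key.format", "avro");
            options.put("value.format", "avro");
        } else {
            // No primary key: a Debezium envelope carries before/after images, so deletes are explicit.
            options.put("connector", "kafka");
            options.put("format", "debezium-avro-confluent"); // assumed mapping of "Debezium-Avro"
        }
        return options;
    }
}
```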
FileStore
Overview
As a storage system supporting real-time ad-hoc analysis:
- LSM with Columnar format
- Fast update and data skipping
- High compression ratio and high performance analysis
- Partition and Bucket
- data warehouse support
- Consistency
- file management
- version control
The directory structure of FileStore on DFS is as follows:
Data directory description:
- Part Directory: the partition directory, defined by "PARTITIONED BY" in DDL; it represents a partition and uses the same directory naming as Hive, such as "dt=2020-08-08"
- Bucket Directory: the bucket under the partition. Records are assigned to a bucket by hash. Each bucket is an LSM composed of multiple files
- LSM datafiles: data files in an abstract format, supporting ORC, Parquet and Avro. The record schema of a data file:
- SequenceNumber
- ValueKind(add or delete)
- RowData: key
- RowData: value
Meta file description:
- Manifest file: records which files have been added and which have been deleted, i.e. one change to the table. A manifest represents the incremental files of a version. The record schema of a manifest file is DataFile:
- data file name
- FileKind: add or delete
- partition
- bucket
- min/max key: for file skipping
- min/max sequence number
- statistics: data file size, row count
- Snapshot file: a collection of manifest files that represents a snapshot of a table. Snapshot represents all files of a version. The record schema of snapshot file is ManifestFile:
- manifest file name
- lower/upper partition: for partition pruning
- statistics: manifest file size, addedFileCount, deleteFileCount
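The relationship between manifests and snapshots above can be illustrated by replaying manifests in commit order. Each ADD entry makes a file live and each DELETE entry removes it, which yields the full file set of a version. This is a minimal in-memory sketch. The names and the `partition/bucket-N` grouping key are hypothetical, and the key is not a directory layout.

```java
import java.util.*;

/** Hypothetical in-memory model of manifests: replaying ADD/DELETE entries yields a snapshot's files. */
final class ManifestReplay {

    enum FileKind { ADD, DELETE }

    /** One manifest entry (a DataFile record): which file changed, and where it lives. */
    record DataFileEntry(String fileName, FileKind kind, String partition, int bucket) {}

    /** Replays manifests in commit order; returns the live files per (partition, bucket). */
    static Map<String, Set<String>> liveFiles(List<List<DataFileEntry>> manifests) {
        Map<String, Set<String>> files = new TreeMap<>();
        for (List<DataFileEntry> manifest : manifests) {
            for (DataFileEntry e : manifest) {
                String key = e.partition() + "/bucket-" + e.bucket(); // hypothetical grouping key
                Set<String> live = files.computeIfAbsent(key, k -> new TreeSet<>());
                if (e.kind() == FileKind.ADD) {
                    live.add(e.fileName());
                } else {
                    live.remove(e.fileName());
                }
            }
        }
        files.values().removeIf(Set::isEmpty); // drop buckets whose files were all deleted
        return files;
    }
}
```

The planner's step of merging deleteFiles and addFiles into a per-bucket file list, described under Query Process, is exactly this replay.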
Write Process
- LSM process (similar to LevelDB):
- A memtable is maintained in memory, and data is written directly to it. Each record has a sequence number; for the same key, a record with a larger sequence number overwrites one with a smaller sequence number
- When the memtable is full, or on PrepareCommit, flush it: sort the memtable by key + sequence number, merge duplicate keys, and write the data to a remote file in the configured format
- An asynchronous thread performs LSM compactions
- Prepare Commit
- Flush MemTable
- Commit message is: DeleteFiles and AddFiles.
- Global Commit
- Read the existing snapshots; if this checkpoint has already been committed, return
- Read the previous snapshot-${i}, write the deleteFiles and addFiles of the buckets to a new manifest, and generate a new snapshot-${i+1}
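The memtable behaviour above can be sketched with one map entry per key. A newer sequence number replaces an older one at write time, so a flush only has to sort by key. This simplifies "sort by key + sequence number, then merge duplicates". Deletes are kept as DELETE markers so that they can shadow older values in files already written. `MemTable` is a hypothetical name, and the record layout mirrors the data file schema listed earlier.

```java
import java.util.*;

/** Hypothetical memtable: the newest sequence number wins per key, and flush emits a sorted run. */
final class MemTable {

    enum ValueKind { ADD, DELETE }

    /** Mirrors the data file record schema: sequence number, value kind, key, value. */
    record KeyValue(long sequenceNumber, ValueKind kind, String key, String value) {}

    private final Map<String, KeyValue> latest = new HashMap<>();
    private long nextSequence = 0; // keeps increasing across flushes

    void put(String key, String value) {
        write(key, ValueKind.ADD, value);
    }

    void delete(String key) {
        // Keep a DELETE marker: older files may still hold a value for this key.
        write(key, ValueKind.DELETE, null);
    }

    private void write(String key, ValueKind kind, String value) {
        // Later writes get larger sequence numbers and replace earlier entries for the same key.
        latest.put(key, new KeyValue(nextSequence++, kind, key, value));
    }

    /** Flush when full or on PrepareCommit: one record per key, sorted by key. */
    List<KeyValue> flush() {
        List<KeyValue> run = new ArrayList<>(latest.values());
        run.sort(Comparator.comparing(KeyValue::key));
        latest.clear();
        return run;
    }
}
```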
Compaction
Auto compaction is in the streaming sink (writer).
There is no independent compaction service. Such a service would add a lot of design complexity, and the current version only needs decoupled storage. (In the future, if we have such a service, the streaming writer can become a pure append writer.)
For each LSM, there is only one streaming writer, and this writer also needs to be responsible for its compaction.
As for the compaction strategy, we do not yet have enough tests to tune it. We can refer to the two mainstream strategies of RocksDB:
Leveled Compaction:
The trigger of compaction depends on three options:
- level0_file_num_compaction_trigger: When the number of level0 files exceeds this value, compact level0 files to level1
- max_bytes_for_level_base and max_bytes_for_level_multiplier: If the base is 1GB, multiplier is 5, then if the data of level1 exceeds 1GB, the compaction will be performed, and if the data of level2 exceeds 5GB, the compaction will be performed...
For Leveled Compaction, every level (except level0) is a sorted run. SSTs in level1 are merged with level2 to form ordered SSTs in level2, and the SSTs within a level do not overlap. Leveled Compaction is the default strategy of RocksDB:
- Write amplification: bad, data is compacted over and over
- Read and space amplification: good, files within a level do not overlap
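The size triggers of leveled compaction follow from simple arithmetic: level L (L >= 1) targets base × multiplier^(L−1), so a 1 GB base with multiplier 5 gives 1 GB, 5 GB, 25 GB, and so on. This sketch uses hypothetical names and covers only this static rule. RocksDB can also derive level targets dynamically, which the sketch ignores.

```java
/** Per-level size targets for leveled compaction, following the base/multiplier rule described above. */
final class LeveledTargets {

    /** Target size of {@code level} (level >= 1); level 0 is triggered by file count instead. */
    static long targetBytes(int level, long baseBytes, int multiplier) {
        if (level < 1) {
            throw new IllegalArgumentException("level 0 is governed by the file-count trigger");
        }
        long target = baseBytes;
        for (int l = 1; l < level; l++) {
            target *= multiplier; // each level is `multiplier` times larger than the previous one
        }
        return target;
    }

    /** A level is compacted into the next one once it grows beyond its target. */
    static boolean needsCompaction(int level, long levelBytes, long baseBytes, int multiplier) {
        return levelBytes > targetBytes(level, baseBytes, multiplier);
    }
}
```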
Universal Compaction:
- universal_sort_run_num_compaction_trigger: When the number of sort run exceeds this value, do compaction
- universal_max_size_amplification_percent
- universal_size_ratio
In universal mode, there are many sort runs. For R1, R2, R3,..., Rn, each R is a sort run, R1 contains the latest data, and Rn contains the oldest data. When the preconditions are met, the following compaction is triggered in priority order:
- Compaction by Space Amplification: performs a full compaction of all sort runs. The amplification is:
- (size(R1) + size(R2) + … + size(Rn-1)) / size(Rn) (if the frequency of deletes is similar to the frequency of inserts)
- Compaction by Individual Size Ratio: If size(R2) does not exceed size(R1) by more than a given ratio (default 1%), compact R1 and R2 together. If size(R3) <= (size(R1) + size(R2)) * (100 + ratio) / 100, add R3 to the compaction as well, and so on.
- Compaction by Sort Run Number: If none of the above is triggered, the first (newest) sort runs are forcibly compacted
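The universal compaction priority order above can be sketched as a picker over sort run sizes, ordered newest first (R1..Rn). It returns how many of the newest runs to compact. The size-ratio rule uses the RocksDB form of the inequality (keep adding the next run while it is at most `(100 + ratio)%` of the accumulated candidates). The sort-run-number branch compacts just enough runs to get back down to the trigger. Parameter names are hypothetical, and this is not RocksDB's actual code.

```java
import java.util.List;

/**
 * Sketch of the universal compaction pick order: size amplification, then individual size ratio,
 * then sort run number. Returns how many of the newest runs (R1..Rk) to compact; 0 means none.
 */
final class UniversalCompactionPicker {

    static int pick(List<Long> runSizes, int maxSizeAmpPercent, int sizeRatioPercent, int numRunTrigger) {
        int n = runSizes.size();
        if (n < 2) {
            return 0;
        }
        // 1. Size amplification: (size(R1) + ... + size(Rn-1)) / size(Rn), as a percentage.
        long newer = 0;
        for (int i = 0; i < n - 1; i++) {
            newer += runSizes.get(i);
        }
        if (newer * 100 > (long) maxSizeAmpPercent * runSizes.get(n - 1)) {
            return n; // full compaction of all sort runs
        }
        // 2. Size ratio: add the next run while it is not much larger than the candidates so far.
        long candidate = runSizes.get(0);
        int count = 1;
        while (count < n && candidate * (100 + sizeRatioPercent) >= runSizes.get(count) * 100) {
            candidate += runSizes.get(count);
            count++;
        }
        if (count > 1) {
            return count;
        }
        // 3. Sort run number: compact the newest runs so that exactly numRunTrigger runs remain.
        if (n > numRunTrigger) {
            return n - numRunTrigger + 1;
        }
        return 0;
    }
}
```

For example, with runs of sizes 10, 10, 100 and 1000, a 200% size-amplification limit and a 1% ratio, only R1 and R2 are picked. R3 is more than 1% larger than R1 + R2.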
Compared to Leveled Compaction, Universal compaction:
- Write amplification: good, old data is not compacted over and over
- Read and space amplification: bad, there is a larger number of sort runs
In our scenario, writes dominate. Although leveled compaction may achieve a better compression rate in aggregation scenarios, the first version provides universal compaction.
Query Process
- Planner
- Read the current snapshot, prune partitions according to filtering conditions, and obtain manifests to be read
- Merge the deleteFiles and addFiles in manifests to generate a file list for each bucket in each partition
- SplitEnumerator
- Traverse the partitions to be read and generate the corresponding SourceSplit for each bucket
- Filter the files to be read in the bucket according to filtering conditions, produce the files of each LSM level in SourceSplit
- Runtime Task
- Obtain the SourceSplit to be read, generate the MergeIterator of LSM, and read the data
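The MergeIterator step above can be sketched as a k-way merge over the sorted runs of one bucket. A heap orders the runs' current heads by key, then by descending sequence number. The first record popped for each key is therefore its newest version, and older versions are skipped. A DELETE marker as the newest version hides the key. This is a hypothetical in-memory illustration, not the actual reader.

```java
import java.util.*;

/** Sketch of an LSM merge read: k-way merge of sorted runs, newest sequence number wins per key. */
final class LsmMergeReader {

    record KeyValue(String key, long sequenceNumber, boolean isDelete, String value) {}

    /** Each run must be sorted by key with unique keys, as produced by a memtable flush. */
    static List<KeyValue> merge(List<List<KeyValue>> runs) {
        // Heap entries are {runIndex, position}; ordered by key, then by descending sequence number.
        PriorityQueue<int[]> heap = new PriorityQueue<>((a, b) -> {
            KeyValue x = runs.get(a[0]).get(a[1]);
            KeyValue y = runs.get(b[0]).get(b[1]);
            int byKey = x.key().compareTo(y.key());
            return byKey != 0 ? byKey : Long.compare(y.sequenceNumber(), x.sequenceNumber());
        });
        for (int r = 0; r < runs.size(); r++) {
            if (!runs.get(r).isEmpty()) {
                heap.add(new int[] {r, 0});
            }
        }
        List<KeyValue> out = new ArrayList<>();
        String lastKey = null;
        while (!heap.isEmpty()) {
            int[] top = heap.poll();
            KeyValue kv = runs.get(top[0]).get(top[1]);
            if (!kv.key().equals(lastKey)) {
                // First record seen for this key is the newest one; older versions are skipped.
                lastKey = kv.key();
                if (!kv.isDelete()) {
                    out.add(kv);
                }
            }
            if (top[1] + 1 < runs.get(top[0]).size()) {
                heap.add(new int[] {top[0], top[1] + 1}); // advance within the same run
            }
        }
        return out;
    }
}
```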
Support Changelog
As with LogStore, we should hide the complexity of changelog support from users. Changelog is supported as follows:
- DDL with Primary Key
- LSM Key: Primary Key
- LSM Value: Row (All columns)
- DDL without Primary Key
- LSM Key: Row (All columns)
- LSM Value: Count, Number of occurrences of the same record
- Count is +1 on add and -1 on delete. Counts are summed during compaction and merge reading.
- DDL with Index Key: When there is no primary key, users can define an index key to speed up update and query. (Not in this FLIP)
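The count-based layout for tables without a primary key can be sketched with a map from the whole row to its count. Adds contribute +1 and deletes −1, and the counts are summed the way compaction and merge reading would. A row whose count reaches zero disappears. `CountingStore` is a hypothetical name for this illustration.

```java
import java.util.*;

/** Sketch of changelog storage without a primary key: the whole row is the key, the value a count. */
final class CountingStore {

    private final Map<List<Object>, Long> counts = new HashMap<>();

    /** +1 for INSERT / UPDATE_AFTER, -1 for DELETE / UPDATE_BEFORE. */
    void apply(List<Object> row, boolean isAdd) {
        // merge() sums counts; returning null when the sum is zero removes the row entirely.
        counts.merge(row, isAdd ? 1L : -1L, (a, b) -> a + b == 0 ? null : a + b);
    }

    /** Expands counts back into rows, e.g. for a batch read; non-positive counts emit nothing. */
    List<List<Object>> rows() {
        List<List<Object>> out = new ArrayList<>();
        counts.forEach((row, count) -> {
            for (long i = 0; i < count; i++) {
                out.add(row);
            }
        });
        return out;
    }
}
```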
Query Pushdown
FileStore can support more compaction strategies that give the input data a lazy-computation effect. (Not in this FLIP) For example:
- SUM Compaction: Non-key fields are summed, grouped by key.
- COALESCE Compaction: Only non-null fields are stored. It can replace a streaming join for widening the fields.
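The two strategies above reduce to merge functions that compaction would apply to the records of one key, oldest first. This is a minimal sketch over arrays of non-key field values. It assumes that for COALESCE a newer non-null field wins, which the text does not specify, and `MergeFunctions` is a hypothetical name.

```java
import java.util.Arrays;

/** Hypothetical merge functions for the pushdown compactions above; rows are arrays of non-key fields. */
final class MergeFunctions {

    /** SUM compaction: numeric non-key fields are added up per key. */
    static long[] sum(long[] older, long[] newer) {
        long[] out = new long[older.length];
        for (int i = 0; i < out.length; i++) {
            out[i] = older[i] + newer[i];
        }
        return out;
    }

    /** COALESCE compaction (assumed semantics): a newer non-null field replaces the older value. */
    static Object[] coalesce(Object[] older, Object[] newer) {
        Object[] out = Arrays.copyOf(older, older.length);
        for (int i = 0; i < out.length; i++) {
            if (newer[i] != null) {
                out[i] = newer[i];
            }
        }
        return out;
    }
}
```

With COALESCE, two upstream jobs that each write different columns for the same key end up with one widened row, which is how it could replace a streaming join.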
Visibility Analysis
(See above ‘Consistency & Visibility’ in public interfaces)
FileStore Visibility
Compared with LogStore, the visibility of FileStore is less important. More importantly, it can store a large amount of queryable data.
Only files committed to DFS by a checkpoint can be seen by readers; the latency depends on the checkpoint interval.
LogStore Visibility
LogStore latency is very important and requires high visibility. But on the other hand, we also need to ensure Exactly-once consistency.
So for a table without a primary key, combined with Kafka's transaction mechanism, only data committed by a checkpoint can be seen by readers, just as in FileStore; the latency depends on the checkpoint interval.
But with a primary key, can we do more? LogStore uses Upsert-Kafka as the underlying implementation.
When reading from Upsert-Kafka, the downstream job adds a special node after the source. This node normalizes an UPSERT stream containing duplicate events into a regular changelog stream that contains INSERT / UPDATE_BEFORE / UPDATE_AFTER / DELETE records without duplication.
If we disable Kafka transactions, the upstream producer may generate duplicate changes, so some data may be duplicated. However, with the normalize node, duplicates are turned into UPDATE_BEFORE / UPDATE_AFTER records and the duplicated data is retracted, so eventual consistency is reached after recovery.
So when there is a primary key, we can disable Kafka transactions to achieve sub-second latency, even with a large checkpoint interval.
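The normalize node described above can be sketched with per-key state holding the last value. An upsert for an unseen key becomes INSERT. An upsert for a known key becomes UPDATE_BEFORE(old) + UPDATE_AFTER(new), so a replayed duplicate is retracted and re-added with no net effect. A tombstone becomes DELETE. This is a hypothetical illustration of the idea, not Flink's actual changelog-normalize operator, which also handles state TTL and other details.

```java
import java.util.*;

/** Sketch of upsert normalization: per-key state turns a possibly duplicated upsert stream into a changelog. */
final class UpsertNormalizer {

    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    record Change(Kind kind, String key, String value) {}

    private final Map<String, String> lastValue = new HashMap<>();

    /** {@code value == null} is a tombstone (delete) for {@code key}. */
    List<Change> process(String key, String value) {
        String previous = lastValue.get(key);
        if (value == null) {
            if (previous == null) {
                return List.of(); // deleting an unknown key: nothing to retract
            }
            lastValue.remove(key);
            return List.of(new Change(Kind.DELETE, key, previous));
        }
        lastValue.put(key, value);
        if (previous == null) {
            return List.of(new Change(Kind.INSERT, key, value));
        }
        // A replayed duplicate retracts the old row and re-adds it, so the result stays the same.
        return List.of(new Change(Kind.UPDATE_BEFORE, key, previous), new Change(Kind.UPDATE_AFTER, key, value));
    }
}
```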
Analysis
Let's enumerate the various visibility:
1. FileStore immediately visible?
- Not feasible for now: it would generate a large number of small files
2. FileStore & LogStore checkpoint visible:
- Pros: exactly-once in every case
- Cons: latency is very high, currently at the minute level.
3. FileStore checkpoint visible, LogStore immediately visible
- Cons: the semantics of FileStore and LogStore are inconsistent, so users get different results depending on which one they read, which we should avoid
4. FileStore checkpoint visible, LogStore without PK checkpoint visible, LogStore with PK immediately visible; the downstream Flink streaming job deduplicates by primary key
- Pros:
- The streaming pipeline can achieve sub-second latency with a primary key
- Exactly-once in every case (consumers deduplicate records by PK). LogStore is a message system: when a primary key is defined and there are updates, the message system itself cannot guarantee the absence of duplicates, because it has no ability to remove them
In the MVP, only option #4 is provided, and it is the default.
Internal Interfaces
All the following interfaces are marked as internal, because there is only one implementation at present and it is not intended to be open to external users.
We need an interface to discover the default factory implementation when there is no "connector=..." option:
Code Block | ||||
---|---|---|---|---|
| ||||
/**
* Base interface for configuring a default dynamic table connector. The default table factory is
* used when there is no {@link FactoryUtil#CONNECTOR} option.
*/
@Internal
public interface DefaultDynamicTableFactory extends DynamicTableFactory {
@Override
default String factoryIdentifier() {
return "_default";
}
} |
We need an interface to discover the default log factory implementation (i.e. the Kafka log factory):
Code Block | ||||
---|---|---|---|---|
| ||||
/**
* Base interface for configuring a default log table connector. The log table is used by {@link
* DefaultDynamicTableFactory}.
*
* <p>Log tables are for processing only unbounded data. Support streaming reading and streaming
* writing.
*/
@Internal
public interface DefaultLogTableFactory extends DynamicTableFactory {
@Override
default String factoryIdentifier() {
return "_default_log";
}
} |
As mentioned above, we need to minimize the user's options, so the table options need to be derived from the configuration. Moreover, the underlying physical storage, such as a Kafka topic, needs to be created when a table is created:
Code Block | ||||
---|---|---|---|---|
| ||||
/**
* An object that can receive a notification when a table is created. Notification occurs before
* {@link Catalog#createTable} is called.
*
* <p>Only {@link DefaultDynamicTableFactory} and {@link DefaultLogTableFactory} support
* notification.
*/
@Internal
public interface CreateTableListener {
/**
* Notifies the listener that a table creation occurred.
*
* @return new options of this table.
*/
Map<String, String> onTableCreation(TableNotification context);
} |
We need to delete the underlying physical storage, such as a Kafka topic, when a table is dropped:
Code Block | ||||
---|---|---|---|---|
| ||||
/**
* An object that can receive a notification when a table is dropped. Notification occurs before
* {@link Catalog#dropTable} is called.
*
* <p>Only {@link DefaultDynamicTableFactory} and {@link DefaultLogTableFactory} support
* notification.
*/
@Internal
public interface DropTableListener {
/** Notifies the listener that a table drop occurred. */
void onTableDrop(TableNotification context);
} |
The notification of CreateTable and DropTable Listener
...
...
table-storage.log.consistency | transactional | String | Specifies the log consistency mode for table. |
table-storage.log.changelog-mode | auto | String | Specifies the log changelog mode for table. |
table-storage.log.key.format | json | String | Specifies the key message format of log system with primary key. |
table-storage.log.format | debezium-json | String | Specifies the message format of log system. |
table-storage.file.path | (none) | String | Root file path. |
table-storage.file.format | parquet | String | Format name for file. |
table-storage.bucket | 1 | Integer | Bucket number for file and Partition number for Kafka. |
If users need to configure a table individually, they can also set the same options without the "table-storage." prefix on the table, for example:
Code Block | ||||
---|---|---|---|---|
| ||||
CREATE TABLE T (...) WITH ('log.consistency'='eventual'); |
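The precedence between session and table options above can be sketched as an enrichment step, the kind of work a create-table hook would do. Session keys with the `table-storage.` prefix are stripped of it, and the table's own WITH options then override them. `OptionEnricher` is a hypothetical name, and the override order is an assumption consistent with the example above.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical option enrichment: session "table-storage." options fill in whatever the table leaves unset. */
final class OptionEnricher {

    static final String PREFIX = "table-storage.";

    static Map<String, String> enrich(Map<String, String> sessionConfig, Map<String, String> tableOptions) {
        Map<String, String> result = new HashMap<>();
        sessionConfig.forEach((key, value) -> {
            if (key.startsWith(PREFIX)) {
                result.put(key.substring(PREFIX.length()), value); // strip the session prefix
            }
        });
        result.putAll(tableOptions); // options in the table's WITH clause take precedence
        return result;
    }
}
```

With the session setting `table-storage.log.consistency = transactional`, table `T` above still ends up with `log.consistency = eventual`.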
Table Options
In addition to the session options, which can be configured per table by removing the prefix, there are some options that can only be configured per table; these are the options that affect reading and writing:
Key | Default | Type | Description |
log.scan | full | String | Specifies the scan startup mode for log consumer. |
log.scan.timestamp-mills | (none) | Long | Optional timestamp used in case of "from-timestamp" scan mode. |
change-tracking | true | Boolean | If users do not need to consume changes from the table, they can disable Change Tracking. This can reduce resource consumption. |
Checkpoint
In the past, many users ran into the problem that the sink produced no output because they had not enabled checkpointing.
For the built-in dynamic table, the planner throws an exception if checkpointing is not enabled. (Later we can add a public connector interface for this; the Filesystem, Hive, Iceberg and Hudi connectors need it too.)
Concurrent Write
Only a single streaming writer is allowed to write data to a dynamic table. However, re-processing is allowed, so while the streaming job is running, another job may be generating a snapshot.
Write contention uses a distributed optimistic lock mechanism. For an active partition, INSERT OVERWRITE and ALTER TABLE … COMPACT delete files, which may conflict with the streaming job; such a command may fail, and the user is asked to retry. The optimism lies in how file deletion is handled: if a file to be deleted is found missing at commit time, the commit fails, instead of a lock being taken at the beginning.
For HDFS, path renaming is used for concurrent writes: if the rename fails, the writer knows that the snapshot ID was taken by another job, and it can recheck and generate a new snapshot.
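The optimistic protocol above can be sketched in memory. Here `compareAndSet` on the latest snapshot stands in for the atomic rename of a new snapshot file: whoever succeeds owns the next snapshot ID. The commit first checks that every file it wants to delete is still present and fails otherwise, instead of locking up front. A writer that loses the race re-reads the new snapshot and re-checks. All names are hypothetical.

```java
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Sketch of the optimistic commit protocol. The atomic rename of a new snapshot file is modelled by
 * compareAndSet on the latest snapshot; a writer that loses the race re-checks and retries.
 */
final class OptimisticCommitter {

    record Snapshot(long id, Set<String> files) {}

    private final AtomicReference<Snapshot> head = new AtomicReference<>(new Snapshot(0, Set.of()));

    Snapshot commit(Set<String> deleteFiles, Set<String> addFiles) {
        while (true) {
            Snapshot base = head.get();
            // Optimism: no lock is taken up front; the conflict check happens at commit time.
            if (!base.files().containsAll(deleteFiles)) {
                throw new IllegalStateException("Files to delete are gone; a concurrent job changed them. Retry.");
            }
            Set<String> files = new HashSet<>(base.files());
            files.removeAll(deleteFiles);
            files.addAll(addFiles);
            Snapshot next = new Snapshot(base.id() + 1, Set.copyOf(files));
            if (head.compareAndSet(base, next)) {
                return next; // the "rename" succeeded: this writer owns snapshot id base.id() + 1
            }
            // Another writer took this snapshot id: loop to re-read and re-check against its result.
        }
    }
}
```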
For object storage file systems, however, renaming cannot be used this way, so we need a catalog lock to resolve commit conflicts:
Code Block | ||||
---|---|---|---|---|
| ||||
/**
* An interface that allows source and sink to use global lock to some transaction-related things.
*/
@Internal
public interface CatalogLock extends AutoCloseable {
/** Run with catalog lock. The caller should tell catalog the database and table name. */
<T> T runWithLock(String database, String table, Callable<T> callable) throws Exception;
/** Factory to create {@link CatalogLock}. */
interface Factory extends Serializable {
CatalogLock create();
}
} |
Currently, only HiveCatalog can provide this catalog lock.
We also need an interface through which the catalog sets the lock on sources and sinks:
Code Block | ||||
---|---|---|---|---|
| ||||
/**
* Source and sink implement this interface if they require {@link CatalogLock}. This is marked as
* internal. If we need lock to be more general, we can put lock factory into {@link
* DynamicTableFactory.Context}.
*/
@Internal
public interface RequireCatalogLock {
void setLockFactory(CatalogLock.Factory lockFactory);
} |
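To show how the proposed `CatalogLock` contract is meant to be used, here is a hypothetical single-JVM implementation, for example for tests, with one `ReentrantLock` per `database.table`. The interface is copied locally (without `@Internal`) so that the sketch compiles on its own. Unlike a HiveCatalog lock, it does not coordinate across processes.

```java
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

/** Local copy of the proposed CatalogLock interface, so that this sketch compiles on its own. */
interface CatalogLock extends AutoCloseable {
    <T> T runWithLock(String database, String table, Callable<T> callable) throws Exception;

    interface Factory extends Serializable {
        CatalogLock create();
    }
}

/** Hypothetical single-JVM implementation, e.g. for tests; it does not coordinate across processes. */
final class InMemoryCatalogLock implements CatalogLock {

    private static final Map<String, ReentrantLock> LOCKS = new ConcurrentHashMap<>();

    @Override
    public <T> T runWithLock(String database, String table, Callable<T> callable) throws Exception {
        ReentrantLock lock = LOCKS.computeIfAbsent(database + "." + table, k -> new ReentrantLock());
        lock.lock();
        try {
            return callable.call(); // e.g. check the latest snapshot and write the next one
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void close() {
        // Nothing to release: locks are only held inside runWithLock.
    }

    /** A serializable factory, as a sink implementing RequireCatalogLock would receive it. */
    static CatalogLock.Factory factory() {
        return InMemoryCatalogLock::new;
    }
}
```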
Retention
Log Retention
Log storage is generally expensive, so the log cannot keep the full history of the data. We provide the option "log.retention" to configure the log retention time.
Because FileStore preserves the data, expired log data is still stored in the FileStore. By default (log.scan is full), a user's streaming consumption fetches all data.
So users can set a shorter log retention to reduce the cost of the log system if they do not need the log.scan from-timestamp mode.
Data Retention
Data never expires automatically.
If there is a need for data retention, the user can choose one of the following options:
- In the SQL that queries the storage, users filter the data themselves
- Define the time partition, and users can delete the expired partition by themselves. (DROP PARTITION ...)
- In the future version, we will support "DELETE FROM" statement, users can delete the expired data according to the conditions.
Interfaces for Table
A catalog that supports built-in dynamic tables needs to implement the following method of Catalog (GenericInMemoryCatalog and HiveCatalog will implement it):
Code Block | ||||
---|---|---|---|---|
| ||||
/**
* If return true, the Table without specified connector will be translated to the Flink managed table.
* See {@link CatalogBaseTable.TableKind#MANAGED}
*/
default boolean supportsManagedTable() {
return false;
} |
We need an interface to discover the managed table factory implementation for managed table:
```java
/**
 * Base interface for configuring a managed dynamic table connector. The managed table factory is
 * used when there is no {@link FactoryUtil#CONNECTOR} option.
 */
@Internal
public interface ManagedTableFactory extends DynamicTableFactory {

    @Override
    default String factoryIdentifier() {
        return "";
    }

    /**
     * Enrich options from catalog and session information.
     *
     * @return new options of this table.
     */
    Map<String, String> enrichOptions(Context context);

    /** Notifies the listener that a table creation occurred. */
    void onCreateTable(Context context);

    /** Notifies the listener that a table drop occurred. */
    void onDropTable(Context context);
}
```
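The selection rule implied by the Javadoc above (a table without the connector option is a managed table, provided the catalog supports managed tables) can be sketched as follows. `FactorySelection` and its error message are hypothetical, and the real planner discovers factories through Flink's factory mechanism, which is not modeled here. The empty string mirrors the default `factoryIdentifier()` of ManagedTableFactory.

```java
import java.util.Map;

// Hypothetical sketch of choosing the factory identifier for a table.
final class FactorySelection {
    static String select(Map<String, String> options, boolean catalogSupportsManagedTable) {
        String connector = options.get("connector");
        if (connector != null) {
            // Regular connector table: the DDL is a mapping to an external system
            return connector;
        }
        if (catalogSupportsManagedTable) {
            // Managed table: ManagedTableFactory#factoryIdentifier() returns ""
            return "";
        }
        throw new IllegalArgumentException(
                "Table options do not contain 'connector' and the catalog does not support managed tables.");
    }
}
```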
Table API for creating a managed table:
```java
@PublicEvolving
public class TableDescriptor {

    /** Creates a new {@link Builder} for a managed dynamic table. */
    public static Builder forManaged() {
        return new Builder();
    }

    ...
}
```
Proposed Design
Conceptually, a Flink built-in table consists of two parts: LogStore and FileStore. The LogStore serves the needs of a message system, while the FileStore plays the role of a file system with columnar formats. At each point in time, LogStore and FileStore store exactly the same data for the most recently written records (the LogStore has a TTL), but with different physical layouts. This mirrors the concept of the dynamic table in Flink SQL, which is exactly what we want to build here.
LogStore data has a shorter time-to-live, while the FileStore ensures that historical data can still be queried:
- LogStore: stores the latest data, supports second-level incremental streaming consumption, and relies on Kafka
- For full support of Insert/Update/Delete and optional primary key definition
- FileStore: stores the latest data plus historical data and provides batch ad-hoc analysis
- For full support of Insert/Update/Delete and optional primary key definition, we need a flexible storage structure that supports updates and custom merges; an LSM tree is a good choice
- To support high-performance analysis, it should use a columnar file format
LogStore
Log storage relies on a log system. By default, we use Kafka as the underlying storage.
A bucket in the LogStore is a Kafka partition: each record is hashed to a Kafka partition by its primary key (if the table has one) or by the whole row (if it does not).
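The bucketing rule can be sketched as follows. `LogBucketing` is hypothetical, and `Arrays.hashCode` is only an illustrative hash; the hash function the implementation actually uses is not specified here.

```java
import java.util.Arrays;

// Hypothetical sketch of LogStore bucketing: hash the primary key (if declared) or the whole row,
// then map the hash to a Kafka partition.
final class LogBucketing {
    static int partition(Object[] row, int[] primaryKey, int numPartitions) {
        Object[] key;
        if (primaryKey.length == 0) {
            key = row; // no primary key: hash the whole row
        } else {
            key = new Object[primaryKey.length];
            for (int i = 0; i < primaryKey.length; i++) {
                key[i] = row[primaryKey[i]];
            }
        }
        // floorMod keeps the result in [0, numPartitions) even for negative hash codes
        return Math.floorMod(Arrays.hashCode(key), numPartitions);
    }
}
```

Because every change of a given primary key lands in the same Kafka partition, consumers see the changes of that key in order.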
Format
The LogStore stores records in open formats, so users can also read them from the log store without Flink. By default:
- Key:
- Without primary key: key is null.
- With primary key: key is json format by default. This is controlled by 'log.key.format'.
- Value: debezium-json is used by default to store the record, with or without a declared primary key. This is controlled by 'log.format'.
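An illustrative sketch of the default key and value layout. `LogRecordSketch` and its minimal JSON writer are hypothetical (no escaping, only numbers and strings), and the value envelope is modeled on the debezium before/after/op structure; the exact output of Flink's json and debezium-json formats may differ in details.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical sketch of the default LogStore record layout.
final class LogRecordSketch {
    // Minimal JSON writer: numbers as-is, strings quoted without escaping.
    static String toJson(Map<String, Object> fields) {
        StringJoiner json = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, Object> e : fields.entrySet()) {
            Object v = e.getValue();
            String value = (v instanceof String) ? "\"" + v + "\"" : String.valueOf(v);
            json.add("\"" + e.getKey() + "\":" + value);
        }
        return json.toString();
    }

    // Key: null without a primary key, otherwise the primary-key fields as JSON
    static String key(Map<String, Object> row, String... primaryKey) {
        if (primaryKey.length == 0) {
            return null;
        }
        Map<String, Object> keyFields = new LinkedHashMap<>();
        for (String name : primaryKey) {
            keyFields.put(name, row.get(name));
        }
        return toJson(keyFields);
    }

    // Value: debezium-style envelope; an insert fills "after", a delete fills "before"
    static String value(Map<String, Object> row, boolean isDelete) {
        String image = toJson(row);
        return isDelete
                ? "{\"before\":" + image + ",\"after\":null,\"op\":\"d\"}"
                : "{\"before\":null,\"after\":" + image + ",\"op\":\"c\"}";
    }
}
```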
Consistency & Visibility
By default, data is only visible after a checkpoint, which means that the LogStore has transactional consistency.
If users want the data to be visible immediately, they need to:
- Declare the primary key in the table definition
- Set 'log.consistency' = 'eventual'
- Set 'log.changelog-mode' = 'upsert' (this is the default mode for tables with a primary key)
In upsert mode, a normalize node is generated in the downstream consuming job. Based on the primary key, it generates UPDATE_BEFORE messages for old data, so duplicate data is corrected to an eventually consistent state.
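A minimal sketch of what the normalize node does with upsert input, assuming rows are plain strings and the state is an in-memory map (the real operator would keep it in Flink keyed state). `UpsertNormalizer` is hypothetical, and dropping an exact duplicate, as done here, is a choice of this sketch rather than something the proposal specifies.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the normalize node: keeps the latest row per primary key and
// turns upserts/deletes into a complete changelog (+I, -U, +U, -D).
final class UpsertNormalizer {
    private final Map<String, String> state = new HashMap<>();

    List<String> upsert(String key, String row) {
        List<String> out = new ArrayList<>();
        String old = state.put(key, row);
        if (old == null) {
            out.add("+I " + row);
        } else if (!old.equals(row)) {
            out.add("-U " + old); // generated UPDATE_BEFORE for the previous row
            out.add("+U " + row);
        }
        // old.equals(row): an exact duplicate (e.g. replayed after failover) is dropped
        return out;
    }

    List<String> delete(String key) {
        List<String> out = new ArrayList<>();
        String old = state.remove(key);
        if (old != null) {
            out.add("-D " + old); // delete carries the full previous row
        }
        return out;
    }
}
```

The state map is the cost the next sections talk about: it has to hold every key that has ever been written.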
Changelog mode
By default, for a table with a primary key, the records in the log system contain only INSERT, UPDATE_AFTER and DELETE, without UPDATE_BEFORE. A normalize node is generated in the downstream consuming job; this node stores all key-value pairs in order to produce UPDATE_BEFORE messages.
If users want to see all changes of the table, or want to remove the downstream normalize node, they can configure:
- 'log.changelog-mode' = 'all'
This requires:
- 'log.consistency' = 'transactional'
- The sink query produces changes with UPDATE_BEFORE. If it does not:
- In the MVP, an unsupported exception is thrown
- In the future, a normalize node can be added automatically before the sink to generate the required UPDATE_BEFORE messages
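The option rules above can be collected into a single validation step. A minimal sketch: the option names and values come from this proposal, while `LogOptionsValidator`, its error messages, the exception types, and the assumed default of 'all' for tables without a primary key are hypothetical.

```java
import java.util.Map;

// Hypothetical sketch of validating the LogStore changelog/consistency options.
final class LogOptionsValidator {
    static void validate(Map<String, String> options, boolean hasPrimaryKey, boolean sinkProducesUpdateBefore) {
        String consistency = options.getOrDefault("log.consistency", "transactional");
        // 'upsert' is the default with a primary key; 'all' is assumed as the default without one
        String mode = options.getOrDefault("log.changelog-mode", hasPrimaryKey ? "upsert" : "all");
        switch (mode) {
            case "upsert":
                if (!hasPrimaryKey) {
                    throw new IllegalArgumentException("'log.changelog-mode' = 'upsert' requires a primary key");
                }
                break;
            case "all":
                if (!"transactional".equals(consistency)) {
                    throw new IllegalArgumentException(
                            "'log.changelog-mode' = 'all' requires 'log.consistency' = 'transactional'");
                }
                if (!sinkProducesUpdateBefore) {
                    // MVP behavior; later a normalize node could be added before the sink instead
                    throw new UnsupportedOperationException("The sink query does not produce UPDATE_BEFORE");
                }
                break;
            default:
                throw new IllegalArgumentException("Unknown 'log.changelog-mode': " + mode);
        }
    }
}
```

Since eventual consistency is only allowed together with upsert mode, and upsert mode needs a primary key, the three visibility requirements listed earlier all fall out of these checks.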
Optimize Upsert mode
Many users complain about upsert-kafka: the normalize nodes in downstream consuming jobs hold a lot of state and risk state expiration.
Unlike upsert-kafka, the upsert mode preserves complete delete messages and avoids normalization for the following downstream operators:
- Upsert sink: Upsert sink only requires upsert inputs without UPDATE_BEFORE.
- Join: Join for unique inputs will store records by unique key. It can work without UPDATE_BEFORE.
FileStore
Overview
As a storage system supporting real-time ad-hoc analysis:
- LSM with Columnar format
- Fast update and data skipping
- High compression ratio and high performance analysis
- Partition and Bucket
- data warehouse support
- Consistency
- file management
- version control
The directory structure of FileStore on DFS is as follows:
Data directory description:
- Part Directory: the partition directory, defined by "PARTITIONED BY" in the DDL. A partition uses the same directory naming as Hive, such as "dt=2020-08-08"
- Bucket Directory: a bucket under the partition. Records are assigned to a bucket by hashing. Each bucket is an LSM composed of multiple files
- LSM data files: data files in an abstract format, supporting ORC, Parquet and Avro. The record schema of a data file is:
- SequenceNumber
- ValueKind(add or delete)
- RowData: key
- RowData: value
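A sketch of how a bucket directory path could be assembled from the layout above. `FileStoreLayout` is hypothetical, and the `bucket-N` directory name is an assumption, since the proposal does not fix it.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical sketch of the bucket directory of a FileStore table.
final class FileStoreLayout {
    static String bucketDirectory(String tableRoot, Map<String, String> partition, int bucket) {
        StringJoiner path = new StringJoiner("/");
        path.add(tableRoot);
        // One Hive-style directory per partition column, in declaration order, e.g. "dt=2020-08-08"
        for (Map.Entry<String, String> column : partition.entrySet()) {
            path.add(column.getKey() + "=" + column.getValue());
        }
        // Assumed naming for the bucket directory; the proposal does not fix it
        path.add("bucket-" + bucket);
        return path.toString();
    }
}
```

Passing a `LinkedHashMap` keeps the partition columns in declaration order, which determines the nesting of the directories.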
Meta file description:
- Manifest file: records which files have been added and which have been deleted, i.e. one change to the table. A manifest represents the incremental files of a version. The record schema of a manifest file is DataFile:
- data file name
- FileKind: add or delete
- partition
- bucket
- min/max key: for file skipping
- min/max sequence number
- statistics: data file size, row count
- Snapshot file: a collection of manifest files that represents a snapshot of the table. A snapshot represents all files of a version. The record schema of a snapshot file is ManifestFile:
- manifest file name
- lower/upper partition: for partition pruning
- statistics: manifest file size, addedFileCount, deleteFileCount
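Because each manifest records added and deleted files, the live files of a snapshot can be reconstructed by replaying its manifests from oldest to newest. A minimal sketch, with entries reduced to a file name and FileKind (partition, bucket, key range and statistics are omitted); `ManifestReplay` is hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: computing the live data files of a snapshot from its manifests.
final class ManifestReplay {
    enum FileKind { ADD, DELETE }

    static final class ManifestEntry {
        final String fileName;
        final FileKind kind;

        ManifestEntry(String fileName, FileKind kind) {
            this.fileName = fileName;
            this.kind = kind;
        }
    }

    // Replays the manifests of a snapshot, oldest first, and returns the files still alive.
    static List<String> liveFiles(List<List<ManifestEntry>> manifests) {
        Set<String> live = new LinkedHashSet<>();
        for (List<ManifestEntry> manifest : manifests) {
            for (ManifestEntry entry : manifest) {
                if (entry.kind == FileKind.ADD) {
                    live.add(entry.fileName);
                } else {
                    live.remove(entry.fileName); // e.g. a file rewritten by compaction
                }
            }
        }
        return new ArrayList<>(live);
    }
}
```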
Write Process
- LSM process (similar to LevelDB):
- A memtable is maintained in memory, and data is written directly to it. Each record has a sequence number; for the same key, a record with a larger sequence number overwrites one with a smaller sequence number
- When the memtable is full or on PrepareCommit, flush the memtable: sort it by key + sequence number, merge duplicate keys, and write the data to a remote file in the specified format
- An asynchronous thread performs LSM compactions
- Prepare Commit
- Flush the memtable
- The commit message consists of DeleteFiles and AddFiles
- Global Commit
- Read the existing snapshots; if this checkpoint has already been committed, return directly
- Read the previous snapshot-${i}, write the deleteFiles and addFiles of the buckets to a new manifest, and generate a new snapshot-${i+1}
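A minimal sketch of the write path above: a memtable that assigns sequence numbers and, on flush, emits one key-sorted record per key, plus a global committer that builds snapshot-${i+1} from snapshot-${i} and one new manifest. Both classes are hypothetical and file writing is omitted. Remembering the last committed checkpoint id is an assumption used to detect an already-committed checkpoint; a real implementation would have to persist it with the snapshot.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical memtable: keys/values are strings, a null value marks a delete.
final class MemTable {
    static final class Record {
        final long sequenceNumber;
        final String key;
        final String value;

        Record(long sequenceNumber, String key, String value) {
            this.sequenceNumber = sequenceNumber;
            this.key = key;
            this.value = value;
        }
    }

    private final List<Record> buffer = new ArrayList<>();
    private final int capacity;
    private long nextSequence;

    MemTable(long firstSequence, int capacity) {
        this.nextSequence = firstSequence;
        this.capacity = capacity;
    }

    // Appends a record; returns true when the memtable is full and must be flushed.
    boolean write(String key, String value) {
        buffer.add(new Record(nextSequence++, key, value));
        return buffer.size() >= capacity;
    }

    // Sorts by key, keeps the record with the largest sequence number per key, clears the buffer.
    List<Record> flush() {
        Map<String, Record> latest = new TreeMap<>();
        for (Record r : buffer) {
            Record prev = latest.get(r.key);
            if (prev == null || r.sequenceNumber > prev.sequenceNumber) {
                latest.put(r.key, r);
            }
        }
        buffer.clear();
        return new ArrayList<>(latest.values());
    }
}

// Hypothetical global committer: snapshot-${i+1} = manifests of snapshot-${i} + one new manifest.
final class GlobalCommitter {
    private final List<List<String>> snapshots = new ArrayList<>(); // index i holds snapshot-${i+1}
    private long lastCommittedCheckpoint = -1;

    // Returns the id of the latest snapshot after handling this checkpoint.
    int commit(long checkpointId, String newManifest) {
        if (checkpointId <= lastCommittedCheckpoint) {
            return snapshots.size(); // already committed (e.g. replayed after failover)
        }
        List<String> manifests = snapshots.isEmpty()
                ? new ArrayList<>()
                : new ArrayList<>(snapshots.get(snapshots.size() - 1));
        manifests.add(newManifest); // holds this checkpoint's deleteFiles and addFiles
        snapshots.add(manifests);
        lastCommittedCheckpoint = checkpointId;
        return snapshots.size();
    }

    List<String> manifestsOf(int snapshotId) {
        return snapshots.get(snapshotId - 1);
    }
}
```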
Compaction
Compaction runs automatically in the streaming sink (writer).
We do not have an independent compaction service. An independent service would bring a lot of additional design complexity, and we only need decoupled storage in the current version. (In the future, if we have such a service, the streaming writer can become a pure append writer.)
For each LSM, there is only one streaming writer, and this writer also needs to be responsible for its compaction.
As for the compaction strategy, we do not yet have enough tests to tune it. We can refer to the two mainstream strategies of RocksDB:
Leveled Compaction:
The trigger of compaction depends on three options:
- level0_file_num_compaction_trigger: when the number of level0 files reaches this value, level0 files are compacted into level1
- max_bytes_for_level_base and max_bytes_for_level_multiplier: if the base is 1 GB and the multiplier is 5, level1 is compacted when its data exceeds 1 GB, level2 when its data exceeds 5 GB, and so on
In leveled compaction, every level except level0 is a single sort run. SSTs in level1 are merged with level2, producing ordered, non-overlapping SSTs in level2. Leveled compaction is the default strategy of RocksDB:
- Write amplification: bad, the same data is compacted again and again
- Read and space amplification: good, since the files within a level do not overlap
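A sketch of the leveled trigger as described above, with the RocksDB option names passed in as plain parameters. `LeveledTrigger` is hypothetical, and RocksDB itself has further refinements that are not modeled here.

```java
// Hypothetical sketch of the leveled compaction trigger.
final class LeveledTrigger {
    // Level 1 may hold max_bytes_for_level_base; each deeper level holds multiplier times more.
    static long maxBytesForLevel(int level, long base, int multiplier) {
        long max = base;
        for (int i = 1; i < level; i++) {
            max *= multiplier;
        }
        return max;
    }

    static boolean needsCompaction(int level, long levelBytes, int level0Files,
                                   int level0FileNumTrigger, long base, int multiplier) {
        if (level == 0) {
            return level0Files >= level0FileNumTrigger; // level0 is measured in files, not bytes
        }
        return levelBytes > maxBytesForLevel(level, base, multiplier);
    }
}
```

With a 1 GB base and a multiplier of 5, this reproduces the example above: level1 is capped at 1 GB, level2 at 5 GB.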
Universal Compaction:
- universal_sort_run_num_compaction_trigger: when the number of sort runs reaches this value, a compaction is performed
- universal_max_size_amplification_percent
- universal_size_ratio
In universal mode, there are many sort runs R1, R2, R3, ..., Rn, where R1 contains the newest data and Rn the oldest. When the preconditions are met, the following compactions are tried in priority order:
- Compaction by Space Amplification: a full compaction of all sort runs, performed when the estimated space amplification exceeds universal_max_size_amplification_percent. The amplification is estimated as:
- (size(R1) + size(R2) + … + size(Rn-1)) / size(Rn) (if the frequency of deletes is similar to the frequency of inserts)
- Compaction by Individual Size Ratio: starting from R1, if size(R2) <= size(R1) * (100 + ratio) / 100, where ratio is universal_size_ratio (default 1, i.e. 1%), R1 and R2 are compacted together. If size(R3) <= (size(R1) + size(R2)) * (100 + ratio) / 100, R3 is added to the compaction as well, and so on.
- Compaction by Sort Run Number: if none of the above is triggered, the first (newest) few sort runs are forced to be compacted
Compared to Leveled Compaction, Universal compaction:
- Write amplification: good, old data is not compacted again and again
- Read and space amplification: bad, because there are more sort runs
In our scenario, the workload is write-heavy. Although leveled compaction may achieve a better compression ratio in aggregation scenarios, the first version provides universal compaction.
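A simplified sketch of universal compaction picking, following the priority order described above. `UniversalPicker` is hypothetical and its parameters mirror the listed options. Compacting at least two runs in the sort-run-number case is a choice of this sketch, and RocksDB has further knobs that are not modeled here.

```java
// Hypothetical sketch: sizes[0] is R1 (newest) ... sizes[n-1] is Rn (oldest).
// Returns k, meaning "compact R1..Rk", or 0 when no compaction is needed.
final class UniversalPicker {
    static int pick(long[] sizes, int sortRunNumTrigger, int maxSizeAmplificationPercent, int sizeRatio) {
        int n = sizes.length;
        if (n < 2 || n < sortRunNumTrigger) {
            return 0; // precondition: enough sort runs to compact
        }
        // 1. Space amplification: (size(R1) + ... + size(Rn-1)) / size(Rn), as a percentage
        long newerRuns = 0;
        for (int i = 0; i < n - 1; i++) {
            newerRuns += sizes[i];
        }
        if (newerRuns * 100 > (long) maxSizeAmplificationPercent * sizes[n - 1]) {
            return n; // full compaction of all sort runs
        }
        // 2. Individual size ratio: add the next run while it is at most (100 + ratio)% of R1..Rk
        long candidate = sizes[0];
        int k = 1;
        while (k < n && sizes[k] * 100 <= candidate * (100 + sizeRatio)) {
            candidate += sizes[k];
            k++;
        }
        if (k >= 2) {
            return k;
        }
        // 3. Sort run number: force-compact the newest runs; at least two, since merging a
        //    single run does not reduce the number of runs
        return Math.min(n, Math.max(2, n - sortRunNumTrigger + 1));
    }
}
```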
Query Process
- Planner
- Read the current snapshot, prune partitions according to filtering conditions, and obtain manifests to be read
- Merge the deleteFiles and addFiles in manifests to generate a file list for each bucket in each partition
- SplitEnumerator
- Traverse the partitions to be read and generate the corresponding SourceSplit for each bucket
- Filter the files to be read in each bucket according to the filtering conditions, and put the files of each LSM level into the SourceSplit
- Runtime Task
- Obtain the SourceSplit to be read, generate the MergeIterator of LSM, and read the data
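A minimal sketch of the planner and SplitEnumerator steps: after partition pruning, the live files of each (partition, bucket) pair form one split, since an LSM bucket has to be merge-read as a whole. `SplitPlanner` and the split id format are hypothetical, and key-range file skipping (min/max key) is not modeled.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical sketch of turning the live files of a snapshot into splits.
final class SplitPlanner {
    static final class DataFile {
        final String partition;
        final int bucket;
        final String name;

        DataFile(String partition, int bucket, String name) {
            this.partition = partition;
            this.bucket = bucket;
            this.name = name;
        }
    }

    // Groups the live files into one split per (partition, bucket), skipping pruned partitions.
    static Map<String, List<String>> plan(List<DataFile> liveFiles, Predicate<String> partitionFilter) {
        Map<String, List<String>> splits = new LinkedHashMap<>();
        for (DataFile file : liveFiles) {
            if (!partitionFilter.test(file.partition)) {
                continue; // partition pruning
            }
            String splitId = file.partition + "/bucket-" + file.bucket;
            splits.computeIfAbsent(splitId, id -> new ArrayList<>()).add(file.name);
        }
        return splits;
    }
}
```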
Support Changelog
As with the LogStore, we should hide the complexity of changelog support from users. Changelog is supported as follows:
- DDL with Primary Key
- LSM Key: Primary Key
- LSM Value: Row (All columns)
- DDL without Primary Key
- LSM Key: Row (All columns)
- LSM Value: Count, the number of occurrences of the same record
- Count is +1 for an insertion and -1 for a deletion. Counts are summed during compaction and merge reading.
- DDL with Index Key: When there is no primary key, users can define an index key to speed up update and query. (Not in this FLIP)
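A minimal sketch of the count-based representation for tables without a primary key, with rows reduced to strings; `CountMerge` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: LSM key = whole row, LSM value = count.
final class CountMerge {
    // Sums the +1 / -1 counts per row. A zero sum has no effect on older runs, so it is dropped.
    // Negative counts are kept: after compacting only newer runs they still cancel older rows.
    static Map<String, Long> merge(List<Map.Entry<String, Long>> changes) {
        Map<String, Long> counts = new TreeMap<>();
        for (Map.Entry<String, Long> change : changes) {
            counts.merge(change.getKey(), change.getValue(), Long::sum);
        }
        counts.values().removeIf(c -> c == 0L);
        return counts;
    }

    // Merge reading emits each row as many times as its positive count.
    static List<String> read(Map<String, Long> counts) {
        List<String> rows = new ArrayList<>();
        for (Map.Entry<String, Long> e : counts.entrySet()) {
            for (long i = 0; i < e.getValue(); i++) {
                rows.add(e.getKey());
            }
        }
        return rows;
    }
}
```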
Query Pushdown
The FileStore can support more compaction strategies that let the input data achieve the effect of lazy computation (not in this FLIP). For example:
- SUM Compaction: non-key fields are summed up, grouped by key.
- COALESCE Compaction: only non-null fields are stored. It can replace a streaming join for widening the fields.
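A minimal sketch of the two merge functions, applied when an older and a newer record with the same key meet during compaction or merge reading. `MergeFunctions` is hypothetical, and treating null as "no contribution" in SUM is an assumption. With COALESCE, two streams that each write different columns of the same key end up producing one widened row, which is why it can stand in for a streaming join.

```java
import java.util.Arrays;

// Hypothetical merge functions over the non-key fields of two records with the same key.
final class MergeFunctions {
    // SUM: add up each field; a null field contributes nothing
    static Long[] sum(Long[] older, Long[] newer) {
        Long[] out = new Long[older.length];
        for (int i = 0; i < out.length; i++) {
            if (older[i] == null) {
                out[i] = newer[i];
            } else if (newer[i] == null) {
                out[i] = older[i];
            } else {
                out[i] = older[i] + newer[i];
            }
        }
        return out;
    }

    // COALESCE: a newer non-null field replaces the older value; nulls keep the older value
    static Object[] coalesce(Object[] older, Object[] newer) {
        Object[] out = Arrays.copyOf(older, older.length);
        for (int i = 0; i < out.length; i++) {
            if (newer[i] != null) {
                out[i] = newer[i];
            }
        }
        return out;
    }
}
```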
Rejected Alternatives
Using Hudi
...
- Hudi is designed for upsert-style updates, so it requires a primary key and a time column to be defined. This makes it hard to support all changelog types.
- Hudi's updates are based on an index (currently BloomFilter and HBase), and the data in a bucket is unordered, so every merge has to reread and rewrite the data, which is expensive. We need storage with fast updates, and an LSM is more suitable.
Add Primary Key
The Flink Built-in Dynamic Table supports free switching with and without primary key:
...
The cost for the LogStore is too high: users cannot continue their streaming consumption and can only start consuming from the latest data. Therefore, it is not supported at present.
Change Buckets
```sql
ALTER TABLE [catalog_name.][db_name.]table_name [PARTITION partition_spec] CLUSTERED INTO num_buckets BUCKETS;
```
Changing the bucket number of the FileStore trades off scalability against performance; it launches a job to rewrite the files. (This is not available in the first version: users need to drop the table and create a new one.)
Implementation Plan
- POC branch: https://github.com/JingsongLi/flink/tree/storage_formal table_storage
- Implement in a dev branch; the code will not enter the master branch for the time being
- Implement FileStore
- Abstract Format: support ORC and Parquet
- Implement LSM: MemStore and Compaction
- Implement Snapshot and Manifest: Version control
- Implement LogStore
- Auto create Kafka Topic
- Integrate CDC Format and Upsert Kafka
- Integrate Flink
- TableFactory: DynamicSource and DynamicSink
- Integrate to Catalog
- Extended DMLs