...

When dropping a table, the corresponding underlying physical storage will be deleted.

COMPACT

Code Block
languagesql
titleSQL
ALTER TABLE [catalog_name.][db_name.]table_name [PARTITION partition_spec] COMPACT

...

Compacts the table for high-performance queries. This launches a job to rewrite files.
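For example, a single partition of a hypothetical partitioned table could be compacted as follows (the table and partition names are illustrative, not part of the proposal):

Code Block
languagesql
titleSQL
-- hypothetical example: compact one day's partition of an "orders" table
ALTER TABLE my_catalog.my_db.orders PARTITION (dt = '2022-01-01') COMPACT;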

CHANGE TRACKING

Code Block
languagesql
titleSQL
ALTER TABLE [catalog_name.][db_name.]table_name [PARTITION partition_spec] SET 'change-tracking' = 'false'

...


If users do not need to consume changes from the table, they can disable Change Tracking. This can reduce resource consumption.

...

In this case, we need to disable change tracking for this partition, so that the batch job does not produce changes for downstream streaming jobs.
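A minimal sketch of that case, assuming a hypothetical partitioned "orders" table being backfilled by a batch job:

Code Block
languagesql
titleSQL
-- hypothetical example: disable change tracking for one partition before a batch backfill
ALTER TABLE my_catalog.my_db.orders PARTITION (dt = '2022-01-01') SET 'change-tracking' = 'false';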

READING

Code Block
languagesql
titleSQL
-- unbounded streaming reading (Read changes)
SET 'execution.runtime-mode' = 'streaming';
INSERT INTO ... SELECT ... FROM [catalog_name.][db_name.]table_name;

-- bounded reading (Read a snapshot)
SET 'execution.runtime-mode' = 'batch';
INSERT INTO ... SELECT ... FROM [catalog_name.][db_name.]table_name;

The table supports both stream reading (read changes) and high-performance batch reading.
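As a concrete illustration (all table names are hypothetical), the same table can feed either a continuous pipeline or a one-off batch query:

Code Block
languagesql
titleSQL
-- streaming: continuously read the changes of "orders" into a downstream table
SET 'execution.runtime-mode' = 'streaming';
INSERT INTO my_db.orders_enriched SELECT * FROM my_db.orders;

-- batch: read the current snapshot of "orders" once
SET 'execution.runtime-mode' = 'batch';
INSERT INTO my_db.orders_report SELECT * FROM my_db.orders;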

INSERT

...

Code Block
languagesql
titleSQL
-- unbounded insert, does not support OVERWRITE
INSERT INTO [catalog_name.][db_name.]table_name
  [PARTITION part_spec] [column_list] select_statement;

-- bounded insert
INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name
  [PARTITION part_spec] [column_list] select_statement;

part_spec:
  (part_col_name1=val1 [, part_col_name2=val2, ...])

column_list:
  (col_name1 [, column_name2, ...])

Users can write any type of changelog into the table with any SQL statement.
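For instance, a continuous changelog insert and a batch overwrite of one partition might look like this (table and column names are hypothetical):

Code Block
languagesql
titleSQL
-- unbounded insert (streaming mode): continuously write a source's changelog into the table
SET 'execution.runtime-mode' = 'streaming';
INSERT INTO my_db.orders SELECT * FROM my_db.orders_source;

-- bounded insert (batch mode): overwrite one partition with recomputed data
SET 'execution.runtime-mode' = 'batch';
INSERT OVERWRITE my_db.orders PARTITION (dt = '2022-01-01')
  SELECT order_id, amount FROM my_db.orders_corrected;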

DESCRIBE

Code Block
languagesql
titleSQL
DESCRIBE DETAIL TABLE [catalog_name.][db_name.]table_name [PARTITION partition_spec]

...

DESCRIBE DETAIL TABLE output:

...

The default bucket number is 1, so the table can be used out of the box on a local machine.

It can be set by:

Code Block
languagesql
titleSQL
SET table-storage.bucket = 10;
CREATE TABLE ...

If users want to change the bucket number, they need to drop the table and create a new one.
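A minimal sketch of that workaround, assuming a hypothetical word_count table (name and schema are illustrative):

Code Block
languagesql
titleSQL
-- change the bucket number by recreating the table
DROP TABLE my_db.word_count;
SET table-storage.bucket = 20;
CREATE TABLE my_db.word_count (
  word STRING,
  cnt BIGINT,
  PRIMARY KEY (word) NOT ENFORCED
);
-- then re-populate it, e.g. INSERT INTO my_db.word_count SELECT ... ;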

...

  • LogStore: Stores the latest data and supports second-level streaming incremental consumption; relies on Kafka
    • To fully support Insert/Update/Delete and optional primary key definitions, Kafka with the Debezium format or Upsert-Kafka is selected automatically
  • FileStore: Stores the latest data plus historical data and provides batch ad-hoc analysis
    • To fully support Insert/Update/Delete and optional primary key definitions, we need a flexible storage structure that supports updates and custom merges; an LSM is a good choice
    • To support high-performance analysis, it should use a columnar file format

(Image: LogStore and FileStore architecture)

LogStore

Log storage relies on Kafka. We use Kafka with Debezium-Avro and Upsert-Kafka as underlying storages.

...

As a storage system supporting real-time ad-hoc analysis:

  • LSM with columnar format
  • Fast update and data skipping
  • High compression ratio and high-performance analysis
  • Partition and bucket
  • Data warehouse support
  • Consistency
  • File management
  • Version control

The directory structure of FileStore on DFS is as follows:

(Image: FileStore directory structure on DFS)

Data directory description:

...

  1. LSM Process (similar to LevelDB):
    1. A memtable is maintained in memory, and data is written directly to it. Each record has a sequence number; for the same key, a record with a larger sequence number overwrites one with a smaller sequence number
    2. When the memtable is full or on PrepareCommit, flush the memtable: sort it by key + sequence number, merge duplicate keys, and write the data to a remote file in a specific format
    3. An asynchronous thread performs LSM compactions
  2. Prepare Commit
    1. Flush the MemTable
    2. The commit message consists of DeleteFiles and AddFiles.
  3. Global Commit
    1. Get the old snapshots; if this checkpoint has already been committed, just return
    2. Read the previous snapshot-${i}, write the deleteFiles and addFiles of the buckets to a new manifest, and generate a new snapshot-${i+1}

Compaction

Automatic compaction is performed in the streaming sink (writer).

...

In our scenario, writes dominate. Although leveled compaction may achieve a better compression rate in aggregation scenarios, the first version provides universal compaction.

Query Process

  1. Planner
    1. Read the current snapshot, prune partitions according to filtering conditions, and obtain manifests to be read
    2. Merge the deleteFiles and addFiles in manifests to generate a file list for each bucket in each partition
  2. SplitEnumerator
    1. Traverse the partitions to be read and generate the corresponding SourceSplit for each bucket
    2. Filter the files to be read in the bucket according to filtering conditions, produce the files of each LSM level in SourceSplit
  3. Runtime Task
    1. Obtain the SourceSplit to be read, generate the MergeIterator of LSM, and read the data

Support Changelog

Similarly, the FileStore should hide the complexity of changelog support, just like the LogStore does. Changelog is supported as follows:

  • DDL with Primary Key
    • LSM Key: Primary Key
    • LSM Value: Row (All columns)
  • DDL without Primary Key
    • LSM Key: Row (All columns)
    • LSM Value: Count, the number of occurrences of the same record
    • Count is +1 for an insertion and -1 for a deletion. Counts are summed during compaction and merge reading.
  • DDL with Index Key: When there is no primary key, users can define an index key to speed up update and query. (Not in this FLIP)
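As an illustration of the first two DDL variants above (table and column names are hypothetical):

Code Block
languagesql
titleSQL
-- with a primary key: LSM key = primary key, LSM value = the full row
CREATE TABLE my_db.users (
  user_id BIGINT,
  name STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
);

-- without a primary key: LSM key = the full row, LSM value = an occurrence count
CREATE TABLE my_db.page_views (
  user_id BIGINT,
  url STRING
);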

...

We need an interface to discover the default factory implementation when there is no "connector=..." option:

Code Block
languagejava
titleInterface
/**
 * Base interface for configuring a default dynamic table connector. The default table factory is
 * used when there is no {@link FactoryUtil#CONNECTOR} option.
 */
@Internal
public interface DefaultDynamicTableFactory extends DynamicTableFactory {

    @Override
    default String factoryIdentifier() {
        return "_default";
    }
}


We need an interface to discover the default log factory implementation (i.e., the Kafka log factory):

Code Block
languagejava
titleInterface
/**
 * Base interface for configuring a default log table connector. The log table is used by {@link
 * DefaultDynamicTableFactory}.
 *
 * <p>Log tables are for processing only unbounded data. Support streaming reading and streaming
 * writing.
 */
@Internal
public interface DefaultLogTableFactory extends DynamicTableFactory {

    @Override
    default String factoryIdentifier() {
        return "_default_log";
    }
}


As mentioned above, we want to minimize the options users must specify, so the table's options are filled in from the session configuration. Moreover, we need to create the underlying physical storage, such as a Kafka topic, when creating the table:

Code Block
languagejava
titleInterface
/**
 * An object that can receive a notification when a table is created. Notification occurs before
 * {@link Catalog#createTable} is called.
 *
 * <p>Only {@link DefaultDynamicTableFactory} and {@link DefaultLogTableFactory} support
 * notification.
 */
@Internal
public interface CreateTableListener {

    /**
     * Notifies the listener that a table creation occurred.
     *
     * @return new options of this table.
     */
    Map<String, String> onTableCreation(TableNotification context);
}


We need to delete the underlying physical storage, such as the Kafka topic, when dropping tables:

Code Block
languagejava
titleInterface
/**
 * An object that can receive a notification when a table is dropped. Notification occurs before
 * {@link Catalog#dropTable} is called.
 *
 * <p>Only {@link DefaultDynamicTableFactory} and {@link DefaultLogTableFactory} support
 * notification.
 */
@Internal
public interface DropTableListener {

    /** Notifies the listener that a table drop occurred. */
    void onTableDrop(TableNotification context);
}


The notification passed to the CreateTableListener and DropTableListener:

Code Block
languagejava
titleInterface

/**
 * A notification of the operation of a table. Provides catalog and session information describing
 * the dynamic table to be accessed.
 */
@Internal
public interface TableNotification {

    /** Returns the identifier of the table in the {@link Catalog}. */
    ObjectIdentifier getObjectIdentifier();

    /** Returns the table information. */
    CatalogTable getCatalogTable();

    /** Gives read-only access to the configuration of the current session. */
    ReadableConfig getConfiguration();

    /**
     * Returns the class loader of the current session.
     *
     * <p>The class loader is in particular useful for discovering further (nested) factories.
     */
    ClassLoader getClassLoader();

    /** Whether the table is temporary. */
    boolean isTemporary();

    /** Creates a copy of this instance with new options. */
    TableNotification copy(Map<String, String> newOptions);
}


Rejected Alternatives

Using Hudi

...

The Flink built-in dynamic table supports freely switching a table between having and not having a primary key:

Code Block
languagesql
titleSQL
ALTER TABLE [catalog_name.][db_name.]table_name
  ADD PRIMARY KEY (column_name, ...) NOT ENFORCED

Adding a primary key to a table without a primary key:

  • FileStore
    • Launch a job to rewrite files
    • If there are duplicate keys, the command will fail
  • LogStore
    • Truncate logs: Delete current topic and create a new topic
    • Jobs that are currently consuming this table will fail

The cost on the LogStore side is too high: users cannot continue their streaming consumption and can only restart consumption from the latest offset. Therefore, this is not supported at present.
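As a concrete illustration of the FileStore path described above (table and column names are hypothetical):

Code Block
languagesql
titleSQL
-- add a primary key to an existing table; a job rewrites the files and fails on duplicate keys
ALTER TABLE my_db.page_views
  ADD PRIMARY KEY (user_id, url) NOT ENFORCED;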

Change Buckets

Code Block
languagesql
titleSQL
ALTER TABLE [catalog_name.][db_name.]table_name [PARTITION partition_spec]
  CLUSTERED INTO num_buckets BUCKETS;

Changing the bucket number lets the FileStore trade off between scalability and performance; it launches a job to rewrite files. (Not available in the first version; users need to drop the table and create a new one.)
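A hypothetical invocation with concrete values (table and partition names are illustrative):

Code Block
languagesql
titleSQL
-- rescale one partition of an "orders" table to 20 buckets
ALTER TABLE my_db.orders PARTITION (dt = '2022-01-01')
  CLUSTERED INTO 20 BUCKETS;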

...

  • POC branch: https://github.com/JingsongLi/flink/tree/storage_formal 
  • Implement FileStore
    • Abstract Format: support ORC and Parquet
    • Implement LSM: MemStore and Compaction
    • Implement Snapshot and Manifest: Version control
  • Implement LogStore
    • Auto create Kafka Topic
    • Integrate CDC Format and Upsert Kafka
  • Integrate Flink
    • TableFactory: DynamicSource and DynamicSink
    • Integrate to Catalog
  • Extended DMLs