...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Motivation

Streaming analytics is one of the most important use cases among all SQL usage. Users rely on the streaming nature of Flink to deliver subsecond end-to-end latency. The typical architecture for such a use case is as follows:

...

    1. It’s hard to choose the most suitable external systems when the requirements include streaming pipelines, offline batch jobs, ad-hoc queries, and even some point lookup queries. 
    2. Even after you have made your choice, operating and maintaining these systems inevitably increases complexity. Users at least need to coordinate the log system and file system of each table, which is error-prone. 

Proposal

If you have experience with Flink SQL, you are probably familiar with one of its basic concepts: the dynamic table. In short, a dynamic table is a logical concept with two different physical representations: changelog and table. Right now, by relying on SQL connectors, users can define a table which acts as one of the physical representations, but not both.

...

  • It’s a built-in storage for Flink SQL
    • Improves usability
    • Flink DDL is no longer just a mapping, but a real creation of these tables
    • Masks and abstracts the underlying technical details, no annoying options
  • Supports subsecond streaming write & consumption
    • It could be backed by a service-oriented message queue (like Kafka)
  • High-throughput scan capability
    • A filesystem with columnar formats would be an ideal choice, just like Iceberg/Hudi use.
  • More importantly, to lower the cognitive bar, the storage needs to automatically handle various Insert/Update/Delete inputs and table definitions
    • Receive any type of changelog
    • Tables can be defined with or without a primary key

Public Interfaces

Example

With a built-in Flink dynamic table, users can just focus on their business logic:

Code Block
languagesql
titleSQL
-- Just business fields, primary key is not mandatory

CREATE TABLE intermediate_table (
  order_id BIGINT,
  auction_id BIGINT,
  category_id BIGINT,
  trans_amount BIGINT,
  create_time TIMESTAMP,
  dt STRING
) PARTITIONED BY (dt);

-- Insert into

INSERT INTO intermediate_table
SELECT
  A.order_id,
  A.auction_id,
  B.category_id,
  A.trans_amount,
  A.create_time,
  DATE_FORMAT(create_time, 'yyyy-MM-dd')
FROM orders A LEFT JOIN category_dim B
ON A.auction_id = B.auction_id;

-- Query: Streaming Pipeline

INSERT INTO ... SELECT ... FROM intermediate_table;

-- Query: Batch ad-hoc query

SELECT * FROM intermediate_table WHERE ...;

SQL Statements

CREATE

Code Block
languagesql
titleSQL
CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
  (
    { <physical_column_definition> | <computed_column_definition> }[ , ...n]
    [ <watermark_definition> ]
    [ <table_constraint> ][ , ...n]
  )
  [COMMENT table_comment]
  [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
  [WITH ('change-tracking' = 'false')]

<physical_column_definition>:
  column_name column_type [ <column_constraint> ] [COMMENT column_comment]

<column_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED

<table_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED

<computed_column_definition>:
  column_name AS computed_column_expression [COMMENT column_comment]

<watermark_definition>:
  WATERMARK FOR rowtime_column_name AS watermark_strategy_expression

...

Very simple: it masks and abstracts the underlying technical details, with no annoying options.

DROP

Code Block
languagesql
titleSQL
DROP [TEMPORARY] TABLE [IF EXISTS] [catalog_name.][db_name.]table_name

When dropping a table, the corresponding underlying physical storage will be deleted.

COMPACT

Code Block
languagesql
titleSQL
ALTER TABLE [catalog_name.][db_name.]table_name [PARTITION partition_spec] COMPACT

Compacts the table for high-performance queries. Launches a job to rewrite files.

CHANGE TRACKING

Code Block
languagesql
titleSQL
ALTER TABLE [catalog_name.][db_name.]table_name [PARTITION partition_spec] SET 'change-tracking' = 'false'

Table

If users do not need to consume changes from the table, they can disable Change Tracking. This can reduce resource consumption.

Partition

Turns off change tracking for a specific partition: records written to this partition no longer produce changes, so downstream streaming consumers will not see them.

...

In this case, we need to turn off change tracking for this partition so that the batch job does not produce changes for downstream streaming jobs.

READING

Code Block
languagesql
titleSQL
-- unbounded streaming reading (Read changes)
SET 'execution.runtime-mode' = 'streaming';
INSERT INTO ... SELECT ... FROM [catalog_name.][db_name.]table_name;

-- bounded reading (Read a snapshot)
SET 'execution.runtime-mode' = 'batch';
INSERT INTO ... SELECT ... FROM [catalog_name.][db_name.]table_name;

The table supports both stream reading (read changes) and high-performance batch reading.

INSERT

Code Block
languagesql
titleSQL
-- unbounded insert, does not support OVERWRITE
INSERT INTO [catalog_name.][db_name.]table_name
  [PARTITION part_spec] [column_list] select_statement;

-- bounded insert
INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name
  [PARTITION part_spec] [column_list] select_statement;

part_spec:
  (part_col_name1=val1 [, part_col_name2=val2, ...])

column_list:
  (col_name1 [, column_name2, ...])

Users can write any type of changelog with any SQL.

DESCRIBE

Code Block
languagesql
titleSQL
DESCRIBE DETAIL TABLE [catalog_name.][db_name.]table_name [PARTITION partition_spec]

...

DESCRIBE DETAIL TABLE without a partition specification outputs the columns above as well, except the partition.

Configuration

In every table environment, the TableConfig offers options for configuring the current session.

...

  • table-storage.log.properties.bootstrap.servers (Map, default: none): Kafka brokers, e.g. localhost:9092.
  • table-storage.log.retention (Duration, default: none): how long the change log is kept; the default value comes from the Kafka cluster.
  • table-storage.file.root-path (String, default: none): root file path.
  • table-storage.file.format (String, default: parquet): file format name.
  • table-storage.bucket (Integer, default: 1): bucket number for the FileStore and partition number for Kafka.
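
As a purely illustrative sketch (not part of the FLIP itself), these session options could be set programmatically through the standard TableConfig API; the option keys are the ones listed above, while the surrounding setup is ordinary Flink Table API:

Code Block
languagejava
titleJava
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class StorageOptionsExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        Configuration conf = tEnv.getConfig().getConfiguration();
        // Kafka brokers backing the LogStore
        conf.setString("table-storage.log.properties.bootstrap.servers", "localhost:9092");
        // Root path for FileStore files
        conf.setString("table-storage.file.root-path", "/tmp/table-storage");
        // Columnar file format (default: parquet)
        conf.setString("table-storage.file.format", "parquet");
        // Bucket number for FileStore files / partition number for Kafka
        conf.setString("table-storage.bucket", "4");
    }
}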

Bucket

The record is hashed into different buckets according to the primary key (if present) or the whole row (when there is no primary key):

...

If users want to change the bucket number, they need to delete the table and create a new table.
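
As an illustration only, bucket assignment amounts to hashing the primary-key fields (or the whole row) and taking the result modulo the bucket number; the class and method names below are hypothetical, not part of the proposal:

Code Block
languagejava
titleJava
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of assigning a record to a bucket: hash the
// primary-key fields if a primary key is defined, otherwise hash the whole
// row, then take the value modulo the bucket number.
public final class BucketAssigner {

    /** Returns the bucket index in [0, numBuckets). */
    public static int assign(List<Object> row, int[] primaryKeyIndexes, int numBuckets) {
        Object[] hashedFields;
        if (primaryKeyIndexes.length > 0) {
            hashedFields = Arrays.stream(primaryKeyIndexes).mapToObj(row::get).toArray();
        } else {
            hashedFields = row.toArray();
        }
        int hash = Arrays.hashCode(hashedFields);
        // Math.floorMod avoids negative bucket indexes for negative hashes.
        return Math.floorMod(hash, numBuckets);
    }
}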

Concurrent Write

Only a single stream writer is allowed to write data to a Dynamic table.

Write contention is resolved with a distributed optimistic lock mechanism. For an active partition, INSERT OVERWRITE and ALTER TABLE … COMPACT will delete files, which may conflict with the streaming job; in that case the command may fail and the user will be asked to retry.
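
The retry behavior can be pictured with a small, purely illustrative simulation in which an AtomicLong stands in for the latest-snapshot pointer and a conditional update stands in for the distributed optimistic lock; none of these names are the proposed API:

Code Block
languagejava
titleJava
import java.util.concurrent.atomic.AtomicLong;

public final class OptimisticCommitSketch {
    // Simulates the "latest snapshot id" that all writers race to advance.
    private static final AtomicLong LATEST_SNAPSHOT = new AtomicLong(0);

    /** Tries to publish snapshot base+1; fails if another writer got there first. */
    static boolean tryCommit(long baseSnapshotId) {
        return LATEST_SNAPSHOT.compareAndSet(baseSnapshotId, baseSnapshotId + 1);
    }

    public static void main(String[] args) {
        int maxRetries = 3;
        boolean committed = false;
        for (int attempt = 0; attempt < maxRetries && !committed; attempt++) {
            long base = LATEST_SNAPSHOT.get();  // read the latest snapshot
            // ... plan deleteFiles/addFiles against `base` here ...
            committed = tryCommit(base);        // conditional publish of the new snapshot
        }
        if (!committed) {
            // e.g. INSERT OVERWRITE / COMPACT conflicting with the streaming writer
            throw new IllegalStateException("Commit conflicted; please retry the command.");
        }
    }
}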

Consistency & Visibility

The built-in dynamic table provides Exactly-Once / Eventual consistency.

...

(see the analysis below in the Design chapter)

Checkpoint

In the past, many users encountered the problem that the sink did not produce output because they had not enabled checkpointing.

To improve the ease of use of the built-in dynamic table, the planner sets the checkpoint interval to 1 minute if checkpointing is not enabled when it detects a sink to a built-in dynamic table.
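
For reference, this implicit default corresponds to what users would otherwise configure explicitly with the existing checkpointing API (the snippet below uses standard Flink API and is not new surface introduced by this FLIP):

Code Block
languagejava
titleJava
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointDefaultSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // What the planner would implicitly apply for a built-in dynamic table sink
        // when checkpointing is not enabled: a 1-minute checkpoint interval.
        env.enableCheckpointing(60_000L);
    }
}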

Proposed Design

Conceptually, Flink built-in tables consist of two parts, LogStore and FileStore. The LogStore serves the needs of message systems, while the FileStore plays the role of a file system with columnar formats. At each point in time, LogStore and FileStore store exactly the same data for the latest written data (the LogStore has a TTL), but with different physical layouts. If one recalls the concept of a dynamic table in Flink SQL, this is exactly what we want to build here. 

...

  • LogStore: Stores the latest data, supports second-level streaming incremental consumption, and relies on Kafka
    • For full support of Insert/Update/Delete and an optional primary key definition, automatically select Kafka with the Debezium format or Upsert-Kafka
  • FileStore: Stores the latest data plus historical data and provides batch ad-hoc analysis
    • For full support of Insert/Update/Delete and an optional primary key definition, we need a flexible storage structure that supports updates and custom merges; an LSM is a good choice
    • To support high-performance analysis, it should use a columnar file format



LogStore

The log storage relies on Kafka. We use Kafka with Debezium-Avro and Upsert-Kafka as the underlying storage.

...

The bucket in the LogStore is the Kafka partition, which means the record is hashed into different Kafka partitions according to the primary key (if present) or the whole row (when there is no primary key).

FileStore

Overview

As a storage system supporting real-time ad-hoc analysis:

...

  • Manifest file: records which files have been added and which have been deleted; it represents a change to the table, i.e. the incremental files of a version. The record schema of a manifest file is DataFile:
    • data file name
    • FileKind: add or delete
    • partition
    • bucket
    • min/max key: for file skipping
    • min/max sequence number
    • statistics: data file size, row count
  • Snapshot file: a collection of manifest files that represents a snapshot of the table, i.e. all files of a version (see the sketch of these records after this list). The record schema of a snapshot file is ManifestFile:
    • manifest file name
    • lower/upper partition: for partition pruning
    • statistics: manifest file size, addedFileCount, deleteFileCount
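
To make the two record schemas above concrete, they could be sketched as simple value classes like the following; the field names follow the list above, while the concrete Java types are assumptions for illustration:

Code Block
languagejava
titleJava
import java.util.List;

// Sketch of the metadata records listed above. Field names follow the list;
// the concrete types are assumptions for illustration only.
class DataFile {
    enum FileKind { ADD, DELETE }

    String fileName;            // data file name
    FileKind kind;              // add or delete
    String partition;
    int bucket;
    byte[] minKey, maxKey;      // for file skipping
    long minSequenceNumber, maxSequenceNumber;
    long fileSize, rowCount;    // statistics
}

class ManifestFile {
    String manifestFileName;
    String lowerPartition, upperPartition;  // for partition pruning
    long manifestFileSize;
    long addedFileCount, deleteFileCount;   // statistics
}

// A snapshot is a collection of manifest file entries (all files of a version).
class Snapshot {
    long snapshotId;            // assumed identifier, e.g. ${i} in snapshot-${i}
    List<ManifestFile> manifests;
}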

Write Process

  1. LSM Process (similar to LevelDB; a toy memtable sketch follows this list):
    1. A memtable is maintained in memory and data is written directly to it. Each record has a sequence number; for the same key, a record with a larger sequence number overwrites one with a smaller sequence number
    2. When the memtable is full or on PrepareCommit, flush it: sort by key + sequence number, merge duplicate keys, and write the data to a remote file using a specific format
    3. An asynchronous thread performs LSM compactions
  2. Prepare Commit
    1. Flush the MemTable
    2. The commit message consists of DeleteFiles and AddFiles.
  3. Global Commit
    1. Read the existing snapshots; if this checkpoint has already been committed, just return
    2. Read the previous snapshot-${i}, write the deleteFiles and addFiles of the buckets to a new manifest, and generate a new snapshot-${i+1}
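
The memtable behavior in step 1 can be illustrated with a toy sketch: an in-memory sorted map where the larger sequence number wins for the same key, and a flush that emits key-sorted, duplicate-merged records. This is illustrative only, not the proposed implementation:

Code Block
languagejava
titleJava
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Toy memtable. For simplicity, duplicates are merged on insert; the FLIP
// describes sorting by key + sequence number and merging at flush time, but
// the visible result (largest sequence number wins per key) is the same.
public final class MemTableSketch {

    record Entry(long sequenceNumber, String value) {}

    private final TreeMap<String, Entry> table = new TreeMap<>();
    private long sequence = 0;

    public void put(String key, String value) {
        long seq = sequence++;
        // A larger sequence number overwrites a smaller one for the same key.
        table.merge(key, new Entry(seq, value),
                (oldE, newE) -> newE.sequenceNumber() > oldE.sequenceNumber() ? newE : oldE);
    }

    /** Flush: emit key-sorted, duplicate-merged records, then clear the memtable. */
    public List<String> flush() {
        List<String> sorted = new ArrayList<>();
        table.forEach((k, e) -> sorted.add(k + " -> " + e.value() + " @seq=" + e.sequenceNumber()));
        table.clear();
        return sorted;
    }
}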

Compaction

Auto compaction happens in the streaming sink (writer).

...

In our scenario, writes dominate. Although leveled compaction may achieve a better compression rate in aggregation scenarios, the first version provides universal compaction.

Query Process

  1. Planner
    1. Read the current snapshot, prune partitions according to filtering conditions, and obtain manifests to be read
    2. Merge the deleteFiles and addFiles in the manifests to generate a file list for each bucket in each partition (see the sketch after this list)
  2. SplitEnumerator
    1. Traverse the partitions to be read and generate the corresponding SourceSplit for each bucket
    2. Filter the files to be read in the bucket according to filtering conditions, produce the files of each LSM level in SourceSplit
  3. Runtime Task
    1. Obtain the SourceSplit to be read, generate the MergeIterator of LSM, and read the data
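
The manifest-merging step of the planner (1.2 above) can be sketched as follows; the data structures are illustrative assumptions, not the actual planner code:

Code Block
languagejava
titleJava
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative only: apply manifest entries (add/delete) in order to obtain
// the live file list for each (partition, bucket).
public final class ManifestMergeSketch {

    record Entry(String partition, int bucket, String fileName, boolean isAdd) {}

    public static Map<String, Set<String>> merge(List<Entry> manifestEntries) {
        Map<String, Set<String>> filesPerBucket = new HashMap<>();
        for (Entry e : manifestEntries) {
            String bucketKey = e.partition() + "/" + e.bucket();
            Set<String> files = filesPerBucket.computeIfAbsent(bucketKey, k -> new HashSet<>());
            if (e.isAdd()) {
                files.add(e.fileName());
            } else {
                files.remove(e.fileName());  // DELETE entries drop previously added files
            }
        }
        return filesPerBucket;
    }
}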

Support Changelog

Similarly, we should hide the complexity of changelog support, just as the LogStore does. Changelog is supported as follows:

...

  • DDL with Index Key: When there is no primary key, users can define an index key to speed up updates and queries. (Not in this FLIP)

Query Pushdown

FileStore can support more compaction strategies that help the input data achieve the effect of lazy computation (not in this FLIP). For example (a sketch of the SUM case follows this list):

  • SUM Compaction: non-key fields are summed up, grouped by the key.
  • COALESCE Compaction: only non-null fields are stored; it can replace a streaming join for widening rows.
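
To make the SUM example concrete, a compaction-time merge of two rows with the same key could simply sum the non-key fields; the sketch below is hypothetical and assumes the non-key fields are plain longs:

Code Block
languagejava
titleJava
// Hypothetical sketch of "SUM compaction": when two rows with the same key are
// merged during compaction, their non-key numeric fields are summed, so the
// table lazily maintains a SUM aggregation without a separate streaming job.
public final class SumCompactionSketch {

    /** Sums the non-key fields of two rows that share the same key. */
    public static long[] merge(long[] oldNonKeyFields, long[] newNonKeyFields) {
        long[] merged = new long[oldNonKeyFields.length];
        for (int i = 0; i < merged.length; i++) {
            merged[i] = oldNonKeyFields[i] + newNonKeyFields[i];
        }
        return merged;
    }
}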

Visibility Analysis 

(See above ‘Consistency & Visibility’ in public interfaces)

FileStore Visibility

Compared with the LogStore, the visibility of the FileStore is less important; what matters more is that it can store a large amount of queryable data.

Only files committed to DFS through a checkpoint are visible to readers; the latency depends on the checkpoint interval.

LogStore Visibility

LogStore latency is very important and requires fast visibility. On the other hand, we also need to ensure exactly-once consistency.

...

We can turn off the Kafka transaction when there is a primary key to achieve sub-second latency, even with a large checkpoint interval.
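
On the Kafka side, this trade-off maps to the consumer's standard isolation.level property (a real Kafka setting, shown here only to illustrate the visibility difference; the connector wiring is not specified by this FLIP):

Code Block
languagejava
titleJava
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

// Illustration of the visibility trade-off on the LogStore (Kafka) side.
// With transactional writes, readers using read_committed only see data after
// the checkpoint commits; without transactions (possible when a primary key
// makes duplicates idempotent), records become visible with sub-second latency.
public class LogStoreIsolationSketch {
    public static Properties consumerProps(boolean transactionalWrites) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
                transactionalWrites ? "read_committed" : "read_uncommitted");
        return props;
    }
}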

Analysis

Let's enumerate the various visibility options:

...

In the MVP version, we only provide option #4 by default.

Internal Interfaces

All the following interfaces are marked as internal, because there is currently only one implementation and they are not intended to be exposed to external users.

...

Code Block
languagejava
titleInterface
/**
* A notification of the operation of a table. Provides catalog and session information describing
* the dynamic table to be accessed.
*/
@Internal
public interface TableNotification {

   /** Returns the identifier of the table in the {@link Catalog}. */
   ObjectIdentifier getObjectIdentifier();

   /** Returns the table information. */
   CatalogTable getCatalogTable();

   /** Gives read-only access to the configuration of the current session. */
   ReadableConfig getConfiguration();

   /**
    * Returns the class loader of the current session.
    *
    * <p>The class loader is in particular useful for discovering further (nested) factories.
    */
   ClassLoader getClassLoader();

   /** Whether the table is temporary. */
   boolean isTemporary();

   /** Creates a copy of this instance with new options. */
   TableNotification copy(Map<String, String> newOptions);

}


Rejected Alternatives

Using Hudi

Hudi: https://hudi.apache.org/

...

  • Hudi aims to support upsert-style updates, so it requires defining a primary key and a time column. It is not easy to support all changelog types
  • Hudi's updates are based on an index (currently BloomFilter and HBase). The data in a bucket is unordered, so every merge must reread and rewrite the data, which is expensive. We need fast update storage; an LSM is more suitable.

Add Primary Key

The Flink built-in dynamic table could support freely switching between having and not having a primary key:

...

The cost in the LogStore is too high: users cannot continue their streaming consumption and can only restart consumption from the latest offset. Therefore, this is not supported at present.

Change Buckets

Code Block
languagesql
titleSQL
ALTER TABLE [catalog_name.][db_name.]table_name [PARTITION partition_spec]
  CLUSTERED INTO num_buckets BUCKETS;

Changes the bucket number of the FileStore to trade off between scalability and performance, launching a job to rewrite files. (Not available in the first version; users need to delete the table and create a new one.)

Implementation Plan

  • POC branch: https://github.com/JingsongLi/flink/tree/storage_formal 
  • Implement FileStore
    • Abstract Format: support ORC and Parquet
    • Implement LSM: MemStore and Compaction
    • Implement Snapshot and Manifest: Version control
  • Implement LogStore
    • Auto create Kafka Topic
    • Integrate CDC Format and Upsert Kafka
  • Integrate Flink
    • TableFactory: DynamicSource and DynamicSink
    • Integrate to Catalog
  • Extended DMLs