...

When dropping a table, the corresponding underlying physical storage will be deleted.

(If the user does not drop the table through Flink, the physical storage under the table may never be deleted. The user can obtain the corresponding paths via DESCRIBE and clean them up manually.)

COMPACT

Code Block
languagesql
titleSQL
ALTER TABLE [catalog_name.][db_name.]table_name [PARTITION partition_spec] COMPACT
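
As an example, assuming a partitioned managed table my_db.T (both names are hypothetical) and that the proposed statement is passed through executeSql, a user could trigger compaction of a single partition from the Table API:

Code Block
languagejava
titleExample
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CompactExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // Rewrite the files of one partition into a compacted form (table and partition are hypothetical).
        tEnv.executeSql("ALTER TABLE my_db.T PARTITION (dt = '2020-08-08') COMPACT");
    }
}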

...

DESCRIBE DETAIL TABLE output:

  • name (String): catalog.database.tableName
  • log.kafka.bootstrap.servers (Map): Kafka brokers
  • log.retention (Duration): how long the change log will be kept
  • file.path (String): file path
  • log.kafka.topic (String): Kafka topic
  • watermark (Timestamp): watermark of the latest written data (if the table defines a watermark)
  • file.format (String): file format
  • bucket (Integer): bucket number

DESCRIBE DETAIL … PARTITION output:

  • partition (String): partition spec
  • file.path (String): path of this partition
  • last-modified (Timestamp): last modified time
  • compacted (Boolean): whether this partition has been compacted
  • change-tracking (Boolean): whether this partition tracks changes
  • num-files (Integer): number of files

DESCRIBE DETAIL TABLE on a table without a partition definition outputs the columns above as well, except partition.
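
A usage sketch (the table name and the exact partition clause are assumptions, and it assumes the proposed DESCRIBE DETAIL syntax is executed through executeSql):

Code Block
languagejava
titleExample
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DescribeDetailExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // Table-level details: path, format, bucket number, watermark, ...
        tEnv.executeSql("DESCRIBE DETAIL TABLE my_db.T").print();
        // Partition-level details: path, last-modified, compacted, ...
        tEnv.executeSql("DESCRIBE DETAIL TABLE my_db.T PARTITION (dt = '2020-08-08')").print();
    }
}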

...

We put necessary configurations in the global configuration to avoid the need for users to configure each individual table.

  • table-storage.log.properties.bootstrap.servers (Map, default: none): Kafka brokers, e.g. localhost:9092
  • table-storage.log.retention (Duration, default: none): how long the change log will be kept; the default value comes from the Kafka cluster
  • table-storage.log.scan.startup.mode (StartupMode, default: initial): specifies the startup mode for the log consumer; another choice is latest-offset
  • table-storage.file.root-path (String, default: none): root file path
  • table-storage.file.format (String, default: parquet): format name for files
  • table-storage.bucket (Integer, default: 1): bucket number for files and partition number for Kafka
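
A sketch of setting these options once per session through the Table API, so that individual tables need no configuration (the values are only examples):

Code Block
languagejava
titleExample
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class GlobalStorageConfigExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        Configuration conf = tEnv.getConfig().getConfiguration();
        // Global storage options picked up by every managed table.
        conf.setString("table-storage.log.properties.bootstrap.servers", "localhost:9092");
        conf.setString("table-storage.file.root-path", "/tmp/table-storage");
        conf.setString("table-storage.bucket", "4");
    }
}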

Bucket

The record is hashed into different buckets according to the primary key (if defined) or the whole row (when there is no primary key):

...

In order to improve the ease of use of the built-in dynamic table, the planner sets the checkpoint interval to 1 minute if checkpointing is not enabled when it detects a sink to a built-in dynamic table.

Proposed Design

Conceptually, Flink built-in tables consist of two parts, LogStore and FileStore. The LogStore would serve the need of message systems, while FileStore will play the role of file systems with columnar formats. At each point in time, LogStore and FileStore will store exactly the same data for the latest written data (LogStore has TTL), but with different physical layouts. If one remembers the concept of dynamic table in Flink SQL, this is exactly what we want to build here. 

The LogStore data has faster Time-To-Live, and FileStore ensures that historical data can be queried:

  • LogStore: Stores the latest data, supports second-level streaming incremental consumption, relies on Kafka
    • For full support of Insert/Update/Delete and an optional primary key definition, automatically select Kafka with the Debezium format or Upsert-Kafka
  • FileStore: Stores the latest data + historical data, provides batch ad-hoc analysis
    • For full support of Insert/Update/Delete and an optional primary key definition, we need a flexible storage structure that supports updates and custom merges; an LSM is a good choice
    • To support high-performance analysis, it should be a columnar file format

...

LogStore

Log storage relies on Kafka. We use Kafka with Debezium-Avro and Upsert-Kafka as underlying storages.

  • DDL with Primary Key: Use Upsert-Kafka + Avro format
  • DDL without Primary Key: Use Kafka with Debezium-Avro format

Bucket in LogStore is the Kafka partition, which means the record is hashed into different Kafka partitions according to the primary key (if defined) or the whole row (when there is no primary key), as sketched below.
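
A minimal sketch of this bucket (Kafka partition) assignment, assuming the hash is taken over the primary-key fields when a primary key is defined and over all fields otherwise:

Code Block
languagejava
titleExample
import java.util.Arrays;

public class BucketAssigner {

    /**
     * Returns the bucket (Kafka partition) of a record. The input is the array of
     * primary-key field values if a primary key is defined, otherwise all field values.
     */
    public static int bucketOf(Object[] keyOrRow, int numBuckets) {
        int hash = Arrays.deepHashCode(keyOrRow);
        return Math.floorMod(hash, numBuckets);
    }
}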

FileStore

Overview

As a storage system supporting real-time ad-hoc analysis:

  • LSM with columnar format
  • Fast update and data skipping
  • High compression ratio and high-performance analysis
  • Partition and bucket
  • Data warehouse support
  • Consistency
  • File management
  • Version control

The directory structure of FileStore on DFS is as follows:

[Figure: FileStore directory structure on DFS]

Data directory description:

  • Part Directory: partition directory, defined by "PARTITIONED BY" in DDL, represents a partition with the same directory name as Hive, such as "dt=2020-08-08"
  • Bucket Directory: the bucket under the partition. The data falls to a bucket through hash. The bucket is an LSM composed of multiple files
  • LSM data files: data files in an abstract format, supporting ORC, Parquet and Avro. The record schema of a data file (see the sketch after this list):
    • SequenceNumber
    • ValueKind(add or delete)
    • RowData: key
    • RowData: value
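
A sketch of that record schema as a plain Java class (the class and field names are illustrative, not the final API):

Code Block
languagejava
titleExample
import org.apache.flink.table.data.RowData;

public class DataFileRecord {

    public enum ValueKind { ADD, DELETE }

    private final long sequenceNumber; // write order; larger overrides smaller for the same key
    private final ValueKind valueKind; // add or delete
    private final RowData key;         // LSM key
    private final RowData value;       // LSM value

    public DataFileRecord(long sequenceNumber, ValueKind valueKind, RowData key, RowData value) {
        this.sequenceNumber = sequenceNumber;
        this.valueKind = valueKind;
        this.key = key;
        this.value = value;
    }
}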

Meta file description:

  • Manifest file: records which data files have been added and which have been deleted; it represents a change to the table, i.e. the incremental files of a version. The record schema of a manifest file is DataFile (both DataFile and ManifestFile are sketched after this list):
    • data file name
    • FileKind: add or delete
    • partition
    • bucket
    • min/max key: for file skipping
    • min/max sequence number
    • statistics: data file size, row count
  • Snapshot file: a collection of manifest files that represents a snapshot of the table, i.e. all files of a version. The record schema of a snapshot file is ManifestFile:
    • manifest file name
    • lower/upper partition: for partition pruning
    • statistics: manifest file size, addedFileCount, deleteFileCount
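
A sketch of the two record schemas as plain Java classes (names are illustrative, not the final API):

Code Block
languagejava
titleExample
import org.apache.flink.table.data.RowData;

class DataFile {
    enum FileKind { ADD, DELETE }

    String fileName;
    FileKind fileKind;       // add or delete
    String partition;        // e.g. "dt=2020-08-08"
    int bucket;
    RowData minKey;          // min/max key: for file skipping
    RowData maxKey;
    long minSequenceNumber;
    long maxSequenceNumber;
    long fileSize;           // statistics
    long rowCount;
}

class ManifestFile {
    String manifestFileName;
    String lowerPartition;   // lower/upper partition: for partition pruning
    String upperPartition;
    long manifestFileSize;   // statistics
    long addedFileCount;
    long deleteFileCount;
}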

Write Process

  1. LSM Process (similar to LevelDB):
    1. A memtable is maintained in memory and data is written directly to it. Each record has a sequence number; for the same key, a record with a larger sequence number overwrites one with a smaller sequence number
    2. When the memtable is full or on PrepareCommit, flush the memtable: sort it by key + sequence number, merge duplicate keys, and write the data to a remote file in a specific format
    3. An asynchronous thread performs LSM compactions
  2. Prepare Commit
    1. Flush the MemTable
    2. The commit message is: DeleteFiles and AddFiles
  3. Global Commit (sketched after this list)
    1. Check existing snapshots; if this checkpoint has already been committed, just return
    2. Read the previous snapshot-${i}, write the deleteFiles and addFiles of the buckets to the new manifest, and generate a new snapshot-${i+1}
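
A minimal in-memory sketch of the global commit step (all names are illustrative; a real implementation reads and writes snapshot and manifest files on DFS):

Code Block
languagejava
titleExample
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class GlobalCommitter {

    private final Map<Long, Long> committedCheckpoints = new HashMap<>(); // checkpointId -> snapshotId
    private final List<List<String>> snapshots = new ArrayList<>();       // snapshot-${i} -> manifest names

    void commit(long checkpointId, List<String> manifestsOfThisCheckpoint) {
        // 1. If this checkpoint has already been committed, just return (idempotence).
        if (committedCheckpoints.containsKey(checkpointId)) {
            return;
        }
        // 2. Read the previous snapshot-${i} ...
        List<String> manifests = snapshots.isEmpty()
                ? new ArrayList<>()
                : new ArrayList<>(snapshots.get(snapshots.size() - 1));
        // ... append the manifest(s) holding the deleteFiles and addFiles of this checkpoint ...
        manifests.addAll(manifestsOfThisCheckpoint);
        // 3. ... and generate a new snapshot-${i+1}.
        snapshots.add(manifests);
        committedCheckpoints.put(checkpointId, (long) (snapshots.size() - 1));
    }
}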

Compaction

Auto compaction is in the streaming sink (writer).

We do not have an independent compaction service. Independent services would bring a lot of additional design complexity, and we only need a decoupled storage in the current version. (In the future, if we have such a service, the streaming writer can become a pure append writer.)

For each LSM, there is only one streaming writer, and this writer also needs to be responsible for its compaction.

As for the compaction strategy, at present we do not have enough tests to tune it. We can refer to the two mainstream strategies of RocksDB:

Leveled Compaction: 

The trigger of compaction depends on three options:

  • level0_file_num_compaction_trigger: When the number of level0 files exceeds this value, compact level0 files to level1
  • max_bytes_for_level_base and max_bytes_for_level_multiplier: If the base is 1GB, multiplier is 5, then if the data of level1 exceeds 1GB, the compaction will be performed, and if the data of level2 exceeds 5GB, the compaction will be performed...

For Leveled Compaction, every level is a sorted run (except level 0). SSTs in level 1 are merged with level 2, eventually forming ordered SSTs in level 2, where the SSTs do not overlap. Leveled Compaction is the default strategy of RocksDB:

  • Write amplification: bad, the same data is compacted again and again
  • Read and space amplification: good, no overlap within a level

Universal Compaction:

  • universal_sort_run_num_compaction_trigger: when the number of sorted runs exceeds this value, do compaction
  • universal_max_size_amplification_percent
  • universal_size_ratio

In universal mode, there are many sort runs. For R1, R2, R3,..., Rn, each R is a sort run, R1 contains the latest data, and Rn contains the oldest data. When the preconditions are met, the following compaction is triggered in priority order:

  • Compaction by Space Amplification: performs a full compaction of all sorted runs (see the sketch after this list). The amplification is:
    • (size(R1) + size(R2) + … + size(Rn-1)) / size(Rn) (assuming the frequency of deletes is similar to the frequency of inserts)
  • Compaction by Individual Size Ratio: if size(R1) is smaller than size(R2) within a certain proportion (default 1%), perform a compaction of R1 and R2. If (size(R1) + size(R2)) * (100 + ratio) / 100 < size(R3), add R3 to the compaction as well.
  • Compaction by Sort Run Number: if none of the above is triggered, the first few sorted runs are forced to be compacted
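
A sketch of the space-amplification trigger (the threshold plays the role of universal_max_size_amplification_percent; sizes are in bytes):

Code Block
languagejava
titleExample
class UniversalCompactionTrigger {

    /** sortRunSizes[0] = size(R1) (newest) ... sortRunSizes[n-1] = size(Rn) (oldest). */
    static boolean needsFullCompaction(long[] sortRunSizes, double maxSizeAmplificationPercent) {
        int n = sortRunSizes.length;
        if (n < 2) {
            return false;
        }
        long newerTotal = 0;
        for (int i = 0; i < n - 1; i++) {
            newerTotal += sortRunSizes[i];
        }
        // amplification = (size(R1) + ... + size(Rn-1)) / size(Rn)
        double amplification = (double) newerTotal / sortRunSizes[n - 1];
        return amplification * 100 >= maxSizeAmplificationPercent;
    }
}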

Compared to Leveled Compaction, Universal compaction:

  • Write amplification: good, old data is not compacted again and again
  • Read and space amplification: bad, there is a larger number of sorted runs

Our scenario is write-heavy. Although leveled compaction may achieve a better compression rate in aggregation scenarios, the first version will provide universal compaction.

Query Process

  1. Planner
    1. Read the current snapshot, prune partitions according to filtering conditions, and obtain manifests to be read
    2. Merge the deleteFiles and addFiles in the manifests to generate a file list for each bucket in each partition (see the sketch after this list)
  2. SplitEnumerator
    1. Traverse the partitions to be read and generate the corresponding SourceSplit for each bucket
    2. Filter the files to be read in the bucket according to filtering conditions, produce the files of each LSM level in SourceSplit
  3. Runtime Task
    1. Obtain the SourceSplit to be read, generate the MergeIterator of LSM, and read the data
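
A sketch of the merge in step 1.2: manifest entries are applied in order to obtain the live files of each (partition, bucket). Types are simplified to strings for illustration.

Code Block
languagejava
titleExample
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class ManifestMerger {

    static final class Entry {
        final String partitionAndBucket; // e.g. "dt=2020-08-08/bucket-0"
        final String fileName;
        final boolean isAdd;             // true = addFile, false = deleteFile

        Entry(String partitionAndBucket, String fileName, boolean isAdd) {
            this.partitionAndBucket = partitionAndBucket;
            this.fileName = fileName;
            this.isAdd = isAdd;
        }
    }

    /** Applies manifest entries in order and returns the live files per (partition, bucket). */
    static Map<String, Set<String>> liveFiles(List<Entry> manifestEntriesInOrder) {
        Map<String, Set<String>> files = new HashMap<>();
        for (Entry e : manifestEntriesInOrder) {
            Set<String> bucketFiles =
                    files.computeIfAbsent(e.partitionAndBucket, k -> new HashSet<>());
            if (e.isAdd) {
                bucketFiles.add(e.fileName);
            } else {
                bucketFiles.remove(e.fileName);
            }
        }
        return files;
    }
}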

Support Changelog

Similarly, FileStore should hide the complexity of changelog support, just like LogStore. Changelog is supported as follows:

  • DDL with Primary Key
    • LSM Key: Primary Key
    • LSM Value: Row (All columns)
  • DDL without Primary Key
    • LSM Key: Row (All columns)
    • LSM Value: Count, the number of occurrences of the same record
    • Count is +1 for an add and -1 for a delete. Counts are summed during compaction and merge reading (see the sketch after this list).
  • DDL with Index Key: When there is no primary key, users can define an index key to speed up update and query. (Not in this FLIP)
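
A sketch of the count merge used when there is no primary key (keys are simplified to strings for illustration):

Code Block
languagejava
titleExample
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CountMerger {

    /** Sums the counts of equal keys; a key whose total count reaches zero disappears. */
    static Map<String, Long> merge(List<Map.Entry<String, Long>> keyedCounts) {
        Map<String, Long> merged = new LinkedHashMap<>();
        for (Map.Entry<String, Long> entry : keyedCounts) {
            merged.merge(entry.getKey(), entry.getValue(), Long::sum); // +1 for add, -1 for delete
        }
        merged.values().removeIf(count -> count == 0L); // fully retracted rows vanish
        return merged;
    }
}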

Query Pushdown

FileStore can support more compaction strategies, helping input data achieve the effect of lazy computation (not in this FLIP). For example:

  • SUM Compaction: non-key fields are aggregated with SUM, grouped by the key.
  • COALESCE Compaction: only non-null fields are stored; it can replace a streaming join for widening rows.

Visibility Analysis 

(See above ‘Consistency & Visibility’ in public interfaces)

FileStore Visibility

Compared with LogStore, the visibility of FileStore is less important. More importantly, it can store a large amount of queryable data.

Only files committed to DFS through a checkpoint are visible to readers; the latency depends on the checkpoint interval.

LogStore Visibility

LogStore latency is very important and requires high visibility. But on the other hand, we also need to ensure Exactly-once consistency.

So for a table without a primary key, in combination with Kafka's transaction mechanism, similarly only the data committed by a checkpoint is visible to readers; the latency depends on the checkpoint interval.

But with Primary Key, can we do more? LogStore uses Upsert-Kafka as the underlying implementation.

When reading an Upsert-Kafka, the downstream job will generate a special node after the source: The node normalizes an UPSERT stream containing duplicate events. This node normalizes such a stream into a regular changelog stream that contains INSERT / UPDATE_BEFORE / UPDATE_AFTER / DELETE records without duplication.

If we disable the Kafka transaction, the upstream producer may generate duplicate changes, which means there may be some duplicate data. However, with the normalize node, even if there is duplicate data, the corresponding UPDATE_BEFORE / UPDATE_AFTER records will be generated and the duplicate data will be retracted, which means eventual consistency can be achieved after recovery.

We can therefore disable the Kafka transaction when there is a primary key to achieve sub-second latency, even with a large checkpoint interval.

Analysis

Let's enumerate the various visibility options:

1. FileStore immediately visible?

  • Not feasible now; it would generate a large number of small files

2. FileStore & LogStore checkpoint visible:

  • Pros: In any case, it is exactly-once
  • Cons: Latency is very high, reaching the minute level at present

3. FileStore checkpoint visible, LogStore immediately visible

  • Cons: The semantics produced by FileStore and LogStore are inconsistent, so the results users get in the two ways differ, which we should avoid

4. FileStore checkpoint visible, LogStore without PK checkpoint visible, LogStore with PK immediately visible; the downstream Flink streaming job deduplicates according to the primary key

  • Pros: 
    • The streaming pipeline can achieve sub-second latency with a primary key
    • In any case, it is exactly-once (consumers deduplicate records according to the PK). LogStore is a message system; with a primary key definition and updates, the message system cannot guarantee there is no duplication, because it has no ability to remove duplicates

In the MVP, we only provide option #4 by default.

Internal Interfaces

All the following interfaces are marked as internal, because there is only one implementation at present and it is not intended to be open to external users.

We need an interface to discover the default factory implementation when there is no "connector=..." option:

Code Block
languagejava
titleInterface
/**
* Base interface for configuring a default dynamic table connector. The default table factory is
* used when there is no {@link FactoryUtil#CONNECTOR} option.
*/
@Internal
public interface DefaultDynamicTableFactory extends DynamicTableFactory {

   @Override
   default String factoryIdentifier() {
      return "_default";
   }
}

We need an interface to discover the default log factory implementation (for example, the Kafka log factory):

Code Block
languagejava
titleInterface
/**
* Base interface for configuring a default log table connector. The log table is used by {@link
* DefaultDynamicTableFactory}.
*
* <p>Log tables are for processing only unbounded data. Support streaming reading and streaming
* writing.
*/
@Internal
public interface DefaultLogTableFactory extends DynamicTableFactory {

   @Override
   default String factoryIdentifier() {
       return "_default_log";
   }
}

As mentioned above, we need to minimize the user's options, so we need to fill in the table's options from the configuration. Moreover, we need to create the underlying physical storage, such as the Kafka topic, when creating tables:

Code Block
languagejava
titleInterface
/**
* An object that can receive a notification when a table is created. Notification occurs before
* {@link Catalog#createTable} is called.
*
* <p>Only {@link DefaultDynamicTableFactory} and {@link DefaultLogTableFactory} support
* notification.
*/
@Internal
public interface CreateTableListener {

   /**
    * Notifies the listener that a table creation occurred.
    *
    * @return new options of this table.
    */
   Map<String, String> onTableCreation(TableNotification context);

}

We need to delete the underlying physical storage, such as the Kafka topic, when dropping tables:

Code Block
languagejava
titleInterface
/**
* An object that can receive a notification when a table is dropped. Notification occurs before
* {@link Catalog#dropTable} is called.
*
* <p>Only {@link DefaultDynamicTableFactory} and {@link DefaultLogTableFactory} support
* notification.
*/
@Internal
public interface DropTableListener {

   /** Notifies the listener that a table drop occurred. */
   void onTableDrop(TableNotification context);
}

The notification of CreateTable and DropTable Listener

...

languagejava
titleInterface

...

HybridSource

If the log startup mode is initial, the scan first reads the latest snapshot of the table upon startup and then continues to read the latest changes.

The switching between the snapshot and the changes is exactly-once consistent, because we store the offsets of the corresponding log in the snapshot when writing data.
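
A conceptual sketch of this hybrid read; all types here are hypothetical and only illustrate the hand-over from the snapshot to the log:

Code Block
languagejava
titleExample
import java.util.Map;

class HybridRead {

    static void read(FileStoreScan fileStore, LogSource log) {
        Snapshot snapshot = fileStore.latestSnapshot();
        // 1. Bounded phase: read the full data of the latest snapshot.
        fileStore.readFully(snapshot);
        // 2. Unbounded phase: continue from the log offsets recorded in that snapshot,
        //    so no change is lost or read twice.
        log.startFrom(snapshot.logOffsets());
    }

    interface FileStoreScan {
        Snapshot latestSnapshot();
        void readFully(Snapshot snapshot);
    }

    interface LogSource {
        void startFrom(Map<Integer, Long> bucketToOffset);
    }

    interface Snapshot {
        Map<Integer, Long> logOffsets(); // bucket (Kafka partition) -> offset
    }
}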

Table Interfaces

A catalog that supports built-in dynamic tables needs to implement GenericCatalog. For now, GenericInMemoryCatalog and HiveCatalog will implement this interface:

Code Block
languagejava
titleInterface
/**
 * This generic catalog can store any object created by Flink DDL or Table API, and it is only
 * responsible for storing their metadata. It acts as a database, holding relevant meta information.
 *
 * <p>This interface is distinguished from external Catalog. ExternalCatalog may have its own
 * managed table, which may conflict with Flink managed table. When implementing this interface, if
 * there is no specified connector, it should be interpreted as Flink managed table.
 */
@PublicEvolving
public interface GenericCatalog extends Catalog {}


We need an interface to discover the managed table factory implementation when there is no "connector=..." option:

Code Block
languagejava
titleInterface
/**
 * Base interface for configuring a managed dynamic table connector. The managed table factory is
 * used when there is no {@link FactoryUtil#CONNECTOR} option.
 */
@Internal
public interface ManagedTableFactory extends DynamicTableFactory {

    @Override
    default String factoryIdentifier() {
        return "";
    }

    /**
     * Enrich options from catalog and session information.
     *
     * @return new options of this table.
     */
    Map<String, String> enrichOptions(Context context);

    /** Notifies the listener that a table creation occurred. */
    void onCreateTable(Context context);

    /** Notifies the listener that a table drop occurred. */
    void onDropTable(Context context);
}


Table API for creating a managed table:

Code Block
languagejava
titleInterface
@PublicEvolving
public class TableDescriptor {

    /** Creates a new {@link Builder} for a managed dynamic table. */
    public static Builder forManaged() {
        return new Builder();
    }

    ...
}
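
A usage sketch, assuming the managed builder exposes the same schema and partitioning methods as TableDescriptor.forConnector (table and column names are illustrative):

Code Block
languagejava
titleExample
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;

public class CreateManagedTableExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // No "connector" option: the managed table factory is used.
        tEnv.createTable(
                "managed_table",
                TableDescriptor.forManaged()
                        .schema(Schema.newBuilder()
                                .column("user_id", DataTypes.BIGINT())
                                .column("dt", DataTypes.STRING())
                                .build())
                        .partitionedBy("dt")
                        .build());
    }
}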


Rejected Alternatives

Using Hudi

...