
Motivation

Currently paimon has two storage tiers: FileStore and LogStore. If the business is based on exactly-once semantics or is not sensitive to latency, LogStore does not need to be enabled.

However, in scenarios where the business accepts at-least-once semantics and requires low latency, LogStore will be enabled to accelerate data visibility, for example in real-time metric calculation scenarios.

In the current implementation of paimon, only a Kafka-based implementation is provided, and only Flink is supported. This brings the following limitations:

  1. Increased maintenance and management costs: an additional Kafka service must be maintained. At the same time, because paimon writes and consumes at bucket granularity, the number of Kafka partitions must be changed whenever the paimon bucket count is modified; in addition, Kafka also needs to be involved when troubleshooting.
  2. Inconsistent data management: paimon manages data at snapshot granularity, and the life cycle of the changelog is consistent with that of snapshots. However, Kafka's data expiration strategy cannot be kept consistent with paimon's. At the same time, paimon cannot synchronously delete the data in LogStore during a rollback, which causes old data to be consumed from LogStore;
  3. Dynamic scaling is difficult to achieve: paimon has implemented automatic scaling of buckets for primary key tables, but it is difficult for Kafka to automatically adapt to paimon's buckets.
  4. Storage redundancy: to ensure that the data in LogStore is valid, Kafka's data expiration time needs to be longer than the life cycle of paimon's snapshots.


Therefore, we hope to provide a lightweight LogStore in paimon that does not rely on other services, to achieve:

  1. Out-of-the-box LogStore storage, which does not rely on other services and reduces maintenance and management costs;
  2. A data life cycle consistent with paimon's snapshots, reducing redundant storage;
  3. Data sharding consistent with paimon's partitions and buckets, to support paimon's data sharding-related features;
  4. Second-level data visibility under at-least-once semantics.

...

  1. Add a createLogStoreProvider method to the existing LogStoreTableFactory.

    Code Block
    languagejava
    public interface LogStoreTableFactory<LogStore> extends Factory { 
    	LogStoreProvider createLogStoreProvider(Context context); 
    }


  2. Add a new LogStoreProvider interface for creating LogStore objects;

    Code Block
    languagejava
    public interface LogStoreProvider<LogStore> extends Serializable { 
    	LogStore createLogStore(Context context); 
    }


  3. Add a new LogStore interface that defines the basic operations of a LogStore.

    Code Block
    languagejava
    public interface LogStore<CHKT> extends Serializable {
    
        void start();
    
        CHKT checkpoint();
    
        void shutdown();
    
        DeployMode deployMode(Context context);
    }
    
    enum DeployMode {
        EMBEDDED,
        EXTERNAL
    }


Proposed Changes

Overview


  1. LogWriter: created by LogSinkProvider and used to send data to the LogStore service;
  2. LogReader: created by LogSourceProvider and used to read data from the LogStore service;
  3. LogStore service: publishes its service address after startup, then receives read/write/snapshot and other requests from readers and writers to manage LogStore data. A minimal sketch of the writer and reader facades follows this list.
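
The sketch below assumes hypothetical method names and byte-array records; none of this is a finalized API.

Code Block
languagejava
import java.util.List;

// Illustrative sketch only: hypothetical facades for the roles above.
interface LogWriter extends AutoCloseable {
    // Send one record to the LogStore service responsible for its bucket.
    void write(int bucket, byte[] key, byte[] value);
}

interface LogReader extends AutoCloseable {
    // Poll records for a bucket starting from the given offset. The reader
    // decides internally whether to serve them from the service's memory
    // (low lag) or from log files already persisted to DFS (high lag).
    List<byte[]> poll(int bucket, long offset);
}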

...

  1. EMBEDDED: the corresponding LogStore service is automatically launched when the writer node starts. Each LogStore service is responsible for reading and writing all buckets on its writer node;
  2. EXTERNAL: the LogStore service is deployed externally, for example as a Flink job. When each writer node writes, records are routed to a specific LogStore service according to their bucket, as sketched below.
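
For the EXTERNAL mode, one possible routing rule maps each bucket to a fixed service instance; the address list and the modulo scheme below are assumptions for illustration:

Code Block
languagejava
import java.util.List;

// Illustrative sketch only: bucket-to-service routing for EXTERNAL mode.
class LogStoreRouter {

    private final List<String> serviceAddresses; // published by the LogStore services

    LogStoreRouter(List<String> serviceAddresses) {
        this.serviceAddresses = serviceAddresses;
    }

    // Each bucket is always handled by the same service instance, so a
    // writer can keep one long-lived connection per target service.
    String serviceFor(int bucket) {
        return serviceAddresses.get(bucket % serviceAddresses.size());
    }
}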

Write & Read

  1. Write: LogWriter only needs to continuously send write requests to the LogStore service; data management is performed by the LogStore service;
  2. Read: when the downstream consumption lag is low, data is mainly read from the LogStore service through a socket. When the lag is large and the data has already been persisted from the LogStore service to DFS, the downstream reads directly from the DFS files until the lag is small enough, as sketched after this list.
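
A sketch of the lag-based fallback described above; the threshold, client, and scanner types are hypothetical:

Code Block
languagejava
// Illustrative sketch only: switching between the LogStore service and DFS
// based on consumption lag. All names and the threshold are assumptions.
interface LogStoreClient {
    long latestOffset(int bucket);
    Iterable<byte[]> fetch(int bucket, long offset);
}

interface DfsLogScanner {
    Iterable<byte[]> scan(int bucket, long offset);
}

class LagAwareReader {

    private static final long LAG_THRESHOLD = 10_000;

    private final LogStoreClient socketClient;
    private final DfsLogScanner dfsScanner;

    LagAwareReader(LogStoreClient socketClient, DfsLogScanner dfsScanner) {
        this.socketClient = socketClient;
        this.dfsScanner = dfsScanner;
    }

    Iterable<byte[]> read(int bucket, long offset) {
        long lag = socketClient.latestOffset(bucket) - offset;
        if (lag <= LAG_THRESHOLD) {
            // Low lag: the data is still in the service's memory, read via socket.
            return socketClient.fetch(bucket, offset);
        }
        // High lag: read the persisted log files directly from DFS until the
        // reader has caught up enough to switch back to the socket path.
        return dfsScanner.scan(bucket, offset);
    }
}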

Checkpoint

Every time the writer node triggers a checkpoint, the LogStore service synchronously uploads its local incremental data to DFS for persistence and returns the addresses of these files. The GlobalCommitter node then writes these addresses into the manifest corresponding to the log.

In order to record these manifests, we need to bump the snapshot version and add a LogStoreManifestList field to the snapshot to record the LogStore data generated in this snapshot.
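
A sketch of the extended snapshot structure; logStoreManifestList is the addition proposed here, while the other fields abbreviate the existing layout for illustration:

Code Block
languagejava
// Illustrative sketch only: the versioned snapshot after the change.
class Snapshot {
    long version;                // bumped, since the layout changes
    long id;
    long schemaId;
    String baseManifestList;     // existing: data manifests before this snapshot
    String deltaManifestList;    // existing: data manifests added by this snapshot
    String logStoreManifestList; // new: manifests of the LogStore files
                                 // uploaded by the services at this checkpoint
}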


Data Cleaning

Snapshot expiration

In the same way that FileStore cleans up files, read the LogStoreManifestList and then delete the log files in order.
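
A sketch of that cleanup loop, with hypothetical helpers for reading manifests and deleting files:

Code Block
languagejava
import java.util.List;

// Illustrative sketch only: expiring LogStore data alongside a snapshot.
class LogStoreExpire {

    // Read the LogStoreManifestList of the expiring snapshot, delete every
    // log file it references, then delete the manifests themselves.
    void expire(String logStoreManifestList) {
        for (String manifest : readManifestList(logStoreManifestList)) {
            for (String logFile : readLogFiles(manifest)) {
                delete(logFile);
            }
            delete(manifest);
        }
        delete(logStoreManifestList);
    }

    List<String> readManifestList(String path) { /* hypothetical */ return List.of(); }

    List<String> readLogFiles(String manifest) { /* hypothetical */ return List.of(); }

    void delete(String path) { /* hypothetical DFS delete */ }
}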

...

Since the LogStore service has already persisted its local data to DFS by the time a checkpoint completes, this incremental data can be cleaned up locally. However, considering that downstream reading does not have zero latency, the LogStore service retains the incremental data of the last 2 checkpoints locally by default. If the amount of data is large, this can be reduced to retaining only the incremental data after the latest checkpoint; a retention sketch follows.
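
A sketch of the local retention policy, assuming a hypothetical per-checkpoint segment cleanup:

Code Block
languagejava
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative sketch only: keep the local increments of the last N
// checkpoints (N = 2 by default per the text, reducible to 1 under pressure).
class LocalSegmentRetention {

    private final int retainedCheckpoints;
    private final Deque<Long> completedCheckpoints = new ArrayDeque<>();

    LocalSegmentRetention(int retainedCheckpoints) {
        this.retainedCheckpoints = retainedCheckpoints;
    }

    // Called once a checkpoint's increment has been persisted to DFS.
    void onCheckpointPersisted(long checkpointId) {
        completedCheckpoints.addLast(checkpointId);
        while (completedCheckpoints.size() > retainedCheckpoints) {
            long expired = completedCheckpoints.removeFirst();
            dropLocalDataUpTo(expired);
        }
    }

    void dropLocalDataUpTo(long checkpointId) { /* hypothetical local cleanup */ }
}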

...

In most cases, the downstream job always consumes the latest offset written by the upstream, so the data in the memory of the writer-side LogStore service can cover read requests in most scenarios. However, due to faults, eviction of in-memory data, etc., the downstream job may not be able to find the data it needs in the memory of the LogStore service, so fault tolerance needs to be implemented for this situation.

...

After the upstream job fails and restarts, it will continue writing from the offset of the most recently committed snapshot. If the LogStore service detects a repeated offset write, it directly discards the original data after that offset and notifies the downstream job that the offset needs to be reset. The downstream job then resets its offset to the end position of the snapshot closest to that offset and re-consumes; the data read before that will be treated as repeated reading caused by dirty reads. A sketch of this handling follows.
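
A sketch of how the LogStore service might handle the repeated offset write; the method names and the reset signal are assumptions:

Code Block
languagejava
// Illustrative sketch only: truncate-and-reset handling on the write side.
class WriteSideFailover {

    private long nextOffset;

    void onWrite(long offset, byte[] record) {
        if (offset < nextOffset) {
            // The upstream job restarted from the last committed snapshot:
            // discard the original data after this offset and tell downstream
            // jobs to reset to the end position of the nearest snapshot; data
            // they already read beyond it counts as a dirty read.
            truncateAfter(offset);
            notifyDownstreamReset(offset);
            nextOffset = offset;
        }
        append(record);
        nextOffset = nextOffset + 1;
    }

    void truncateAfter(long offset) { /* hypothetical */ }

    void notifyDownstreamReset(long offset) { /* hypothetical */ }

    void append(byte[] record) { /* hypothetical */ }
}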

Read side failure

...