
Discussion thread
Vote thread
ISSUE
Release

Motivation

Currently, Paimon has two storage tiers: FileStore and LogStore. If the business relies on exactly-once semantics or is not sensitive to latency, LogStore does not need to be enabled.

...

  1. Add a createLogStoreProvider method to the existing LogStoreTableFactory.

    Code Block
    languagejava
    public interface LogStoreTableFactory<LogStore> extends Factory { 
    	LogStoreProvider createLogStoreProvider(Context context); 
    }


  2. Add a new LogStoreProvider interface for creating LogStore objects;

    Code Block
    languagejava
    public interface LogStoreProvider<LogStore> extends Serializable { 
    	LogStore createLogStore(Context context); 
    }


  3. Add a new LogStore interface and define the basic interface of LogStore.

    Code Block
    languagejava
    public interface LogStore<CHKT> extends Serializable {
    
        void start();
    
        CHKT checkpoint();
    
        void shutdown();
    
        DeployMode deployMode(Context context);
    }
    
    enum DeployMode {
        EMBEDDED,
        EXTERNAL
    }


Proposed Changes

Overview


  1. LogWriter: created by LogSinkProvider and used to send data to the LogStore service;
  2. LogReader: created by LogSourceProvider and used to read data from the LogStore service;
  3. LogStore service: publishes its service address after startup, then receives read / write / snapshot and other requests from readers / writers to manage LogStore data.
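The interaction between the three roles can be sketched with in-process stand-ins. In reality the writer and reader would talk to the service over sockets at its published address; here a queue stands in for the network, and all class names and the address are illustrative.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// In-process stand-in for the LogStore service. A real service would listen
// on a socket and publish its address to a discovery mechanism.
class LogStoreService {
    private final Queue<byte[]> log = new ArrayDeque<>();

    // The service publishes its address after startup; fixed placeholder here.
    String serviceAddress() { return "localhost:9999"; }

    void append(byte[] record) { log.add(record); }

    byte[] poll() { return log.poll(); }
}

// Created by LogSinkProvider; forwards records to the service.
class LogWriter {
    private final LogStoreService service;
    LogWriter(LogStoreService service) { this.service = service; }
    void write(byte[] record) { service.append(record); }
}

// Created by LogSourceProvider; pulls records from the service.
class LogReader {
    private final LogStoreService service;
    LogReader(LogStoreService service) { this.service = service; }
    byte[] read() { return service.poll(); }
}
```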

...

  1. EMBEDDED: The corresponding LogStore service is automatically launched when the writer node is started. Each LogStore service is responsible for reading and writing all buckets on the writer node;
  2. EXTERNAL: Deploy the LogStore service externally, for example as a Flink job. When a writer node writes, each record is routed to a specific LogStore service according to its bucket.
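In EXTERNAL mode, the bucket-to-service routing could look like the sketch below. The routing function and addresses are assumptions for illustration; the only property the proposal requires is that a given bucket always maps to the same service instance.

```java
import java.util.List;

// Routes each bucket to one of N externally deployed LogStore services.
class BucketRouter {
    private final List<String> serviceAddresses;

    BucketRouter(List<String> serviceAddresses) {
        this.serviceAddresses = serviceAddresses;
    }

    // Deterministic: a bucket is always routed to the same service instance,
    // so all data for one bucket is managed in one place.
    String serviceFor(int bucket) {
        return serviceAddresses.get(Math.floorMod(bucket, serviceAddresses.size()));
    }
}
```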

Write & Read

  1. Write: LogWriter only needs to continuously send write requests to the LogStore service; data management is performed by the LogStore service;
  2. Read: When the downstream consumption lag is low, data is mainly read from the LogStore service through a socket. When the lag is large and this data has already been persisted from the LogStore service to DFS, the downstream reads directly from DFS until the lag is small enough.
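The read-path decision above can be sketched as a small selector. The lag threshold is an assumed tuning knob, and the offset arguments are simplifications of whatever progress metadata the reader actually tracks.

```java
// Where the reader should fetch from next.
enum ReadSource { LOG_STORE_SERVICE, DFS }

class ReadPathSelector {
    private final long lagThreshold;

    ReadPathSelector(long lagThreshold) { this.lagThreshold = lagThreshold; }

    ReadSource select(long consumerOffset, long latestOffset, long persistedOffset) {
        long lag = latestOffset - consumerOffset;
        // Large lag AND the needed data already persisted to DFS:
        // catch up from DFS files instead of the service socket.
        if (lag > lagThreshold && consumerOffset < persistedOffset) {
            return ReadSource.DFS;
        }
        // Small lag (or data not yet persisted): read from the service.
        return ReadSource.LOG_STORE_SERVICE;
    }
}
```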

Checkpoint

Every time the writer node triggers a checkpoint, the LogStore service synchronously uploads local incremental data to DFS for persistence, and returns the addresses of these files. The GlobalCommitter node then writes these addresses into the manifest corresponding to the log.

In order to record these manifests, we need to modify the version of the snapshot and add the LogStoreManifestList field to the snapshot to record the LogStore data generated in this snapshot.
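The snapshot change could look like the sketch below. Only the new LogStoreManifestList field comes from the proposal; the other fields and their names are simplified placeholders, not Paimon's actual snapshot layout.

```java
// Simplified snapshot record; illustrative only.
class Snapshot {
    long id;
    // Version is bumped so readers know the new field may be present.
    int version;
    String baseManifestList;
    String deltaManifestList;
    // NEW: manifest list recording the LogStore data files (addresses
    // returned by the LogStore service at checkpoint) for this snapshot.
    String logStoreManifestList;
}
```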


Data Cleaning

Snapshot expires

...

After the upstream job fails and restarts, it continues writing from the offset of the most recently committed snapshot. If the LogStore service detects a write to an offset it has already received, it discards the original data at and after that offset and notifies the downstream job that its offset needs to be reset. The downstream job then resets its offset to the end position of the snapshot closest to that offset and re-consumes from there; data read beyond that point is treated as duplicate reads caused by dirty reading.

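The truncate-on-repeated-offset rule on the service side can be sketched as follows. The class and method names are illustrative; the only behavior taken from the proposal is that a repeated offset discards everything from that offset on and raises a reset signal for downstream readers.

```java
import java.util.TreeMap;

// Sketch of the service-side failover rule for repeated offsets.
class LogStoreState {
    private final TreeMap<Long, byte[]> records = new TreeMap<>();
    private boolean resetSignalled = false;

    void write(long offset, byte[] record) {
        if (records.containsKey(offset)) {
            // Upstream restarted and is re-sending from a committed offset:
            // discard the stale data at and after this offset, then flag
            // downstream readers to reset and re-consume.
            records.tailMap(offset, true).clear();
            resetSignalled = true;
        }
        records.put(offset, record);
    }

    boolean resetSignalled() { return resetSignalled; }

    // Highest offset currently held, or -1 if empty.
    long endOffset() { return records.isEmpty() ? -1 : records.lastKey(); }
}
```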

Read side failure

...