
Discussion thread
Vote thread
ISSUE
Release

Motivation

Currently, Paimon has two storage tiers: FileStore and LogStore. If the business requires exactly-once semantics or is not sensitive to latency, LogStore does not need to be enabled.

However, when the business accepts at-least-once semantics and requires low latency, for example in real-time metric calculation, LogStore is enabled to accelerate data visibility.

Paimon currently provides only a Kafka-based LogStore implementation, and it can be used only with Flink. This brings the following limitations:

  1. Increased maintenance and management costs: the Kafka service must be maintained in addition to Paimon. Because Paimon writes and consumes at bucket granularity, the number of Kafka partitions must be changed whenever the number of Paimon buckets is changed, and Kafka must also be examined when troubleshooting;
  2. Inconsistent data management: Paimon manages data at snapshot granularity, and the life cycle of the changelog matches that of the snapshots. Kafka's data expiration strategy, however, cannot be kept consistent with Paimon's. In addition, Paimon cannot delete LogStore data synchronously during a rollback, so consumers may read stale data from LogStore;
  3. Dynamic scaling is difficult to achieve: Paimon has implemented automatic bucket scaling for primary key tables, but it is difficult for Kafka to adapt automatically to Paimon's buckets;
  4. Storage redundancy: to ensure that the data in LogStore stays valid, Kafka's data expiration time must be longer than the life cycle of a Paimon snapshot.


Therefore, we hope to provide a lightweight LogStore in Paimon that does not rely on other services and achieves the following:

  1. Out-of-the-box LogStore storage that does not rely on other services and reduces maintenance and management costs;
  2. A data life cycle consistent with Paimon's snapshots, reducing redundant storage;
  3. Data sharding aligned with Paimon's partitions and buckets, so that the features built on Paimon's data sharding are supported;
  4. Second-level data visibility under at-least-once semantics.

Public Interfaces

  1. Add the createLogStoreProvider interface to the existing LogStoreTableFactory.

    public interface LogStoreTableFactory<LogStore> extends Factory {
    	LogStoreProvider createLogStoreProvider(Context context); 
    }
  2. Add a new LogStoreProvider interface for creating LogStore objects;

    public interface LogStoreProvider<LogStore> extends Serializable { 
    	LogStore createLogStore(Context context); 
    }
  3. Add a new LogStore interface and define the basic interface of LogStore.

    public interface LogStore<CHKT> extends Serializable {
    
        void start();
    
        CHKT checkpoint();
    
        void shutdown();
    
        DeployMode deployMode(Context context);
    }
    
    enum DeployMode {
        EMBEDDED,
        EXTERNAL
    }
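
To make the intended lifecycle concrete, the following is a minimal sketch of how an implementation might behave. It is not part of the proposal: `SketchLogStore`, `SketchDeployMode` and `InMemoryLogStore` are hypothetical stand-ins for the interfaces above, the `Context` argument is omitted because the proposal does not define it, and the checkpoint type is assumed to be a list of file names.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Local stand-ins for the proposed types (hypothetical names, Context omitted).
enum SketchDeployMode { EMBEDDED, EXTERNAL }

interface SketchLogStore<CHKT> extends Serializable {
    void start();
    CHKT checkpoint();
    void shutdown();
    SketchDeployMode deployMode();
}

// Hypothetical embedded store: buffers records in memory and, on checkpoint,
// pretends to persist them by returning the name of one file per checkpoint.
final class InMemoryLogStore implements SketchLogStore<List<String>> {
    private final List<String> buffer = new ArrayList<>();
    private boolean running;
    private int checkpointId;

    @Override
    public void start() {
        running = true;
    }

    void append(String record) {
        if (!running) {
            throw new IllegalStateException("LogStore is not started");
        }
        buffer.add(record);
    }

    @Override
    public List<String> checkpoint() {
        List<String> files = new ArrayList<>();
        if (!buffer.isEmpty()) {
            // Assumed naming scheme; a real service would upload to DFS here.
            files.add("log-" + checkpointId + ".bin");
            buffer.clear();
        }
        checkpointId++;
        return files;
    }

    @Override
    public void shutdown() {
        running = false;
        buffer.clear();
    }

    @Override
    public SketchDeployMode deployMode() {
        return SketchDeployMode.EMBEDDED;
    }
}
```

A caller would invoke `start()`, append records, and call `checkpoint()` whenever the writer checkpoints; a checkpoint with no new records returns an empty list in this sketch.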

Proposed Changes

Overview

  1. LogWriter: Created by LogSinkProvider and used to send data to the LogStore service;
  2. LogReader: Created by LogSourceProvider and used to read data from the LogStore service;
  3. LogStore service: Publishes its service address after startup, then handles read, write, snapshot, and other requests from readers and writers in order to manage LogStore data.

Note: In the first version, only single-writer-multi-reader will be supported.

Deployment Mode

According to different requirements, such as the number of readers, high availability, etc., two deployment modes are supported:

  1. EMBEDDED: The corresponding LogStore service is automatically launched when the writer node is started. Each LogStore service is responsible for reading and writing all buckets on the writer node;
  2. EXTERNAL: The LogStore service is deployed externally, for example as a Flink job. When a writer node writes, each record is routed to a specific LogStore service according to its bucket.
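
The bucket-based routing in EXTERNAL mode could look like the sketch below. The proposal only states that routing is done by bucket; the modulo mapping, the `BucketRouter` class, and the address strings are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical router: maps a bucket to one of the external LogStore services.
final class BucketRouter {
    private final List<String> serviceAddresses;

    BucketRouter(List<String> serviceAddresses) {
        if (serviceAddresses.isEmpty()) {
            throw new IllegalArgumentException("at least one LogStore service is required");
        }
        // Defensive copy so later changes to the caller's list do not affect routing.
        this.serviceAddresses = new ArrayList<>(serviceAddresses);
    }

    // Assumed policy: bucket modulo the number of services.
    String route(int bucket) {
        return serviceAddresses.get(Math.floorMod(bucket, serviceAddresses.size()));
    }
}
```

With two services, buckets 0 and 2 map to the first address and buckets 1 and 3 to the second, so all writes for one bucket always reach the same service.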

Write & Read

  1. Write: LogWriter only needs to send write requests continuously to the LogStore service; data management is handled by the LogStore service;
  2. Read: When the downstream consumption lag is low, data is read mainly from the LogStore service over a socket. When the lag is large and the data has already been persisted from the LogStore service to DFS, the downstream reads directly from DFS until the lag is small enough.
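
The read-path decision can be sketched as a comparison between the reader's offset and the earliest offset the service still holds locally. This is one possible reading of "lag is low"; `ReadPath`, `ReadPathSelector`, and the offset-based criterion are assumptions, not names or rules from the proposal.

```java
// Hypothetical read paths: socket for data still in the service, file for data on DFS.
enum ReadPath { SOCKET, FILE }

final class ReadPathSelector {
    private ReadPathSelector() {}

    /**
     * Chooses how to read the record at readerOffset.
     *
     * @param readerOffset          next offset the downstream reader wants
     * @param earliestServiceOffset earliest offset still retained by the LogStore service
     */
    static ReadPath choose(long readerOffset, long earliestServiceOffset) {
        // Data at or after the earliest retained offset can be served over the socket;
        // older data is assumed to have been persisted to DFS already.
        return readerOffset >= earliestServiceOffset ? ReadPath.SOCKET : ReadPath.FILE;
    }
}
```

A reader would re-evaluate this choice as it advances, switching from `FILE` back to `SOCKET` once it has caught up with the data retained by the service.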

Checkpoint

Every time the writer node triggers a checkpoint, the LogStore service synchronously uploads local incremental data to DFS for persistence, and returns the addresses of these files. The GlobalCommitter node then writes these addresses into the manifest corresponding to the log.

In order to record these manifests, we need to modify the version of the snapshot and add the LogStoreManifestList field to the snapshot to record the LogStore data generated in this snapshot.
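
The commit step described above can be sketched as follows. All names are hypothetical: `SnapshotSketch` is not Paimon's `Snapshot` class, the manifest list naming scheme is invented, and the manifest and manifest list are collapsed into a single in-memory map entry to keep the example short.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical snapshot carrying the new field proposed in this section.
final class SnapshotSketch {
    final long id;
    final int version;
    final String logStoreManifestList; // null when the snapshot produced no LogStore data

    SnapshotSketch(long id, int version, String logStoreManifestList) {
        this.id = id;
        this.version = version;
        this.logStoreManifestList = logStoreManifestList;
    }
}

// Hypothetical committer: gathers the file addresses returned by each
// LogStore service checkpoint and records them under one manifest list.
final class LogCommitterSketch {
    private final Map<String, List<String>> manifestLists = new HashMap<>();

    SnapshotSketch commit(long snapshotId, int version, List<List<String>> filesPerWriter) {
        List<String> logFiles = new ArrayList<>();
        for (List<String> files : filesPerWriter) {
            logFiles.addAll(files);
        }
        if (logFiles.isEmpty()) {
            return new SnapshotSketch(snapshotId, version, null);
        }
        String name = "log-manifest-list-" + snapshotId; // assumed naming scheme
        manifestLists.put(name, logFiles);
        return new SnapshotSketch(snapshotId, version, name);
    }

    // Resolves the log files recorded for a snapshot, e.g. for reading or expiration.
    List<String> logFilesOf(SnapshotSketch snapshot) {
        List<String> files = manifestLists.get(snapshot.logStoreManifestList);
        return files == null ? new ArrayList<String>() : files;
    }
}
```

Keeping the field nullable in this sketch lets snapshots written without LogStore data stay unchanged apart from the version number.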

Data Cleaning

Snapshot expiration

As with FileStore file cleanup, read the LogStoreManifestList of the expired snapshot and then delete the log files it references, in order.

Local data cleaning of LogStore service

Since the LogStore service has persisted the local data to DFS after the checkpoint is completed, this part of the incremental data can be cleaned locally. However, considering that downstream reading does not have zero latency, the LogStore service can locally retain the incremental data of 2 checkpoints by default. If the amount of data is large, it can be reduced to retaining only the incremental data after the latest checkpoint.
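
The retention policy above can be sketched as a bounded queue of per-checkpoint file groups. `LocalCheckpointRetention` and its method names are hypothetical; the default of 2 and the reduced setting of 0 correspond to the two cases described in the paragraph.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical helper that tracks which persisted increments are still kept locally.
final class LocalCheckpointRetention {
    private final int retainedCheckpoints;
    private final Deque<List<String>> persisted = new ArrayDeque<>();

    LocalCheckpointRetention(int retainedCheckpoints) {
        if (retainedCheckpoints < 0) {
            throw new IllegalArgumentException("retainedCheckpoints must be >= 0");
        }
        this.retainedCheckpoints = retainedCheckpoints;
    }

    /**
     * Registers the local files persisted by a completed checkpoint and
     * returns the local files that are now safe to delete.
     */
    List<String> onCheckpointComplete(List<String> filesOfCheckpoint) {
        persisted.addLast(new ArrayList<>(filesOfCheckpoint));
        List<String> deletable = new ArrayList<>();
        // Evict the oldest increments until only the configured number remain.
        while (persisted.size() > retainedCheckpoints) {
            deletable.addAll(persisted.removeFirst());
        }
        return deletable;
    }
}
```

With `retainedCheckpoints = 2`, the files of the first checkpoint become deletable only when the third checkpoint completes; with `0`, each increment is deletable as soon as it has been persisted.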

Fault Tolerance

In most cases, the downstream job consumes the latest offset written by the upstream, so the data held in memory on the write side can serve most read requests. However, because of failures, eviction of in-memory data, and similar events, the downstream job may not find the data in the memory of the LogStore service. Fault tolerance must be implemented for this situation.

Write side failure

After the upstream job fails and restarts, it resumes writing from the offset of the most recently committed snapshot. If the LogStore service detects a write to an offset that has already been written, it discards the original data from that offset onward and notifies the downstream job that its offset must be reset. The downstream job then resets its offset to the end position of the snapshot closest to that offset and consumes again from there. Data read before the reset is treated as duplicate reads caused by dirty reads.
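
The truncation and reset logic can be sketched for a single bucket as follows. `BucketLogSketch` is hypothetical, offsets are modeled as list indexes, and "closest snapshot" is interpreted as the largest snapshot end offset that does not exceed the rewritten offset, which is an assumption about the intended meaning.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;

// Hypothetical in-memory log of one bucket; the list index is the record offset.
final class BucketLogSketch {
    private final List<String> records = new ArrayList<>();

    /**
     * Writes a record at the given offset. If the offset was already written,
     * the old data from that offset onward is discarded first.
     *
     * @return true if data was discarded, meaning readers must reset their offset
     */
    boolean write(long offset, String record) {
        if (offset < 0 || offset > records.size()) {
            throw new IllegalArgumentException("offset " + offset + " would leave a gap");
        }
        boolean truncated = offset < records.size();
        if (truncated) {
            records.subList((int) offset, records.size()).clear();
        }
        records.add(record);
        return truncated;
    }

    long endOffset() {
        return records.size();
    }

    // Assumed rule: reset to the largest snapshot end offset <= the rewritten offset.
    static long resetOffset(NavigableSet<Long> snapshotEndOffsets, long rewrittenOffset) {
        Long floor = snapshotEndOffsets.floor(rewrittenOffset);
        return floor == null ? 0L : floor;
    }
}
```

When `write` returns `true`, the service would notify connected readers, and each reader would call something like `resetOffset` to find where to resume.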

Read side failure

After the downstream job fails and restarts, it only needs to reconnect to the LogStore service. Depending on whether its offset is still held by the LogStore service, it uses either socketReader or fileReader to read data. Refer to the [Write & Read] section.

LogStore service failure

After the upstream job detects a LogStore service failure, or is notified by the LogStore service that a restart has occurred, the upstream job must also fail over to ensure that no data is lost. The subsequent behavior is the same as described in the [Write side failure] section.

Compatibility, Deprecation, and Migration Plan

  1. This will provide a new LogStore type and will have no impact on existing users;
  2. For the existing KafkaLogStoreFactory, we need to provide a LogStoreProvider that does not implement any LogStore interface, so that existing behavior is unchanged;
  3. Relevant changes to the Snapshot structure can be identified based on the version field and are expected to have no impact.

Test Plan

The tests can follow those currently provided for KafkaLogStoreFactory. In theory, the configured LogStore type can be switched to the new LogStore so that the related unit tests (UT) and integration tests (IT) are reused. Some changes to the test implementation are still needed, for example where the tests currently rely on Docker images to start the Kafka service.
