Discussion thread
Vote thread
ISSUE
Release

Motivation

Currently, Paimon has two storage tiers: FileStore and LogStore. If the business is based on exactly-once semantics or is not sensitive to latency, the LogStore does not need to be enabled.

However, in scenarios where the business accepts at-least-once semantics and requires low latency, such as real-time metric computation, the LogStore is enabled to accelerate data visibility.

In the current implementation of Paimon, only a Kafka-based LogStore is provided, and it can only be used with Flink. This brings the following limitations:

  1. Increased maintenance and management costs: an additional Kafka service must be maintained. Because Paimon writes and consumes at bucket granularity, the number of Kafka partitions must be changed whenever the number of Paimon buckets is modified. Kafka also has to be involved when troubleshooting;
  2. Inconsistent data management: Paimon manages data at snapshot granularity, and the life cycle of the changelog is consistent with that of snapshots. However, Kafka's data expiration strategy cannot be kept consistent with Paimon's. At the same time, Paimon cannot synchronously delete LogStore data during a rollback, which causes consumers to read stale data from the LogStore;
  3. Dynamic scaling is difficult to achieve: Paimon has implemented automatic scaling of buckets for primary key tables, but it is difficult for Kafka to automatically adapt to Paimon's buckets;
  4. Storage redundancy: to ensure that the data in the LogStore is valid, Kafka's data expiration time must be longer than the life cycle of Paimon's snapshots.


Therefore, we hope to provide a lightweight LogStore in Paimon that does not rely on other services, in order to achieve the following:

  1. Out-of-the-box LogStore storage, which does not rely on other services and reduces maintenance and management costs;
  2. A data life cycle consistent with Paimon's snapshots, reducing redundant storage;
  3. Data sharding aligned with Paimon's partitions and buckets, supporting the features related to Paimon's data sharding;
  4. Second-level data visibility under at-least-once semantics.

...

  1. EMBEDDED: the corresponding LogStore service is automatically launched when the writer node starts. Each LogStore service is responsible for reading and writing all buckets on its writer node;
  2. EXTERNAL: the LogStore service is deployed externally, for example as a separate Flink job. Each write from a writer node is routed to a specific LogStore service according to its bucket (see the sketch after this list).
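
To make the two deployment modes concrete, below is a minimal sketch of how a writer might resolve the LogStore endpoint for a bucket. All names here (LogStoreDeployMode, LogStoreEndpoint, the modulo routing) are illustrative assumptions, not the API proposed in this document.

```java
import java.util.List;

// Hypothetical sketch of endpoint resolution for the two deployment modes.
public class LogStoreRouter {

    /** The deployment modes described above. */
    public enum LogStoreDeployMode { EMBEDDED, EXTERNAL }

    /** Host/port of one LogStore service instance (illustrative). */
    public record LogStoreEndpoint(String host, int port) {}

    private final LogStoreDeployMode mode;
    private final LogStoreEndpoint local;          // embedded service on this writer node
    private final List<LogStoreEndpoint> external; // externally deployed services

    public LogStoreRouter(
            LogStoreDeployMode mode,
            LogStoreEndpoint local,
            List<LogStoreEndpoint> external) {
        this.mode = mode;
        this.local = local;
        this.external = external;
    }

    /** EMBEDDED: always the local service; EXTERNAL: route by bucket. */
    public LogStoreEndpoint endpointFor(int bucket) {
        if (mode == LogStoreDeployMode.EMBEDDED) {
            return local;
        }
        return external.get(bucket % external.size());
    }
}
```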

...

  1. Write: the LogWriter only needs to continuously send write requests to the LogStore service; data management is handled by the LogStore service;
  2. Read: when the downstream consumption lag is low, data is mainly read from the LogStore service through a socket. When the lag is large and the data has already been persisted from the LogStore service to DFS, the downstream reads directly from the DFS files until the lag is small enough (see the sketch after this list).
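
A minimal sketch of this hybrid read path follows. LogStoreClient, DfsLogScanner, the lag threshold, and all method names are hypothetical placeholders; the proposal does not fix this API.

```java
import java.util.Iterator;

// Hypothetical sketch of a hybrid reader: socket reads for small lag,
// DFS file reads to catch up when the lag is large.
public class HybridLogReader {

    /** Socket connection to the LogStore service (illustrative). */
    interface LogStoreClient {
        long latestOffset();
        Iterator<byte[]> fetch(long fromOffset);
    }

    /** Scanner over log files already persisted to DFS (illustrative). */
    interface DfsLogScanner {
        boolean covers(long offset);
        Iterator<byte[]> scan(long fromOffset);
    }

    // Hypothetical threshold; a real implementation would make this configurable.
    private static final long LAG_THRESHOLD = 10_000;

    private final LogStoreClient client;
    private final DfsLogScanner dfsScanner;

    HybridLogReader(LogStoreClient client, DfsLogScanner dfsScanner) {
        this.client = client;
        this.dfsScanner = dfsScanner;
    }

    Iterator<byte[]> poll(long consumedOffset) {
        long lag = client.latestOffset() - consumedOffset;
        if (lag > LAG_THRESHOLD && dfsScanner.covers(consumedOffset)) {
            // Large lag and the data is already on DFS: catch up from files.
            return dfsScanner.scan(consumedOffset);
        }
        // Small lag: stream from the LogStore service over the socket.
        return client.fetch(consumedOffset);
    }
}
```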

...

Every time the writer node triggers a checkpoint, the LogStore service synchronously uploads its local incremental data to DFS for persistence and returns the addresses of these files. The GlobalCommitter node then writes these addresses into the manifest corresponding to the log.
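
Sketched below, under assumed names (LogFileAddress, persistIncrementalData, commitLogManifest), is the shape of this checkpoint handshake; it illustrates the described flow and is not the proposed interface.

```java
import java.util.List;

// Hypothetical sketch of the checkpoint handshake between the LogStore service
// and the GlobalCommitter. Type and method names are illustrative only.
public class LogCheckpointFlow {

    /** Address of one log file persisted to DFS during a checkpoint (illustrative). */
    public record LogFileAddress(String path, long firstOffset, long lastOffset) {}

    interface LogStoreService {
        /** Synchronously persist local incremental data to DFS; return the new files. */
        List<LogFileAddress> persistIncrementalData(long checkpointId);
    }

    interface GlobalCommitter {
        /** Record the persisted log files in the log manifest of this snapshot. */
        void commitLogManifest(long checkpointId, List<LogFileAddress> files);
    }

    static void onCheckpoint(
            long checkpointId, LogStoreService service, GlobalCommitter committer) {
        // 1. The service uploads local incremental data and reports the file addresses.
        List<LogFileAddress> files = service.persistIncrementalData(checkpointId);
        // 2. The GlobalCommitter writes these addresses into the log manifest.
        committer.commitLogManifest(checkpointId, files);
    }
}
```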

In order to record these manifests, we need to bump the snapshot version, adding a LogStoreManifestList field to the snapshot that records the LogStore data generated in this snapshot.
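
An abbreviated sketch of the extended snapshot structure is shown below; logStoreManifestList is the new field from this proposal, while the other fields are a representative subset of the existing snapshot and are not exhaustive.

```java
// Hypothetical, abbreviated view of the extended snapshot. Only the new field is
// from this proposal; existing fields are reduced to a representative subset.
public class SnapshotSketch {
    private long id;                     // snapshot id (existing)
    private long schemaId;               // schema id (existing)
    private String baseManifestList;     // existing FileStore manifests
    private String deltaManifestList;    // existing FileStore manifests
    private String logStoreManifestList; // NEW: manifests of the LogStore files
                                         // persisted in this snapshot
}
```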

...

Snapshot expiration

In the same way that the FileStore cleans up files, snapshot expiration reads the LogStoreManifestList and then deletes the log files in order.
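
A minimal sketch of this cleanup, assuming hypothetical LogManifestReader and FileDeleter helpers that mirror the FileStore's expiration utilities:

```java
import java.util.List;

// Hypothetical sketch of log cleanup during snapshot expiration; the types and
// methods mirror the FileStore cleanup described above but are illustrative only.
public class LogStoreExpiration {

    interface LogManifestReader {
        /** Read the log files recorded in the snapshot's logStoreManifestList. */
        List<String> readLogFiles(String logStoreManifestList);
    }

    interface FileDeleter {
        void delete(String path);
    }

    static void expire(
            String logStoreManifestList, LogManifestReader reader, FileDeleter deleter) {
        // Delete the log files in order, then the manifest list itself,
        // mirroring how the FileStore expires its own files.
        for (String logFile : reader.readLogFiles(logStoreManifestList)) {
            deleter.delete(logFile);
        }
        deleter.delete(logStoreManifestList);
    }
}
```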

...

Since the LogStore service has already persisted its local data to DFS by the time a checkpoint completes, this incremental data can be cleaned up locally. However, considering that downstream reading does not have zero latency, the LogStore service retains the incremental data of the last 2 checkpoints locally by default. If the amount of data is large, this can be reduced to retaining only the incremental data after the latest checkpoint.
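
A sketch of this retention policy, with all names assumed for illustration:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of the local retention policy: keep the incremental data
// of the last N completed checkpoints (N = 2 by default, 1 under heavy load).
public class LocalRetentionPolicy {

    private final Deque<Long> retainedCheckpoints = new ArrayDeque<>();
    private int checkpointsToRetain = 2; // default from the proposal

    /** Called after a checkpoint's incremental data has been persisted to DFS. */
    void onCheckpointPersisted(long checkpointId) {
        retainedCheckpoints.addLast(checkpointId);
        evictExcess();
    }

    /** If the amount of data is large, shrink to the latest checkpoint only. */
    void onMemoryPressure() {
        checkpointsToRetain = 1;
        evictExcess();
    }

    private void evictExcess() {
        while (retainedCheckpoints.size() > checkpointsToRetain) {
            dropLocalIncrementalData(retainedCheckpoints.removeFirst());
        }
    }

    private void dropLocalIncrementalData(long checkpointId) {
        // Free the local buffers/files for this checkpoint; the data is safe on DFS.
    }
}
```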

...

In most cases, the downstream job consumes the latest offsets written upstream, so the data in the memory of the writer-side LogStore service can cover read requests in most scenarios. However, due to faults, in-memory data being evicted, etc., the downstream job may not be able to find the data in the memory of the LogStore service. Fault tolerance needs to be implemented for this situation (a sketch follows).
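
One plausible shape for this fault tolerance, consistent with the hybrid read path above: when the requested offset has been evicted from memory, the service replies with a redirect that tells the reader to catch up from the DFS files instead of failing the fetch. The response types and methods below are assumptions, not the proposed protocol.

```java
import java.util.List;

// Hypothetical sketch of server-side fault tolerance: if the requested offset has
// been evicted from memory, redirect the reader to the files persisted on DFS.
public class FetchHandlerSketch {

    /** Either in-memory records or a redirect to DFS (illustrative). */
    sealed interface FetchResponse permits Records, ReadFromDfs {}
    record Records(List<byte[]> records) implements FetchResponse {}
    record ReadFromDfs(long earliestInMemoryOffset) implements FetchResponse {}

    interface MemoryLog {
        long earliestOffset(); // oldest offset still held in memory
        List<byte[]> read(long fromOffset);
    }

    private final MemoryLog memoryLog;

    FetchHandlerSketch(MemoryLog memoryLog) {
        this.memoryLog = memoryLog;
    }

    FetchResponse handleFetch(long requestedOffset) {
        if (requestedOffset < memoryLog.earliestOffset()) {
            // Evicted (or lost in a failover): tell the reader to catch up from
            // the files persisted to DFS, then resume socket reads from here.
            return new ReadFromDfs(memoryLog.earliestOffset());
        }
        return new Records(memoryLog.read(requestedOffset));
    }
}
```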

...