...

Compact topics are not supported with tiered storage. A topic created with the effective value of remote.log.storage.enable as true cannot change its retention policy from delete to compact.

...

Broker A (Leader)

  log:
  0: msg 0 LE-0
  1: msg 1 LE-0
  2: msg 2 LE-0
  3: msg 3 LE-1
  4: msg 4 LE-1
  5: msg 5 LE-2
  6: msg 6 LE-2
  7: msg 7 LE-3
  8: msg 8 LE-3
  9: msg 9 LE-3 (HW)

  leader_epochs:
  LE-0, 0
  LE-1, 3
  LE-2, 5
  LE-3, 7

Broker B (Follower)

  log:
  0: msg 0 LE-0
  1: msg 1 LE-0
  2: msg 2 LE-0
  3: msg 3 LE-1

  leader_epochs:
  LE-0, 0
  LE-1, 3

1. Because the latest leader epoch in the local storage (LE-1) does not equal the current leader epoch (LE-3), the follower starts from the Truncating state.

2. fetchLeaderEpochEndOffsets(LE-1) returns 5, which is larger than the latest local offset. With the existing truncation logic, the local log is not truncated and the follower moves to the Fetching state.
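The minimal Java sketch below illustrates the truncation decision described in the two steps above. The class, the hard-coded offsets, and the epoch-cache lookup are illustrative assumptions based on this example, not the actual replica fetcher code.

import java.util.Map;

public class TruncationDecisionSketch {
    public static void main(String[] args) {
        // Leader's leader_epochs cache from the example above: epoch -> start offset.
        Map<Integer, Long> leaderEpochStartOffsets = Map.of(0, 0L, 1, 3L, 2, 5L, 3, 7L);
        long leaderLogEndOffset = 10L;     // leader has offsets 0..9

        int followerLatestLocalEpoch = 1;  // LE-1 is the latest epoch in the follower's local log
        long followerLogEndOffset = 4L;    // follower has offsets 0..3

        // fetchLeaderEpochEndOffsets(LE-1): the end offset of LE-1 on the leader is the
        // start offset of the next epoch (LE-2), i.e. 5 in this example.
        long epochEndOffset = leaderEpochStartOffsets.getOrDefault(
                followerLatestLocalEpoch + 1, leaderLogEndOffset);

        if (epochEndOffset >= followerLogEndOffset) {
            // The returned offset (5) is larger than the latest local offset, so the local
            // log is not truncated and the follower moves to the Fetching state.
            System.out.println("No truncation needed; transition to Fetching.");
        } else {
            System.out.println("Truncate local log to offset " + epochEndOffset + ", then transition to Fetching.");
        }
    }
}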




Remote Storage

  seg-0-2, uuid-1
    log:
    0: msg 0 LE-0
    1: msg 1 LE-0
    2: msg 2 LE-0
    epochs:
    LE-0, 0

  seg-3-5, uuid-2
    log:
    3: msg 3 LE-1
    4: msg 4 LE-1
    5: msg 5 LE-2
    epochs:
    LE-0, 0
    LE-1, 3
    LE-2, 5

RL metadata storage

  seg-0-2, uuid-1
    segment epochs:
    LE-0, 0

  seg-3-5, uuid-2
    segment epochs:
    LE-1, 3
    LE-2, 5

...

Metadata of remote log segments is stored in an internal topic called `__remote_log_segment_metadata`. This topic is created with a default partition count of 50. Users can configure the partition count, replication factor, etc., as mentioned in the config section.
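For illustration only, the sketch below shows one way a user topic-partition could be mapped onto the internal metadata topic's partitions by hashing the topic-partition and taking it modulo the metadata partition count. The hash function and helper names here are assumptions; the actual partitioner used by the metadata manager is implementation specific.

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class MetadataPartitionMappingSketch {
    // Number of partitions of the internal metadata topic (default 50, configurable).
    static final int METADATA_TOPIC_PARTITIONS = 50;

    // Illustrative mapping: hash the "topic-partition" string and take it modulo the
    // metadata partition count. The real metadata manager may use a different hash.
    static int metadataPartitionFor(String topic, int partition) {
        String key = topic + "-" + partition;
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        return Math.floorMod(hash, METADATA_TOPIC_PARTITIONS);
    }

    public static void main(String[] args) {
        System.out.println(metadataPartitionFor("orders", 0));
        System.out.println(metadataPartitionFor("orders", 1));
    }
}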

...

_rlmm_committed_offsets

0 2022
4 104
2 498
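The short Java sketch below parses lines in the format shown above, assuming each line holds a metadata partition number followed by its committed offset. The file name and that interpretation are taken from this example only, not from a published API.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

public class CommittedOffsetsFileSketch {
    // Parses "<metadata-partition> <committed-offset>" pairs, one per line.
    static Map<Integer, Long> parse(Path file) throws IOException {
        Map<Integer, Long> committedOffsets = new HashMap<>();
        for (String line : Files.readAllLines(file)) {
            if (line.isBlank()) continue;
            String[] parts = line.trim().split("\\s+");
            committedOffsets.put(Integer.parseInt(parts[0]), Long.parseLong(parts[1]));
        }
        return committedOffsets;
    }

    public static void main(String[] args) throws IOException {
        // Prints {0=2022, 2=498, 4=104} for the sample contents above.
        System.out.println(parse(Path.of("_rlmm_committed_offsets")));
    }
}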

Internal flat-file store format

Message Formatter for the internal topic

...

  1. The controller publishes a delete_partition_marked event to say that the partition is marked for deletion. Multiple events can be published when the controller restarts or fails over; these events are deduplicated by RPM.
  2. RPM receives the delete_partition_marked event and processes it if it has not been processed earlier.
  3. RPM publishes a delete_partition_started event indicating that the partition deletion has started.
  4. RPM gets all the remote log segments for the partition, and each of these remote log segments is deleted with the following steps (see the sketch after this list).
  5. Publish a delete_segment_started event with the segment id.
  6. RPM deletes the segment using RSM.
  7. Publish a delete_segment_finished event with the segment id once the deletion is successful.
  8. Publish delete_partition_finished once all the segments have been deleted successfully.
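The Java sketch below walks through the event ordering described in the list above. The class, enum, and segment ids are hypothetical illustrations, not the actual RPM/RSM interfaces.

import java.util.List;

public class PartitionDeletionFlowSketch {

    enum EventType {
        DELETE_PARTITION_MARKED,    // published by the controller
        DELETE_PARTITION_STARTED,   // published by RPM before deleting segments
        DELETE_SEGMENT_STARTED,     // published per segment before deletion via RSM
        DELETE_SEGMENT_FINISHED,    // published per segment after successful deletion
        DELETE_PARTITION_FINISHED   // published once all segments are deleted
    }

    public static void main(String[] args) {
        List<String> segmentIds = List.of("uuid-1", "uuid-2"); // remote log segments of the partition

        System.out.println(EventType.DELETE_PARTITION_MARKED + " (controller, deduplicated by RPM)");
        System.out.println(EventType.DELETE_PARTITION_STARTED);
        for (String segmentId : segmentIds) {
            System.out.println(EventType.DELETE_SEGMENT_STARTED + " " + segmentId);
            // RPM deletes the segment from remote storage via RSM at this point.
            System.out.println(EventType.DELETE_SEGMENT_FINISHED + " " + segmentId);
        }
        System.out.println(EventType.DELETE_PARTITION_FINISHED);
    }
}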

Recovering from truncation of remote log metadata topic partitions

// todo - update the doc on possible recovery mechanisms.

Protocol Changes

ListOffsets

...

Compacted topics will not have remote storage support. 

Configs

System-Wide

remote.log.storage.system.enable - Whether to enable tiered storage functionality in a broker or not. Valid values are `true` or `false` and the default value is false. This property gives backward compatibility. When it is true, the broker starts all the services required for tiered storage.

remote.log.storage.manager.class.name - This is mandatory if remote.log.storage.system.enable is set to true.

remote.log.metadata.manager.class.name - This is an optional property. If it is not configured, Kafka uses the inbuilt metadata manager backed by an internal topic.
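As a sketch, the three broker-side settings above could be wired together as shown below. These keys would normally go into the broker's server.properties; the manager class names here are hypothetical placeholders, not shipped implementations.

import java.util.Properties;

public class TieredStorageBrokerConfigSketch {
    public static void main(String[] args) {
        Properties brokerConfig = new Properties();
        // Enables the remote log subsystem on the broker.
        brokerConfig.setProperty("remote.log.storage.system.enable", "true");
        // Mandatory when the flag above is true; this class name is a placeholder.
        brokerConfig.setProperty("remote.log.storage.manager.class.name", "com.example.MyRemoteStorageManager");
        // Optional; omit it to use the inbuilt metadata manager backed by an internal topic.
        brokerConfig.setProperty("remote.log.metadata.manager.class.name", "com.example.MyRemoteLogMetadataManager");

        brokerConfig.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}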

RemoteStorageManager

(These configs are dependent on remote storage manager implementation)

remote.log.storage.*

RemoteLogMetadataManager

(These configs are dependent on remote log metadata manager implementation)

remote.log.metadata.*

Remote log manager related configuration.

remote.log.index.file.cache.total.size.mb
The total size of the space allocated to store index files fetched from remote storage in the local storage.
Default value: 1024

remote.log.manager.thread.pool.size
Remote log manager thread pool size, which is used for scheduling tasks to copy segments and clean up remote log segments.
Default value: 10

remote.log.manager.task.interval.ms
The interval at which the remote log manager runs scheduled tasks such as copying segments and cleaning up remote log segments.
Default value: 30,000

remote.log.manager.task.retry.backoff.ms
The amount of time in milliseconds to wait before attempting to retry a failed remote storage request.
Default value: 500

remote.log.manager.task.retry.backoff.max.ms
The maximum amount of time in milliseconds to wait before attempting to retry a failed remote storage request.
Default value: 30,000

remote.log.manager.task.retry.jitter
Random jitter amount applied to `remote.log.manager.task.retry.backoff.ms` when computing the resultant backoff interval. This avoids reconnection storms (see the backoff sketch after these configs).
Default value: 0.2

remote.log.reader.threads
Remote log reader thread pool size, which is used in scheduling tasks to fetch data from remote storage.  
Default value: 5

remote.log.reader.max.pending.tasks
Maximum remote log reader thread pool task queue size. If the task queue is full, the broker will stop reading remote log segments.
Default value: 100
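To illustrate how the retry backoff settings above interact, the sketch below computes an exponentially growing backoff capped at the maximum and randomized by the jitter fraction. The exact formula used by the remote log manager is not specified here, so this is only an assumption using the default values.

import java.util.concurrent.ThreadLocalRandom;

public class RetryBackoffSketch {
    static final long BACKOFF_MS = 500;         // remote.log.manager.task.retry.backoff.ms
    static final long BACKOFF_MAX_MS = 30_000;  // remote.log.manager.task.retry.backoff.max.ms
    static final double JITTER = 0.2;           // remote.log.manager.task.retry.jitter

    // Illustrative: double the backoff on each attempt, cap it at the maximum, then
    // randomize it by +/- JITTER to avoid synchronized retries across tasks.
    static long backoffForAttempt(int attempt) {
        double base = Math.min(BACKOFF_MS * Math.pow(2, attempt), BACKOFF_MAX_MS);
        double factor = 1.0 + ThreadLocalRandom.current().nextDouble(-JITTER, JITTER);
        return (long) (base * factor);
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt < 8; attempt++) {
            System.out.println("attempt " + attempt + ": backoff ~" + backoffForAttempt(attempt) + " ms");
        }
    }
}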

Per Topic Configuration

Users can set the remote.log.storage.enable property for a topic; the default value is false. To enable tiered storage for a topic, set remote.log.storage.enable to true.

The retention configs below are similar to the log retention configs. They determine how long log segments are retained in local storage. The existing log.retention.* configs remain the retention configs for the topic partition as a whole, which includes both local and remote storage.

local.log.retention.ms
The number of milliseconds to keep the local log segment before it gets deleted. If not set, the value in `log.retention.ms` is used. The effective value should always be less than or equal to the log.retention.ms value.

local.log.retention.bytes
The maximum size that the local log segments of a partition can grow to before old segments are deleted. If not set, the value in `log.retention.bytes` is used. The effective value should always be less than or equal to the log.retention.bytes value.
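The hedged example below shows one way to apply the per-topic settings above with the Kafka Admin client. The bootstrap server, topic name, and retention value are placeholders.

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class EnableTieredStorageSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
            Collection<AlterConfigOp> ops = List.of(
                // Enable tiered storage for this topic.
                new AlterConfigOp(new ConfigEntry("remote.log.storage.enable", "true"), AlterConfigOp.OpType.SET),
                // Keep roughly one hour of data locally; older segments are served from remote storage.
                new AlterConfigOp(new ConfigEntry("local.log.retention.ms", "3600000"), AlterConfigOp.OpType.SET)
            );
            admin.incrementalAlterConfigs(Map.of(topic, ops)).all().get();
        }
    }
}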

...

MBean and description

kafka.server:type=BrokerTopicMetrics, name=RemoteReadRequestsPerSec, topic=([-.\w]+)
  Number of remote storage read requests per second.

kafka.server:type=BrokerTopicMetrics, name=RemoteBytesInPerSec, topic=([-.\w]+)
  Number of bytes read from remote storage per second.

kafka.server:type=BrokerTopicMetrics, name=RemoteReadErrorPerSec, topic=([-.\w]+)
  Number of remote storage read errors per second.

kafka.log.remote:type=RemoteStorageThreadPool, name=RemoteLogReaderTaskQueueSize
  Number of remote storage read tasks pending for execution.

kafka.log.remote:type=RemoteStorageThreadPool, name=RemoteLogReaderAvgIdlePercent
  Average idle percent of the remote storage reader thread pool.

kafka.log.remote:type=RemoteLogManager, name=RemoteLogManagerTasksAvgIdlePercent
  Average idle percent of the RemoteLogManager thread pool.

kafka.server:type=BrokerTopicMetrics, name=RemoteBytesOutPerSec, topic=([-.\w]+)
  Number of bytes copied to remote storage per second.

kafka.server:type=BrokerTopicMetrics, name=RemoteWriteErrorPerSec, topic=([-.\w]+)
  Number of remote storage write errors per second.
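One way to read these metrics is over JMX, as in the hypothetical sketch below. The JMX port, the topic name, and the assumption that the meter exposes a Count attribute are illustrative, not confirmed by this document.

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class RemoteStorageMetricsSketch {
    public static void main(String[] args) throws Exception {
        // Assumes the broker exposes JMX on port 9999 (e.g. started with JMX_PORT=9999).
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection connection = connector.getMBeanServerConnection();
            ObjectName readRequests = new ObjectName(
                "kafka.server:type=BrokerTopicMetrics,name=RemoteReadRequestsPerSec,topic=my-topic");
            // Meters commonly expose Count and rate attributes (e.g. OneMinuteRate).
            Object count = connection.getAttribute(readRequests, "Count");
            System.out.println("RemoteReadRequestsPerSec count: " + count);
        }
    }
}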


Recovery mechanism in case of broker or cluster failure

//todo - add possible solutions (maybe best effort scenarios)

Upgrade

The process to upgrade and enable the tiered storage option depends on the existing Kafka version.

Upgrade from prior to 2.7

If you are upgrading from a version prior to 2.7, first upgrade to 2.7 and then follow the steps below.

Upgrade the existing Kafka cluster to version 2.7 and let it run for at least the log retention period of the user topics on which you want to enable tiered storage. This allows all the topics to have producer snapshots generated for each log segment, which is mandatory for enabling tiered storage on topics that were produced with idempotent/transactional producers. After that, you can upgrade to the released version containing tiered storage as mentioned below.

Upgrade from 2.7

Follow steps 1 and 2 mentioned in the Kafka upgrade documentation to reach the state where all brokers are running the latest binaries but still use the earlier inter.broker.protocol and log.message.format versions.
After that, do a rolling restart with remote.log.storage.system.enable set to true on the brokers so that the remote log subsystems are up and running.

Users can enable tiered storage by setting "remote.log.storage.enable" to true on the desired topics.

If the topic-id is not received in the LeaderAndIsr request, remote log storage will not start and an error message will be logged. One way to address this is to do a rolling restart of that broker, so that the leader moves to another broker and the controller sends a LeaderAndIsr request with the registered topic-id.

Feature Test

Feature test cases and test results are documented in this google spreadsheet.

Performance Test Results

We have tested the performance of the initial implementation of this proposal.

...

  • Enhance the RLMM local file-based cache with RocksDB to avoid loading the whole cache in memory.
  • Enhance the RLMM implementation based on topic-based storage to point to a target Kafka cluster instead of using a system-level topic within the cluster.
  • Improve the default RLMM implementation with a less chatty protocol.
  • Recovery mechanism in case of broker or cluster failure.
    • This is to be done by fetching the remote log metadata from RemoteStorageManager.
  • Recovering from truncation of remote log metadata topic partitions.

Alternatives considered

The following alternatives were considered:

...