...

When a partition is deleted, the controller updates its state in RLMM with DELETE_PARTITION_MARKED and expects RLMM to have a mechanism to clean up the remote log segments. This process for the default RLMM is described in detail here.

...

RLMM maintains a metadata cache by subscribing to the respective remote log metadata topic partitions. Whenever a topic partition is reassigned to a new broker and the RLMM on that broker is not yet subscribed to the respective remote log metadata topic partition, it subscribes to that partition and adds all the entries to the cache. So, in the worst case, the RLMM on a broker may be consuming from most of the remote log metadata topic partitions. In the initial version, we will have a file-based cache for all the messages that have already been consumed by this instance, and it will be loaded in-memory whenever RLMM is started. This cache is maintained in a separate file for each of the topic partitions. This allows us to commit offsets of the partitions that have already been read. Committed offsets can be stored in a local file to avoid reading the messages again when a broker is restarted.
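
As an illustration of the committed-offset bookkeeping described above, here is a minimal, hypothetical sketch of a per-partition offset file; the class name and file layout are illustrative only and are not part of the actual implementation.

```java
// Illustrative sketch only: one way to persist the committed offset of a remote log
// metadata topic partition to a local file, so that on broker restart RLMM can resume
// consuming from where it left off instead of re-reading all messages.
// The class and file layout are hypothetical, not the actual Kafka implementation.
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;

public class CommittedOffsetFile {
    private final Path file;

    public CommittedOffsetFile(Path cacheDir, int metadataTopicPartition) throws IOException {
        Files.createDirectories(cacheDir);
        // One file per remote log metadata topic partition.
        this.file = cacheDir.resolve("metadata-partition-" + metadataTopicPartition + ".offset");
    }

    /** Persist the offset up to which messages have been consumed and applied to the cache. */
    public synchronized void commit(long offset) throws IOException {
        Files.write(file, Long.toString(offset).getBytes(StandardCharsets.UTF_8));
    }

    /** Returns the committed offset if present, so consumption can resume from offset + 1. */
    public synchronized Optional<Long> read() throws IOException {
        if (!Files.exists(file)) {
            return Optional.empty();
        }
        String content = new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim();
        return content.isEmpty() ? Optional.empty() : Optional.of(Long.parseLong(content));
    }
}
```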

...

remote.log.metadata.topic.replication.factor

Replication factor of the topic

Default: 3

remote.log.metadata.topic.num.partitions

Number of partitions of the topic

Default: 50

remote.log.metadata.topic.retention.ms

Retention of the topic in milliseconds.

Default: -1, which means unlimited.

Users can configure this value based on their use cases. To avoid any data loss, this value should be greater than the maximum retention period of any topic enabled with tiered storage in the cluster.

remote.log.metadata.manager.listener.name

Listener name to be used by the RemoteLogMetadataManager implementation on the broker to connect to the local broker. The respective endpoint address is passed with the "bootstrap.servers" property while invoking RemoteLogMetadataManager#configure(Map<String, ?> props).

This is used by the Kafka clients created in the RemoteLogMetadataManager implementation.

remote.log.metadata.*

The default RLMM implementation creates producer and consumer instances. Common client properties can be configured with the `remote.log.metadata.common.client.` prefix. Users can also pass properties specific to the producer/consumer with the `remote.log.metadata.producer.` and `remote.log.metadata.consumer.` prefixes. These override properties with the `remote.log.metadata.common.client.` prefix.

Any other properties should be prefixed with "remote.log.metadata." and these will be passed to RemoteLogMetadataManager#configure(Map<String, ?> props).

For example, the security configuration needed to connect to the local broker for the configured listener name is passed with these props.

remote.partition.remover.task.interval.ms

The interval at which the remote partition remover runs to delete the remote storage of the partitions marked for deletion.

Default: 3600000 (1 hour)
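
To illustrate how these properties fit together, the hedged sketch below collects an example set of them in one place; the listener name, security, producer, and consumer values are placeholders, not recommendations.

```java
// Illustrative only: example broker properties related to the default RLMM.
// All values are placeholders; the property names follow the prefixes described above.
import java.util.HashMap;
import java.util.Map;

public class RemoteLogMetadataConfigExample {
    public static Map<String, Object> exampleConfigs() {
        Map<String, Object> props = new HashMap<>();
        // Settings for the internal remote log metadata topic.
        props.put("remote.log.metadata.topic.replication.factor", 3);
        props.put("remote.log.metadata.topic.num.partitions", 50);
        props.put("remote.log.metadata.topic.retention.ms", -1);

        // Listener used by RLMM's internal Kafka clients to reach the local broker.
        props.put("remote.log.metadata.manager.listener.name", "INTERNAL");

        // Common client properties shared by RLMM's producer and consumer ...
        props.put("remote.log.metadata.common.client.security.protocol", "SASL_SSL");
        // ... and producer/consumer specific overrides.
        props.put("remote.log.metadata.producer.acks", "all");
        props.put("remote.log.metadata.consumer.max.poll.records", 500);

        // Interval for the remote partition remover task.
        props.put("remote.partition.remover.task.interval.ms", 3600000L);
        return props;
    }
}
```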

...

`org.apache.kafka.server.log.remote.storage.RemoteLogMetadataFormatter` can be used to format messages received from the remote log metadata topic by the console consumer. Users can pass the properties mentioned in the below block with '--property' while running the console consumer with this message formatter. The below block explains the format, which may change later. This formatter can be helpful for debugging purposes.
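
As an illustration, a debugging session could start the console consumer roughly as follows, assuming the default internal topic name `__remote_log_metadata`; the bootstrap address is a placeholder.

```
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic __remote_log_metadata --from-beginning \
  --formatter org.apache.kafka.server.log.remote.storage.RemoteLogMetadataFormatter
```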

...

The controller receives a delete request for a topic. It goes through the existing deletion protocol and makes all the replicas offline so that they stop taking any fetch requests. After all the replicas reach the offline state, the controller publishes an event to the RemoteLogMetadataManager (RLMM) by marking the topic as deleted using RemoteLogMetadataManager.updateRemotePartitionDeleteMetadata with the state as RemotePartitionDeleteState#DELETE_PARTITION_MARKED. With KIP-516, topics are represented with a UUID, and topics can be deleted asynchronously. This allows the remote logs to be garbage collected later by publishing the deletion marker into the remote log metadata topic. RLMM is responsible for asynchronously deleting all the remote log segments of a partition after receiving RemotePartitionDeleteState as DELETE_PARTITION_MARKED.
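
A rough sketch of this call is shown below. The argument order of RemotePartitionDeleteMetadata and the exact method signatures are assumptions paraphrased from the interface described in this KIP and may differ from the final code.

```java
// Rough, illustrative sketch of marking a partition for remote deletion after all
// replicas go offline. Signatures and package names are paraphrased from the KIP and
// may not match the final RemoteLogMetadataManager interface exactly.
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState;

public class RemotePartitionDeletionSketch {
    private final RemoteLogMetadataManager rlmm;
    private final int brokerId;

    public RemotePartitionDeletionSketch(RemoteLogMetadataManager rlmm, int brokerId) {
        this.rlmm = rlmm;
        this.brokerId = brokerId;
    }

    public void markForDeletion(TopicIdPartition topicIdPartition) throws Exception {
        // Publish the DELETE_PARTITION_MARKED event; the default RLMM writes it to the
        // remote log metadata topic, and the remote partition remover later deletes the
        // remote log segments asynchronously.
        RemotePartitionDeleteMetadata marked = new RemotePartitionDeleteMetadata(
                topicIdPartition,
                RemotePartitionDeleteState.DELETE_PARTITION_MARKED,
                System.currentTimeMillis(),
                brokerId);
        rlmm.updateRemotePartitionDeleteMetadata(marked);
    }
}
```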

...

Currently, it supports listing offsets based on the earliest timestamp and the latest timestamp of the complete log. There is no change in the protocol, but new versions will start supporting listing the earliest offset based on the local log alone rather than only on the complete log including the remote log. This protocol will be updated with the changes from KIP-516, but there are no changes required, as mentioned earlier. Request and response versions will be bumped to version 7.

Fetch

We are bumping up the fetch protocol version to handle the new error code; there are no changes in the request and response schemas. When a follower tries to fetch records for an offset that does not exist locally, the leader returns a new error, `OFFSET_MOVED_TO_TIERED_STORAGE`. This is explained in detail here.

...

`RemoteStorageManager` is an interface to provide the lifecycle of remote log segments and indexes. More details about how we arrived at this interface are discussed in the document. We will provide a simple implementation of RSM to get a better understanding of the APIs. HDFS and S3 implementations are planned to be hosted in external repos and will not be part of the Apache Kafka repo. This is in line with the approach taken for Kafka connectors.
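
To make the scope of this interface concrete, below is a rough, simplified sketch of what a trivial filesystem-backed remote storage could look like; the method names and signatures are paraphrased and are not the exact RemoteStorageManager API defined in this KIP.

```java
// Simplified, illustrative sketch of a filesystem-backed remote storage plugin.
// Method names/signatures are paraphrased; the authoritative contract is the
// RemoteStorageManager interface defined in this KIP.
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class LocalDirRemoteStorage {
    private final Path baseDir;

    public LocalDirRemoteStorage(Path baseDir) {
        this.baseDir = baseDir;
    }

    /** Copy a finished log segment (identified by a segment id) to "remote" storage. */
    public void copySegment(String segmentId, Path segmentFile) throws IOException {
        Files.createDirectories(baseDir);
        Path target = baseDir.resolve(segmentId + ".log");
        Files.copy(segmentFile, target, StandardCopyOption.REPLACE_EXISTING);
    }

    /** Open a stream over a previously copied segment, starting at the given byte position. */
    public InputStream fetchSegment(String segmentId, long startPosition) throws IOException {
        InputStream in = Files.newInputStream(baseDir.resolve(segmentId + ".log"));
        long skipped = in.skip(startPosition);
        if (skipped < startPosition) {
            in.close();
            throw new IOException("Segment is shorter than the requested start position");
        }
        return in;
    }

    /** Delete the remote copy of the segment, used by retention and topic deletion. */
    public void deleteSegment(String segmentId) throws IOException {
        Files.deleteIfExists(baseDir.resolve(segmentId + ".log"));
    }
}
```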

...

Consuming old data has a significant performance impact on acks=all producers. Without tiered storage, the P99 produce latency is almost tripled. With tiered storage, the performance impact is relatively lower, because remote storage reads do not compete with produce requests for local hard disk bandwidth.

...

Without tiered storage, the rebuilding broker has to read a large amount of data from the local hard disks of the leaders. This competes with the normal traffic for page cache and local disk bandwidth, and dramatically increases the acks=all produce latency.

...

  • Enhance the RLMM local file-based cache with RocksDB to avoid loading the whole cache in-memory.
  • Enhance the topic-based RLMM implementation to point to a target Kafka cluster instead of using a system-level topic within the cluster.
  • Improve the default RLMM implementation with a less chatty protocol.
  • Support disabling tiered storage for a topic.
  • Add a system-level config to enable tiered storage for all the topics in a cluster.
  • Recovery mechanism in case of broker or cluster failure.
    • This is to be done by fetching the remote log metadata from RemoteStorageManager.
  • Recovering from truncation of remote log metadata topic partitions.
  • Extract RPMM as a separate task and allow any RLMM implementation to reuse the task for deletion of remote segments and to complete the remote partition deletion.

...

  1. Replace all local storage with remote storage - Instead of using local storage on Kafka brokers, only remote storage is used for storing log segments and offset index files. While this has benefits related to reducing local storage, it has the problem of not leveraging the OS page cache and local disk for efficient reads of the latest data as done in Kafka today.
  2. Implement Kafka API on another store - This is an approach taken by some vendors, where the Kafka API is implemented on a different distributed, scalable storage (for example, HDFS). Such an option does not leverage Kafka other than API compliance and requires the much riskier option of replacing the entire Kafka cluster with another system.
  3. Client directly reads remote log segments from the remote storage - The log segments on the remote storage can be directly read by the client instead of being served from the Kafka broker. This reduces Kafka broker changes and has the benefit of removing an extra hop. However, this bypasses Kafka security completely, increases Kafka client library complexity and footprint, causes compatibility issues with the existing Kafka client libraries, and hence is not considered.
  4. Store all remote segment metadata in remote storage. This approach works with storage systems that provide strongly consistent metadata, such as HDFS, but does not work with S3 and GCS. Frequently calling the LIST API on S3 or GCS also incurs huge costs. So, we choose to store metadata in a Kafka topic in the default implementation but allow users to use other methods with their own RLMM implementations.
  5. Cache all remote log indexes in local storage. Store remote log segment information in local storage. 

...