Authors: Sriharsha Chintalapani, Satish Duggana, Suresh Srinivas, Ying Zheng (alphabetical order by the last names)

Table of Contents

Status

Current State: "Accepted"

Discussion Thread: Discuss Thread here

JIRA: KAFKA-7739


Motivation

Kafka is an important part of data infrastructure and is seeing significant adoption and growth. As the Kafka cluster size grows and more data is stored in Kafka for a longer duration, several issues related to scalability, efficiency, and operations become important to address.

...

In the tiered storage approach, a Kafka cluster is configured with two tiers of storage: local and remote. The local tier is the same as in current Kafka and uses the local disks on the Kafka brokers to store the log segments. The new remote tier uses systems such as HDFS or S3 to store the completed log segments. Two separate retention periods are defined, one for each tier. With the remote tier enabled, the retention period for the local tier can be significantly reduced from days to a few hours. The retention period for the remote tier can be much longer: days, or even months. When a log segment is rolled on the local tier, it is copied to the remote tier along with the corresponding indexes. Latency-sensitive applications perform tail reads and are served from the local tier, leveraging the existing Kafka mechanism of efficiently using the page cache to serve the data. Backfill and other applications recovering from a failure that need data older than what is in the local tier are served from the remote tier.

...

Tiered storage does not support compacted topics. A topic created with the effective value of remote.log.storage.enable set to true cannot change its retention policy from delete to compact.

...

  • receives callback events for leadership changes and stop/delete events of topic partitions on a broker.
  • delegates copy, read, and delete of topic partition segments to a pluggable storage manager(viz RemoteStorageManager) implementation and maintains respective remote log segment metadata through RemoteLogMetadataManager.

`RemoteLogManager` is an internal component and it is not a public API. 

`RemoteStorageManager` is an interface to provide the lifecycle of remote log segments and indexes. More details about how we arrived at this interface are discussed in the document. We will provide a simple implementation of RSM to get a better understanding of the APIs. HDFS and S3 implementations are planned to be hosted in external repos and will not be part of the Apache Kafka repo. This is in line with the approach taken for Kafka connectors.

...

RLM creates tasks for each leader or follower topic partition, which are explained in detail here.

  • RLM Leader Task
    • It checks for rolled over LogSegments (which have the last message offset less than last stable offset of that topic partition) and copies them along with their offset/time/transaction/producer-snapshot indexes and leader epoch cache to the remote tier. It also serves the fetch requests for older data from the remote tier. Local logs are not cleaned up till those segments are copied successfully to remote even though their retention time/size is reached.
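The eligibility check in the leader task can be sketched as follows. This is a minimal illustration and not the RLM implementation: `CopyCandidates`, `Segment`, and `eligible` are hypothetical names, and the sketch assumes the segments are ordered by base offset with the active (not yet rolled) segment last.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; these names are illustrative and not part of Kafka.
final class CopyCandidates {

    static final class Segment {
        final long baseOffset;
        final long lastOffset;

        Segment(long baseOffset, long lastOffset) {
            this.baseOffset = baseOffset;
            this.lastOffset = lastOffset;
        }
    }

    // Returns rolled segments whose last offset is below the last stable offset.
    // Assumption: the final element of `segments` is the active segment and is skipped.
    static List<Segment> eligible(List<Segment> segments, long lastStableOffset) {
        List<Segment> result = new ArrayList<>();
        for (int i = 0; i < segments.size() - 1; i++) {
            Segment s = segments.get(i);
            if (s.lastOffset < lastStableOffset) {
                result.add(s);
            }
        }
        return result;
    }
}
```

Segments that fail the check stay in the local tier until a later run of the task finds them eligible.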

...

1) Retrieve the Earliest Local Offset (ELO) and the corresponding leader epoch (ELO-LE) from the leader with a ListOffset request (timestamp = -4)

2) Truncate local log and local auxiliary state

...

Let us discuss a few cases that a follower can encounter while it tries to replicate from the leader and build the auxiliary state from remote storage.

OMTS : OffsetMovedToTieredStorage

ELO : Earliest-Local-Offset

...

Broker A (Leader)

Broker B (Follower)

Remote Storage

RL metadata storage

3: msg 3 LE-1

4: msg 4 LE-1

5: msg 5 LE-2

6: msg 6 LE-2

7: msg 7 LE-3 (HW)


leader_epochs

LE-0, 0

LE-1, 3

LE-2, 5

LE-3, 7



1. Fetch LE-1, 0

2. Receives OMTS

3. Receives ELO 3, LE-1

4. Fetch remote segment info and build local leader epoch sequence until ELO


leader_epochs

LE-0, 0

LE-1, 3


seg-0-2, uuid-1

  log:

  0: msg 0 LE-0

  1: msg 1 LE-0

  2: msg 2 LE-0

  epochs:

  LE-0, 0


seg 3-5, uuid-2

  log:

  3: msg 3 LE-1

  4: msg 4 LE-1

  5: msg 5 LE-2

  epochs:

  LE-0, 0

  LE-1, 3

  LE-2, 5

seg-0-2, uuid-1

segment epochs

LE-0, 0


seg-3-5, uuid-2

segment epochs

LE-1, 3

LE-2, 5
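Step 4 in the follower column above (build the local leader epoch sequence until ELO) can be sketched as below. This is only an illustration under assumptions: `EpochSequenceSketch` and `until` are hypothetical names, and the rule used here (keep every epoch entry whose start offset is at or before the ELO) is inferred from the example, where the remote segment epochs (LE-0, 0), (LE-1, 3), (LE-2, 5) with ELO 3 yield (LE-0, 0), (LE-1, 3).

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch; not the follower fetcher implementation.
final class EpochSequenceSketch {

    // remoteSegmentEpochs maps leader epoch -> start offset of that epoch.
    // Keeps only the entries that start at or before the earliest local offset.
    static TreeMap<Integer, Long> until(Map<Integer, Long> remoteSegmentEpochs, long earliestLocalOffset) {
        TreeMap<Integer, Long> local = new TreeMap<>();
        for (Map.Entry<Integer, Long> e : remoteSegmentEpochs.entrySet()) {
            if (e.getValue() <= earliestLocalOffset) {
                local.put(e.getKey(), e.getValue());
            }
        }
        return local;
    }
}
```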

...

Broker A (Leader)

Broker B (Follower)

Remote Storage

RL metadata storage

0: msg 0 LE-0

1: msg 1 LE-0

2: msg 2 LE-0

3: msg 3 LE-1

4: msg 4 LE-1

5: msg 5 LE-2

6: msg 6 LE-2

7: msg 7 LE-3

8: msg 8 LE-3

9: msg 9 LE-3 (HW)




leader_epochs

LE-0, 0

LE-1, 3

LE-2, 5

LE-3, 7

0: msg 0 LE-0

1: msg 1 LE-0

2: msg 2 LE-0

3: msg 3 LE-1

leader_epochs

LE-0, 0

LE-1, 3

1. Because the latest leader epoch in the local storage (LE-1) does not equal the current leader epoch (LE-3), the follower starts from the Truncating state.

2. fetchLeaderEpochEndOffsets(LE-1) returns 5, which is larger than the latest local offset.  With the existing truncation logic, the local log is not truncated and it moves to Fetching state.




seg-0-2, uuid-1

  log:

  0: msg 0 LE-0

  1: msg 1 LE-0

  2: msg 2 LE-0

  epochs:

  LE-0, 0


seg 3-5, uuid-2

  log:

  3: msg 3 LE-1

  4: msg 4 LE-1

  5: msg 5 LE-2

  epochs:

  LE-0, 0

  LE-1, 3

  LE-2, 5

seg-0-2, uuid-1

segment epochs

LE-0, 0


seg-3-5, uuid-2

segment epochs

LE-1, 3

LE-2, 5

...

Broker A (Leader)

Broker B (Follower)

Remote Storage

RL metadata storage

9: msg 9 LE-3

10: msg 10 LE-3

11: msg 11 LE-3 (HW)



[segments till offset 8 were deleted]



leader_epochs

LE-0, 0

LE-1, 3

LE-2, 5

LE-3, 7

0: msg 0 LE-0

1: msg 1 LE-0

2: msg 2 LE-0

3: msg 3 LE-1

leader_epochs

LE-0, 0

LE-1, 3


<Fetch State>

1. Fetch from leader LE-1, 4

2. Receives OMTS, truncates local segments.

3. Fetch ELO, Receives ELO 9, LE-3 and moves to BuildingRemoteLogAux state




seg-0-2, uuid-1

  log:

  0: msg 0 LE-0

  1: msg 1 LE-0

  2: msg 2 LE-0

  epochs:

  LE-0, 0


seg 3-5, uuid-2

  log:

  3: msg 3 LE-1

  4: msg 4 LE-1

  5: msg 5 LE-2

  epochs:

  LE-0, 0

  LE-1, 3

  LE-2, 5


Seg 6-8, uuid-3, LE-3

  log:

  6: msg 6 LE-2

  7: msg 7 LE-3

  8: msg 8 LE-3

  epochs:

  LE-0, 0

  LE-1, 3

  LE-2, 5

  LE-3, 7

seg-0-2, uuid-1

segment epochs

LE-0, 0


seg-3-5, uuid-2

segment epochs

LE-1, 3

LE-2, 5


seg-6-8, uuid-3

segment epochs

LE-2, 5

LE-3, 7

...

Broker A (Leader)

Broker B

Remote Storage

RL metadata storage

0: msg 0 LE-0

1: msg 1 LE-0

2: msg 2 LE-0 (HW)

leader_epochs

LE-0, 0

0: msg 0 LE-0

1: msg 1 LE-0

2: msg 2 LE-0 (HW)

leader_epochs

LE-0, 0

seg-0-1:

  log:

  0: msg 0 LE-0

  1: msg 1 LE-0

  epoch:

  LE-0, 0

seg-0-1, uuid-1

segment epochs

LE-0, 0

...

In this case, it is acceptable to lose data, but we have to keep the same behaviour as described in the KIP-101.

Broker A (stopped)

Broker B (Leader)

Remote Storage

RL metadata storage

0: msg 0 LE-0

1: msg 1 LE-0

2: msg 2 LE-0 (HW)

leader_epochs

LE-0, 0

0: msg 0 LE-0 (HW)

1: msg 3 LE-1

leader_epochs

LE-0, 0

LE-1, 1

seg-0-1:

  log:

  0: msg 0 LE-0

  1: msg 1 LE-0

  epoch:

  LE-0, 0

seg-0-1, uuid-1

segment epochs

LE-0, 0

After restart, B loses messages 1 and 2. B becomes the new leader and receives a new message 3 (LE-1, offset 1).

(Note: This may not be technically an unclean-leader-election, because B may not have been removed from ISR because both brokers crashed at the same time.)

...

Broker A (follower)

Broker B (Leader)

Remote Storage

RL metadata storage

0: msg 0 LE-0

1: msg 3 LE-1

2: msg 4 LE-1 (HW)

leader_epochs

LE-0, 0

LE-1, 1

LE-2, 2

0: msg 0 LE-0

1: msg 3 LE-1

2: msg 4 LE-1 (HW)

leader_epochs

LE-0, 0

LE-1, 1

LE-2, 2

seg-0-1:

  log:

  0: msg 0 LE-0

  1: msg 1 LE-0

  epoch:

  LE-0, 0

seg-1-1

  log:

  1: msg 3 LE-1

  epoch:

  LE-0, 0

  LE-1, 1

seg-0-1, uuid-1

segment epochs

LE-0, 0


seg-1-1, uuid-2

segment epochs

LE-1, 1

A new message (message 4) is received. The 2nd segment on broker B (seg-1-1) is shipped to remote storage.

Consider the case where the local segments up to offset 2 are deleted on both brokers:

A consumer fetches offset 0, LE-0. According to the local leader epoch cache, offset 0 LE-0 is valid. So, the broker returns message 0 from remote segment 0-1.

A pre-KIP-320 consumer fetches offset 1, without leader epoch info. According to the local leader epoch cache, offset 1 belongs to LE-1. So, the broker returns message 3 from remote segment 1-1, rather than the LE-0 offset 1 message (message 1) in seg-0-1.

A consumer that fetches offset 2, LE-0 is fenced (KIP-320).

A consumer that fetches offset 1, LE-1 receives message 3 from remote segment 1-1.
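The lookup "which leader epoch does this offset belong to" used in the cases above can be sketched with a sorted map. This is an illustrative sketch only: `LeaderEpochLookup` is a hypothetical class, not Kafka's leader epoch cache, and it models just the (epoch, start offset) pairs shown in the `leader_epochs` listings.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of an epoch lookup by offset; not Kafka's implementation.
final class LeaderEpochLookup {

    // start offset of an epoch -> leader epoch
    private final TreeMap<Long, Integer> epochByStartOffset = new TreeMap<>();

    void assign(int epoch, long startOffset) {
        epochByStartOffset.put(startOffset, epoch);
    }

    // Returns the epoch whose range contains the offset, or -1 if the offset
    // is before the first known epoch entry.
    int epochForOffset(long offset) {
        Map.Entry<Long, Integer> entry = epochByStartOffset.floorEntry(offset);
        return entry == null ? -1 : entry.getValue();
    }
}
```

With the entries (LE-0, 0) and (LE-1, 1) from the example, offset 1 resolves to LE-1, which is why the broker picks remote segment 1-1 rather than seg-0-1 for a fetch without leader epoch info.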

...

Broker A (Leader)

Broker B (stopped)

Remote Storage

RL metadata storage

0: msg 0 LE-0

1: msg 1 LE-0

2: msg 2 LE-0

3: msg 3 LE-0

4: msg 4 LE-0

5: msg 7 LE-2

6: msg 8 LE-2

leader_epochs

LE-0, 0

LE-2, 5

1. Broker A receives two new messages in LE-2

2. Broker A ships seg-4-5 to remote storage





0: msg 0 LE-0

1: msg 1 LE-0

2: msg 4 LE-1

3: msg 5 LE-1

4: msg 6 LE-1

leader_epochs

LE-0, 0

LE-1, 2





seg-0-3

log:

0: msg 0 LE-0

1: msg 1 LE-0

2: msg 2 LE-0

3: msg 3 LE-0

epoch:

LE-0, 0

seg-0-3

0: msg 0 LE-0

1: msg 1 LE-0

2: msg 4 LE-1

3: msg 5 LE-1

epoch:

LE-0, 0

LE-1, 2

seg-4-5

epoch:

LE-0, 0

LE-2, 5

seg-0-3, uuid1

segment epochs

LE-0, 0


seg-0-3, uuid2

segment epochs

LE-0, 0

LE-1, 2


seg-4-5, uuid3

segment epochs

LE-0, 0

LE-2, 5

...

Broker A (Leader)

Broker B (started, follower)

Remote Storage

RL metadata storage

6: msg 8 LE-2

leader_epochs

LE-0, 0

LE-2, 5





1. Broker B fetches offset 0, and receives OMTS error.

2. Broker B receives ELO=6, LE-2

3. In BuildingRemoteLogAux state, broker B finds seg-4-5 has LE-2. So, it builds local LE cache from seg-4-5:

leader_epochs

LE-0, 0

LE-2, 5

4. Broker B continues fetching local messages from ELO 6, LE-2

5. Broker B joins ISR

seg-0-3

log:

0: msg 0 LE-0

1: msg 1 LE-0

2: msg 2 LE-0

3: msg 3 LE-0

epoch:

LE-0, 0

seg-0-3

0: msg 0 LE-0

1: msg 1 LE-0

2: msg 4 LE-1

3: msg 5 LE-1

epoch:

LE-0, 0

LE-1, 2

seg-4-5

epoch:

LE-0, 0

LE-2, 5

seg-0-3, uuid1

segment epochs

LE-0, 0


seg-0-3, uuid2

segment epochs

LE-0, 0

LE-1, 2


seg-4-5, uuid3

segment epochs

LE-0, 0

LE-2, 5

...

For any fetch requests, ReplicaManager will proceed with making a call to readFromLocalLog, if this method returns OffsetOutOfRange exception it will delegate the read call to RemoteLogManager. More details are explained in the RLM/RSM tasks section. If the remote storage is not available then it will throw a new error TIERED_STORAGE_NOT_AVAILABLE.
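The delegation described above can be sketched as follows. This is a simplified illustration, not ReplicaManager code: `TieredRead`, its nested exception types, and the use of `LongFunction` to stand in for the local log and the RemoteLogManager are all assumptions made for the example, and a `null` remote reader is used to model unavailable remote storage.

```java
import java.util.function.LongFunction;

// Hypothetical sketch of the local-then-remote read path.
final class TieredRead {

    static final class OffsetOutOfRange extends RuntimeException { }

    static final class TieredStorageNotAvailable extends RuntimeException { }

    // Tries the local log first and falls back to the remote tier only when
    // the local read reports that the offset is out of range.
    static String read(long offset, LongFunction<String> localLog, LongFunction<String> remoteLog) {
        try {
            return localLog.apply(offset);
        } catch (OffsetOutOfRange e) {
            if (remoteLog == null) {
                // Stands in for the TIERED_STORAGE_NOT_AVAILABLE error.
                throw new TieredStorageNotAvailable();
            }
            return remoteLog.apply(offset);
        }
    }
}
```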

...

This API is enhanced to support a new target timestamp value of -4, which is called EARLIEST_LOCAL_TIMESTAMP. No new fields are added to the request and response schemas, but there is a version bump to indicate the update. This request asks for the offset from which followers should start fetching to replicate the local logs. It represents the log-start-offset available in the local log storage, which is also called the local-log-start-offset. All records earlier than this offset can be considered as copied to the remote storage. This is used by follower replicas to avoid fetching records that are already copied to remote tier storage.
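How the special target timestamps map to offsets can be sketched as below. This is a simplified illustration rather than broker code: `ListOffsetsSketch` and `resolve` are hypothetical, the values -1 (latest) and -2 (earliest) are the existing special timestamps, and lookups by a real timestamp are deliberately left out of the sketch.

```java
// Hypothetical sketch of resolving special ListOffsets timestamps.
final class ListOffsetsSketch {

    static final long LATEST_TIMESTAMP = -1L;
    static final long EARLIEST_TIMESTAMP = -2L;
    static final long EARLIEST_LOCAL_TIMESTAMP = -4L;

    static long resolve(long targetTimestamp, long logStartOffset, long localLogStartOffset, long logEndOffset) {
        if (targetTimestamp == EARLIEST_TIMESTAMP) {
            return logStartOffset;        // earliest offset of the complete log, including remote
        }
        if (targetTimestamp == EARLIEST_LOCAL_TIMESTAMP) {
            return localLogStartOffset;   // earliest offset still present in the local tier
        }
        if (targetTimestamp == LATEST_TIMESTAMP) {
            return logEndOffset;
        }
        throw new IllegalArgumentException("timestamp lookup is not covered by this sketch: " + targetTimestamp);
    }
}
```

A follower that receives local-log-start-offset 9 in this way starts replicating at offset 9 instead of fetching records that are already in remote storage.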

...

If processing of a topic partition fails due to a remote storage error, it follows a retry backoff algorithm with the initial retry interval as `remote.log.manager.task.retry.interval.ms`, max backoff as `remote.log.manager.task.retry.backoff.max.backoff.ms`, and jitter as `remote.log.manager.task.retry.jitter`. You can see more details about the exponential backoff algorithm here.
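A retry delay of this kind can be sketched as below. This is a generic exponential backoff with jitter, not the algorithm referenced above: `RetryBackoff` is a hypothetical class, the doubling factor is an assumption, and the jitter is assumed to be a fraction (for example 0.2 for plus or minus 20%), which the text does not specify.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper for illustration; not part of Kafka.
final class RetryBackoff {
    private final long initialIntervalMs;
    private final long maxBackoffMs;
    private final double jitter; // assumed to be a fraction, e.g. 0.2 for +/-20%

    RetryBackoff(long initialIntervalMs, long maxBackoffMs, double jitter) {
        this.initialIntervalMs = initialIntervalMs;
        this.maxBackoffMs = maxBackoffMs;
        this.jitter = jitter;
    }

    // Delay before jitter: initial * 2^attempt, capped at the max backoff.
    long baseDelayMs(int attempt) {
        double uncapped = initialIntervalMs * Math.pow(2, Math.max(attempt, 0));
        return (long) Math.min(uncapped, (double) maxBackoffMs);
    }

    // Delay with a random factor in [1 - jitter, 1 + jitter) applied.
    long delayMs(int attempt) {
        double random = (ThreadLocalRandom.current().nextDouble() * 2 - 1) * jitter;
        return (long) (baseDelayMs(attempt) * (1.0 + random));
    }
}
```

The jitter spreads retries of many partitions over time so that they do not all hit the remote storage at the same instant after an outage.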

...

When a partition is deleted, the controller updates its state in RLMM with DELETE_PARTITION_MARKED and it expects RLMM to have a mechanism to clean up the remote log segments. This process for the default RLMM is described in detail here.

RemoteLogMetadataManager implemented with an internal topic

Metadata of remote log segments is stored in an internal non-compacted topic called `__remote_log_segment_metadata`. This topic is created with a default partition count of 50. Users can configure the partition count, replication factor, etc. as mentioned in the config section.

...

RLMM maintains a metadata cache by subscribing to the respective remote log metadata topic partitions. Whenever a topic partition is reassigned to a new broker and the RLMM on that broker is not subscribed to the respective remote log metadata topic partition, it subscribes to that partition and adds all the entries to the cache. So, in the worst case, the RLMM on a broker may be consuming from most of the remote log metadata topic partitions. In the initial version, we will have a file-based cache for all the messages that are already consumed by this instance, and it will be loaded in-memory whenever RLMM is started. This cache is maintained in a separate file for each of the topic partitions. This will allow us to commit offsets of the partitions that are already read. Committed offsets can be stored in a local file to avoid reading the messages again when a broker is restarted. We can improve this by having a RocksDB based cache to avoid a high memory footprint on a broker.

Message Format

RLMM instance on broker publishes the message to the topic with key as null and value with the below format.

type      : unsigned var int, represents the value type. This value is 'apikey' as mentioned in the schema. 
version : unsigned var int, the 'version' number of the type as mentioned in the schema. 
data      : record payload in kafka protocol message format, the schema is given below.

Both type and version are added before the data is serialized into record value.  Schema can be evolved by adding a new version with the respective changes. A new type can also be supported by adding the respective type and its version.  
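The value layout above (type, version, then payload) can be sketched as follows. This is an illustration under assumptions: `MetadataRecordValue` is a hypothetical class, the payload is taken as an already-serialized byte array, and the unsigned var int is written in the usual 7-bits-per-byte form with a continuation bit.

```java
import java.io.ByteArrayOutputStream;

// Hypothetical sketch of the record value layout; not the RLMM serde.
final class MetadataRecordValue {

    // Writes the value 7 bits at a time, least significant group first,
    // setting the high bit on every byte except the last.
    static void writeUnsignedVarint(int value, ByteArrayOutputStream out) {
        while ((value & 0xFFFFFF80) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    // Layout: <type as unsigned var int><version as unsigned var int><data>
    static byte[] serialize(int apiKey, int version, byte[] data) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeUnsignedVarint(apiKey, out);
        writeUnsignedVarint(version, out);
        out.write(data, 0, data.length);
        return out.toByteArray();
    }
}
```

Because type and version come first, a reader can pick the matching schema version before it decodes the payload.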

 

RLMM segment overhead:

Topic partition's topic-id : uuid : 2 longs.

remoteLogSegmentId : uuid : 2 longs.

remoteLogSegmentMetadata : 5 longs + 1 int +1 byte + ~3 epochs(approx avg)

It has leader epochs in-memory which will be much less.

On avg: 10 longs : 10 * 8 = 80 bytes; with other overhead (factor 1.25): 80 * 1.25 = 100 bytes per segment

Assume a segment is rolled on a broker every second.

With retention of 30 days: 60*60*24*30 ~ 2.6MM segments

2.6MM segments would take ~ 260MB. (This is 1% in our production env)

This overhead is not that significant as brokers may be using several GBs of memory. 
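The estimate above can be reproduced with a few lines of arithmetic. The class and method names below are hypothetical; the inputs (10 longs per entry, an overhead factor of 1.25, one segment roll per second, 30 days of retention) are the ones stated in the text.

```java
// Hypothetical helper that reproduces the overhead estimate.
final class RlmmOverheadEstimate {

    // Approximate in-memory size of one metadata entry.
    static long bytesPerEntry(int longsPerEntry, double overheadFactor) {
        return Math.round(longsPerEntry * Long.BYTES * overheadFactor);
    }

    // Number of segments retained when segments roll at a fixed rate.
    static long segmentsRetained(long rollsPerSecond, int retentionDays) {
        return rollsPerSecond * 60L * 60L * 24L * retentionDays;
    }

    static long estimatedBytes(long segments, long bytesPerEntry) {
        return segments * bytesPerEntry;
    }
}
```

With these inputs the result is 2,592,000 segments and 259,200,000 bytes, which matches the rounded figures of 2.6MM segments and about 260MB.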

We can also have a lazy load approach by keeping only minimal in-memory entries like offset, epoch, uuid, and entry position in the file. When it is needed we can access it by using the entry position in the file. 

Schema
{
  "apiKey": 0,
  "type": "data",
  "name": "RemoteLogSegmentMetadataRecord",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    {
      "name": "RemoteLogSegmentId",
      "type": "RemoteLogSegmentIdEntry",
      "versions": "0+",
      "about": "Unique representation of the remote log segment",
      "fields": [
        {
          "name": "TopicIdPartition",
          "type": "TopicIdPartitionEntry",
          "versions": "0+",
          "about": "Represents unique topic partition",
          "fields": [
            {
              "name": "Name",
              "type": "string",
              "versions": "0+",
              "about": "Topic name"
            },
            {
              "name": "Id",
              "type": "uuid",
              "versions": "0+",
              "about": "Unique identifier of the topic"
            },
            {
              "name": "Partition",
              "type": "int32",
              "versions": "0+",
              "about": "Partition number"
            }
          ]
        },
        {
          "name": "Id",
          "type": "uuid",
          "versions": "0+",
          "about": "Unique identifier of the remote log segment"
        }
      ]
    },
    {
      "name": "StartOffset",
      "type": "int64",
      "versions": "0+",
      "about": "Start offset of the segment."
    },
    {
      "name": "EndOffset",
      "type": "int64",
      "versions": "0+",
      "about": "End offset of the segment."
    },
    {
      "name": "LeaderEpoch",
      "type": "int32",
      "versions": "0+",
      "about": "Leader epoch from which this segment instance is created or updated"
    },
    {
      "name": "MaxTimestamp",
      "type": "int64",
      "versions": "0+",
      "about": "Maximum timestamp with in this segment."
    },
    {
      "name": "EventTimestamp",
      "type": "int64",
      "versions": "0+",
      "about": "Event timestamp of this segment."
    },
    {
      "name": "SegmentLeaderEpochs",
      "type": "[]SegmentLeaderEpochEntry",
      "versions": "0+",
      "about": "Leader epoch cache.",
      "fields": [
        {
          "name": "LeaderEpoch",
          "type": "int32",
          "versions": "0+",
          "about": "Leader epoch"
        },
        {
          "name": "Offset",
          "type": "int64",
          "versions": "0+",
          "about": "Start offset for the leader epoch"
        }
      ]
    },
    {
      "name": "SegmentSizeInBytes",
      "type": "int32",
      "versions": "0+",
      "about": "Segment size in bytes"
    },
    {
      "name": "RemoteLogSegmentState",
      "type": "int8",
      "versions": "0+",
      "about": "State of the remote log segment"
    }
  ]
}


{
  "apiKey": 1,
  "type": "data",
  "name": "RemoteLogSegmentMetadataRecordUpdate",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    {
      "name": "RemoteLogSegmentId",
      "type": "RemoteLogSegmentIdEntry",
      "versions": "0+",
      "about": "Unique representation of the remote log segment",
      "fields": [
        {
          "name": "TopicIdPartition",
          "type": "TopicIdPartitionEntry",
          "versions": "0+",
          "about": "Represents unique topic partition",
          "fields": [
            {
              "name": "Name",
              "type": "string",
              "versions": "0+",
              "about": "Topic name"
            },
            {
              "name": "Id",
              "type": "uuid",
              "versions": "0+",
              "about": "Unique identifier of the topic"
            },
            {
              "name": "Partition",
              "type": "int32",
              "versions": "0+",
              "about": "Partition number"
            }
          ]
        },
        {
          "name": "Id",
          "type": "uuid",
          "versions": "0+",
          "about": "Unique identifier of the remote log segment"
        }
      ]
    },
    {
      "name": "LeaderEpoch",
      "type": "int32",
      "versions": "0+",
      "about": "Leader epoch from which this segment instance is created or updated"
    },
    {
      "name": "EventTimestamp",
      "type": "int64",
      "versions": "0+",
      "about": "Event timestamp of this segment."
    },
    {
      "name": "RemoteLogSegmentState",
      "type": "int8",
      "versions": "0+",
      "about": "State of the remote segment"
    }
  ]
}



{
  "apiKey": 2,
  "type": "data",
  "name": "RemotePartitionDeleteMetadataRecord",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    {
      "name": "TopicIdPartition",
      "type": "TopicIdPartitionEntry",
      "versions": "0+",
      "about": "Represents unique topic partition",
      "fields": [
        {
          "name": "Name",
          "type": "string",
          "versions": "0+",
          "about": "Topic name"
        },
        {
          "name": "Id",
          "type": "uuid",
          "versions": "0+",
          "about": "Unique identifier of the topic id"
        },
        {
          "name": "Partition",
          "type": "int32",
          "versions": "0+",
          "about": "Partition number"
        }
      ]
    },
    {
      "name": "Epoch",
      "type": "int32",
      "versions": "0+",
      "about": "Epoch (controller or leader) from which this event is created. DELETE_PARTITION_MARKED is sent by the controller. DELETE_PARTITION_STARTED and DELETE_PARTITION_FINISHED are sent by remote log metadata topic partition leader."
    },
    {
      "name": "EventTimestamp",
      "type": "int64",
      "versions": "0+",
      "about": "Event timestamp of this segment."
    },
    {
      "name": "RemotePartitionDeleteState",
      "type": "int8",
      "versions": "0+",
      "about": "Deletion state of the remote partition"
    }
  ]
}

package org.apache.kafka.server.log.remote.storage;
...
/**
 * It indicates the deletion state of the remote topic partition. This will be based on the action executed on this
 * partition by the remote log service implementation.
 */
public enum RemotePartitionDeleteState {

    /**
     * This is used when a topic/partition is determined to be deleted by controller.
     * This partition is marked for delete by controller. That means, all its remote log segments are eligible for
     * deletion so that remote partition removers can start deleting them.
     */
    DELETE_PARTITION_MARKED((byte) 0),

    /**
     * This state indicates that the partition deletion is started but not yet finished.
     */
    DELETE_PARTITION_STARTED((byte) 1),

    /**
     * This state indicates that the partition is deleted successfully.
     */
    DELETE_PARTITION_FINISHED((byte) 2);
...
}


package org.apache.kafka.server.log.remote.storage;
...
/**
 * It indicates the state of the remote log segment or partition. This will be based on the action executed on this
 * segment or partition by the remote log service implementation.
 * <p>
 * todo: check whether the state validations to be checked or not, add next possible states for each state.
 */
public enum RemoteLogSegmentState {

    /**
     * This state indicates that the segment copying to remote storage is started but not yet finished.
     */
    COPY_SEGMENT_STARTED((byte) 0),

    /**
     * This state indicates that the segment copying to remote storage is finished.
     */
    COPY_SEGMENT_FINISHED((byte) 1),

    /**
     * This state indicates that the segment deletion is started but not yet finished.
     */
    DELETE_SEGMENT_STARTED((byte) 2),

    /**
     * This state indicates that the segment is deleted successfully.
     */
    DELETE_SEGMENT_FINISHED((byte) 3),
...
}

...

remote.log.metadata.topic.replication.factor


Replication factor of the topic

Default: 3

remote.log.metadata.topic.num.partitions

Number of partitions of the topic

Default: 50

remote.log.metadata.topic.retention.ms

Retention of the topic in milliseconds

Default: -1, that means unlimited.

Users can configure this value based on their use cases. To avoid any data loss, this value should be more than the maximum retention period of any topic enabled with tiered storage in the cluster.

remote.log.metadata.manager.listener.name

Listener name to be used to connect to the local broker by the RemoteLogMetadataManager implementation on the broker. This is a mandatory config while using the default RLMM implementation, which is `org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager`. The respective endpoint address is passed with the "bootstrap.servers" property while invoking RemoteLogMetadataManager#configure(Map<String, ?> props).

This is used by kafka clients created in RemoteLogMetadataManager implementation.

remote.log.metadata.*

Default RLMM implementation creates producer and consumer instances. Common client properties can be configured with `remote.log.metadata.common.client.` prefix.  User can also pass properties specific to producer/consumer with `remote.log.metadata.producer.` and `remote.log.metadata.consumer.` prefixes. These will override properties with `remote.log.metadata.common.client.` prefix.

Any other properties should be prefixed with the config: "remote.log.metadata.manager.impl.prefix", default value is "rlmm.config.". These configs will be passed to RemoteLogMetadataManager#configure(Map<String, ?> props).

For example: "rlmm.config.remote.log.metadata.producer.batch.size=100" will set the batch.size config for the producer inside default RLMM.
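The override rule for client properties (producer-specific values win over common client values) can be sketched as below. This is an illustration only: `RlmmClientConfigs` and its methods are hypothetical, and the sketch assumes the map passed in already has any outer prefix such as "rlmm.config." removed.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of resolving producer properties; not the default RLMM code.
final class RlmmClientConfigs {

    static final String COMMON_PREFIX = "remote.log.metadata.common.client.";
    static final String PRODUCER_PREFIX = "remote.log.metadata.producer.";

    // Returns the entries that start with the prefix, with the prefix removed from the keys.
    static Map<String, Object> withPrefixStripped(Map<String, ?> props, String prefix) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, ?> e : props.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                result.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return result;
    }

    // Common client properties first, then producer-specific ones on top.
    static Map<String, Object> producerProps(Map<String, ?> props) {
        Map<String, Object> result = withPrefixStripped(props, COMMON_PREFIX);
        result.putAll(withPrefixStripped(props, PRODUCER_PREFIX));
        return result;
    }
}
```

The consumer side would follow the same pattern with the `remote.log.metadata.consumer.` prefix.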

remote.partition.remover.task.interval.ms
The interval at which remote partition remover runs to delete the remote storage of the partitions marked for deletion.
Default value: 3600000 (1 hr)

Committed offsets file format

...

_rlmm_committed_offsets
0 2022
4 104
2 498

Message Formatter for the internal topic

//todo

...

The controller receives a delete request for a topic. It goes through the existing protocol of deletion and it makes all the replicas offline to stop taking any fetch requests. After all the replicas reach the offline state, the controller publishes an event to the remote log metadata topic by marking the topic as deleted. With KIP-516, topics are represented with uuid, and topics can be deleted asynchronously. This allows the remote logs to be garbage collected later by publishing the deletion marker into the remote log metadata topic.


A RemotePartitionRemover instance is created on the leader for each of the remote log segment metadata topic partitions. It consumes messages from that partition and filters the delete partition events which need to be processed. It also maintains a committed offset for this instance to handle leader failovers to a different replica so that it can start processing the messages where it left off.

RemotePartitionRemover(RPM) processes the request with the following flow as mentioned in the below diagram. 

  1. The controller publishes a delete_partition_marked event to say that the partition is marked for deletion. Multiple events can be published when the controller restarts or fails over, and these events are deduplicated by RPM.
  2. RPM receives the delete_partition_marked event and processes it if it has not been processed earlier.
  3. RPM publishes a delete_partition_started event that indicates the partition deletion has started.
  4. RPM gets all the remote log segments for the partition, and each of these remote log segments is deleted with the next steps.
  5. Publish delete_segment_started event with the segment id. 
  6. RPM deletes the segment using RSM 
  7. Publish delete_segment_finished event with segment id once it is successful. 
  8. Publish delete_partition_finished once all the segments have been deleted successfully.
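The eight steps above can be sketched as follows. This is a simplified, single-threaded illustration and not the RemotePartitionRemover implementation: `PartitionRemoverSketch` is hypothetical, the `published` list stands in for the remote log metadata topic, the `deleteSegment` callback stands in for the RSM delete call, and deduplication is modeled with an in-memory set.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical sketch of the partition removal flow.
final class PartitionRemoverSketch {

    // Partitions whose delete_partition_marked event was already handled.
    private final Set<String> processed = new HashSet<>();

    // Stand-in for events published to the remote log metadata topic.
    final List<String> published = new ArrayList<>();

    void onDeletePartitionMarked(String partition, List<String> segmentIds, Consumer<String> deleteSegment) {
        if (!processed.add(partition)) {
            return; // duplicate delete_partition_marked event, already processed
        }
        published.add("delete_partition_started:" + partition);
        for (String segmentId : segmentIds) {
            published.add("delete_segment_started:" + segmentId);
            deleteSegment.accept(segmentId); // stands in for the RSM delete call
            published.add("delete_segment_finished:" + segmentId);
        }
        published.add("delete_partition_finished:" + partition);
    }
}
```

A real implementation also has to resume after a leader failover, which this sketch leaves out.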


Recovering  from remote log metadata topic partitions truncation

// todo - update the doc on possible recovery mechanisms.

Protocol Changes

ListOffsets

Currently, it supports the listing of offsets based on the earliest timestamp and the latest timestamp of the complete log. There is no change in the protocol but the new versions will start supporting listing earliest offsets based on the local logs but not only on the complete log including remote log. This protocol will be updated with the changes from KIP-516 but there are no changes required as mentioned earlier. Request and response versions will be bumped to version 7.

Public Interfaces

Compacted topics will not have remote storage support. 

Configs

...

remote.log.storage.enable - Whether to enable remote log storage or not. Valid values are `true` or `false` and the default value is false. This property gives backward compatibility.

remote.log.storage.manager.class.name - This is mandatory if the remote.log.storage.enable is set as true.

remote.log.metadata.manager.class.name(optional) - This is an optional property. If this is not configured, Kafka uses an inbuilt metadata manager backed by an internal topic.

...

(These configs are dependent on remote storage manager implementation)

remote.log.storage.*

...

(These configs are dependent on remote log metadata manager implementation)

remote.log.metadata.*

...

remote.log.index.file.cache.total.size.mb
The total size of the space allocated to store index files fetched from remote storage in the local storage.
Default value: 1024

remote.log.manager.thread.pool.size
Remote log thread pool size, which is used in scheduling tasks to copy segments, and clean up remote log segments.
Default value: 10

remote.log.manager.task.interval.ms
The interval at which the remote log manager runs the scheduled tasks like copy segments, and clean up remote log segments.
Default value: 30,000

remote.log.reader.threads
Remote log reader thread pool size, which is used in scheduling tasks to fetch data from remote storage.  
Default value: 5

remote.log.reader.max.pending.tasks
Maximum remote log reader thread pool task queue size. If the task queue is full, broker will stop reading remote log segments.
Default value: 100

...

 User can set the desired config for remote.log.storage.enable property while creating a topic but it is not allowed to be updated after the topic is created. Other remote.log.* properties can be modified. We will support flipping remote.log.storage.enable in the next versions.

Below retention configs are similar to the log retention. This configuration is used to determine how long the log segments are to be retained in the local storage. Existing log.retention.* are retention configs for the topic partition which includes both local and remote storage. 

local.log.retention.ms
The number of milliseconds to keep the local log segment before it gets deleted. If not set, the value in `log.retention.ms` is used. If set to -1, no time limit is applied.

local.log.retention.bytes
The maximum size of local log segments that can grow for a partition before it deletes the old segments. There is no default value, but the above time based retention always applies.
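The fallback rule for the time-based local retention can be sketched as below. This is an illustration with hypothetical names (`LocalRetention`, `effectiveLocalRetentionMs`, `isExpired`); it models an unset `local.log.retention.ms` as `null`, and it assumes that a segment's age is measured from its largest record timestamp.

```java
// Hypothetical sketch of resolving and applying local time-based retention.
final class LocalRetention {

    // Uses log.retention.ms when local.log.retention.ms is not set (null here).
    static long effectiveLocalRetentionMs(Long localRetentionMs, long logRetentionMs) {
        return localRetentionMs == null ? logRetentionMs : localRetentionMs;
    }

    // A value of -1 means no time limit, so nothing expires by time.
    static boolean isExpired(long segmentLargestTimestampMs, long nowMs, long retentionMs) {
        if (retentionMs == -1L) {
            return false;
        }
        return nowMs - segmentLargestTimestampMs > retentionMs;
    }
}
```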

Internal flat-file store format of remote log metadata

RLMM stores the remote log metadata messages and builds materialized instances in a flat-file store for each user topic partition.


flat_file_format
<magic><topic-name><topic-id><metadata-topic-offset><sequence-of-serialized-entries>

magic:                  
    unsigned var int, version of this file format.
topic-name:             
    string, topic name.
topic-id:               
    uuid, uuid of topic
metadata-topic-offset:  
    var long, offset of the remote log metadata topic partition up to which this topic partition's remote log metadata is fetched.
serialized-entries:
   sequence of serialized entries defined as below, more types can be added later if needed.

Serialization of entry is done as mentioned below. This is very similar to the message format mentioned earlier for storing into the metadata topic.  

length    : unsigned var int, length of this entry which is sum of sizes of type, version, and data.
type      : unsigned var int, represents the value type. This value is 'apikey' as mentioned in the schema. 
version   : unsigned var int, the 'version' number of the type as mentioned in the schema. 
data      : record payload in kafka protocol message format, the schema is given below.

Both type and version are added before the data is serialized into record value.  Schema can be evolved by adding a new version with the respective changes. A new type can also be supported by adding the respective type and its version.
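The entry layout described above can be sketched in Java as follows. This is a minimal illustration rather than the KIP's implementation: the class and method names (`FlatFileEntrySketch`, `serializeEntry`) are hypothetical, and it assumes that "unsigned var int" uses the same 7-bits-per-byte encoding as the Kafka protocol's unsigned varint.

```java
import java.io.ByteArrayOutputStream;

/** Hypothetical sketch of the flat-file entry layout: length, type, version, data. */
final class FlatFileEntrySketch {

    /** Writes an unsigned varint: 7 payload bits per byte, low-order group first. */
    static void writeUnsignedVarint(int value, ByteArrayOutputStream out) {
        while ((value & 0xFFFFFF80) != 0) {
            out.write((value & 0x7F) | 0x80); // continuation bit set
            value >>>= 7;
        }
        out.write(value);
    }

    /** Number of bytes the unsigned varint encoding of the value occupies. */
    static int sizeOfUnsignedVarint(int value) {
        int size = 1;
        while ((value & 0xFFFFFF80) != 0) {
            size++;
            value >>>= 7;
        }
        return size;
    }

    /** Serializes one entry; length is the sum of the sizes of type, version, and data. */
    static byte[] serializeEntry(int apiKey, int version, byte[] data) {
        int length = sizeOfUnsignedVarint(apiKey) + sizeOfUnsignedVarint(version) + data.length;
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeUnsignedVarint(length, out);
        writeUnsignedVarint(apiKey, out);   // 'type' is the apiKey of the schema
        writeUnsignedVarint(version, out);
        out.write(data, 0, data.length);    // payload, assumed to be already serialized
        return out.toByteArray();
    }
}
```

Because the length prefix does not count itself, a reader can first decode the length and then read exactly that many bytes to obtain type, version, and data.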


{
  "apiKey": 0,
  "type": "data",
  "name": "RemoteLogSegmentMetadataRecordStored",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    {
      "name": "SegmentId",
      "type": "uuid",
      "versions": "0+",
      "about": "Unique identifier of the log segment"
    },
    {
      "name": "StartOffset",
      "type": "int64",
      "versions": "0+",
      "about": "Start offset of the segment."
    },
    {
      "name": "EndOffset",
      "type": "int64",
      "versions": "0+",
      "about": "End offset of the segment."
    },
    {
      "name": "LeaderEpoch",
      "type": "int32",
      "versions": "0+",
      "about": "Leader epoch from which this segment instance is created or updated"
    },
    {
      "name": "MaxTimestamp",
      "type": "int64",
      "versions": "0+",
      "about": "Maximum timestamp within this segment."
    },
    {
      "name": "EventTimestamp",
      "type": "int64",
      "versions": "0+",
      "about": "Event timestamp of this segment."
    },
    {
      "name": "SegmentLeaderEpochs",
      "type": "[]SegmentLeaderEpochEntry",
      "versions": "0+",
      "about": "Leader epoch to start offset mappings for the records within this segment.",
      "fields": [
        {
          "name": "LeaderEpoch",
          "type": "int32",
          "versions": "0+",
          "about": "Leader epoch"
        },
        {
          "name": "Offset",
          "type": "int64",
          "versions": "0+",
          "about": "Start offset for the leader epoch"
        }
      ]
    },
    {
      "name": "SegmentSizeInBytes",
      "type": "int32",
      "versions": "0+",
      "about": "Segment size in bytes"
    },
    {
      "name": "RemoteLogSegmentState",
      "type": "int8",
      "versions": "0+",
      "about": "State of the remote log segment"
    }
  ]
}


{
  "apiKey": 1,
  "type": "data",
  "name": "DeletePartitionStateRecord",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    {
      "name": "Epoch",
      "type": "int32",
      "versions": "0+",
      "about": "Epoch (controller or leader) from which this event is created. DELETE_PARTITION_MARKED is sent by the controller. DELETE_PARTITION_STARTED and DELETE_PARTITION_FINISHED are sent by remote log metadata topic partition leader."
    },
    {
      "name": "EventTimestamp",
      "type": "int64",
      "versions": "0+",
      "about": "Event timestamp of this event."
    },
    {
      "name": "RemotePartitionDeleteState",
      "type": "int8",
      "versions": "0+",
      "about": "Deletion state of the remote partition"
    }
  ]
}


Message Formatter for the internal topic

`org.apache.kafka.server.log.remote.storage.RemoteLogMetadataFormatter` can be used to format messages received from the remote log metadata topic by the console consumer. Users can pass the properties mentioned in the block below with '--property' while running the console consumer with this message formatter. The block below explains the format, which may change later. This formatter can be helpful for debugging purposes.

Code Block
languagetext
titleInternal message format
collapsetrue
partition:<val><sep>message-offset:<val><sep>type:<RemoteLogSegmentMetadata | RemoteLogSegmentMetadataUpdate | DeletePartitionState><sep>version:<_no_><vs>event-value:<string representation of the event>

val: represents the respective value of the key. 
sep: represents the separator, default value is: ","

partition : Remote log metadata topic partition number. This is optional. 
Use print.partition property to print it, default is false

message-offset : Offset of this message in remote log metadata topic. This is optional. 
Use print.message.offset property to print it, default is false

type: Event value type, which can be one of RemoteLogSegmentMetadata, RemoteLogSegmentMetadataUpdate, DeletePartitionState values.

version: Version number of the event value type. This is optional. 
Use print.version property to print it, default is false

Use print.all.event.value.fields to print the string representation of the event which will include all the fields in the data, default property value is false.

Event value can be of any of the types below:

remote-log-segment-id is represented as "{id:<val><sep>topicId:<val><sep>topicName:<val><sep>partition:<val>}" in the event value.
topic-id-partition is represented as "{topicId:<val><sep>topicName:<val><sep>partition:<val>}" in the event value.

For RemoteLogSegmentMetadata 
default representation is "{remote-log-segment-id:<val><sep>start-offset:<val><sep>end-offset:<val><sep>leader-epoch:<val><sep>remote-log-segment-state:<COPY_SEGMENT_STARTED | COPY_SEGMENT_FINISHED | DELETE_SEGMENT_STARTED | DELETE_SEGMENT_FINISHED>}"

For RemoteLogSegmentMetadataUpdate
default representation is "{remote-log-segment-id:<val><sep>leader-epoch:<val><sep>remote-log-segment-state:<COPY_SEGMENT_STARTED | COPY_SEGMENT_FINISHED | DELETE_SEGMENT_STARTED | DELETE_SEGMENT_FINISHED>}"

For DeletePartitionState
default representation is "{topic-id-partition:<val><sep>epoch:<val><sep>remote-partition-delete-state:<DELETE_PARTITION_MARKED | DELETE_PARTITION_STARTED | DELETE_PARTITION_FINISHED>}"
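
As an illustration of the line layout described above, the following sketch assembles one formatted line. It is not the code of `RemoteLogMetadataFormatter`: the class name, method name, and parameters are hypothetical, and it assumes that `<vs>` behaves like the regular separator.

```java
import java.util.StringJoiner;

/** Hypothetical sketch of how one formatted line could be assembled. */
final class MetadataLineFormatSketch {

    /**
     * Builds a line of the form
     * partition:<val><sep>message-offset:<val><sep>type:<val><sep>version:<val><sep>event-value:<val>
     * where the partition, message-offset, and version parts are optional.
     */
    static String format(int partition, long messageOffset, String type, int version, String eventValue,
                         boolean printPartition, boolean printMessageOffset, boolean printVersion,
                         String sep) {
        StringJoiner line = new StringJoiner(sep);
        if (printPartition) {
            line.add("partition:" + partition);          // print.partition=true
        }
        if (printMessageOffset) {
            line.add("message-offset:" + messageOffset); // print.message.offset=true
        }
        line.add("type:" + type);                        // always printed
        if (printVersion) {
            line.add("version:" + version);              // print.version=true
        }
        line.add("event-value:" + eventValue);
        return line.toString();
    }
}
```

With all optional flags left at their default of false, only the type and the event value appear in the line.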


Anchor
topic-deletion
topic-deletion
Topic deletion lifecycle

The controller receives a delete request for a topic. It goes through the existing deletion protocol and makes all the replicas go offline so that they stop taking any fetch requests. After all the replicas reach the offline state, the controller publishes an event to the RemoteLogMetadataManager (RLMM), marking the topic as deleted by calling RemoteLogMetadataManager.updateRemotePartitionDeleteMetadata with the state RemotePartitionDeleteState#DELETE_PARTITION_MARKED. With KIP-516, topics are represented with a uuid, and topics can be deleted asynchronously. This allows the remote logs to be garbage collected later by publishing the deletion marker into the remote log metadata topic. RLMM is responsible for asynchronously deleting all the remote log segments of a partition after receiving RemotePartitionDeleteState as DELETE_PARTITION_MARKED. 


Image Added


Default RLMM handles the remote partition deletion by using RemotePartitionRemover(RPRM). 

An RPRM instance is created on each broker that hosts leaders of the remote log metadata topic partitions. It is responsible for removing the remote storage of the topics marked for deletion. It consumes messages from those remote log metadata partitions and filters the delete partition events that need to be processed. It collects those partitions and deletes the respective segments using RemoteStorageManager. This is done at regular intervals of remote.partition.remover.task.interval.ms (default value of 1hr). It commits the processed offsets of the metadata partitions once the deletions are executed successfully. This also helps with leader failover to a different replica, which can resume processing the messages where the previous leader left off. 

RemotePartitionRemover (RPRM) processes the request with the flow shown in the diagram below. 

  1. The controller publishes a DELETE_PARTITION_MARKED event to say that the partition is marked for deletion. Multiple events may be published when the controller restarts or fails over; RPRM deduplicates them. 
  2. RPRM receives the DELETE_PARTITION_MARKED event and processes it if it has not been processed earlier.
  3. RPRM publishes a DELETE_PARTITION_STARTED event, which indicates that the partition deletion has started. 
  4. RPRM gets all the remote log segments for the partition using RLMM, and each of these remote log segments is deleted with the next steps. RLMM subscribes to the local remote log metadata partitions, so it has the segment metadata of all the user topic partitions associated with those remote log metadata partitions.
  5. Publish DELETE_SEGMENT_STARTED event with the segment id. 
  6. RPRM deletes the segment using RSM 
  7. Publish DELETE_SEGMENT_FINISHED event with segment id once it is successful. 
  8. Publish DELETE_PARTITION_FINISHED once all the segments have been deleted successfully.

Image Added
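
The eight steps above can be sketched as a small event-driven routine. This is only an illustration under simplifying assumptions: `PartitionRemoverSketch` and its members are hypothetical names, events are recorded in an in-memory list instead of being published to the remote log metadata topic, and the segment deleter stands in for the RemoteStorageManager call.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

/** Hypothetical, in-memory sketch of the partition deletion flow. */
final class PartitionRemoverSketch {
    private final Set<String> finishedPartitions = new HashSet<>();
    private final List<String> publishedEvents = new ArrayList<>();

    /**
     * Handles one DELETE_PARTITION_MARKED event. Returns false when the partition
     * was already processed, which deduplicates repeated events from the controller.
     */
    boolean onDeletePartitionMarked(String partition, List<String> segmentIds,
                                    Consumer<String> segmentDeleter) {
        if (finishedPartitions.contains(partition)) {
            return false; // duplicate marker, nothing to do
        }
        publishedEvents.add("DELETE_PARTITION_STARTED:" + partition);
        for (String segmentId : segmentIds) {
            publishedEvents.add("DELETE_SEGMENT_STARTED:" + segmentId);
            segmentDeleter.accept(segmentId); // stands in for the remote storage delete
            publishedEvents.add("DELETE_SEGMENT_FINISHED:" + segmentId);
        }
        publishedEvents.add("DELETE_PARTITION_FINISHED:" + partition);
        finishedPartitions.add(partition);
        return true;
    }

    /** Returns a copy of the events recorded so far, in publication order. */
    List<String> publishedEvents() {
        return new ArrayList<>(publishedEvents);
    }
}
```

The partition is recorded as finished only after every segment has been deleted, so a failure part-way through leaves it eligible for another attempt. This relies on segment deletion being idempotent.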

Protocol Changes

ListOffsets

Currently, ListOffsets supports listing offsets based on the earliest timestamp and the latest timestamp of the complete log. There is no change in the protocol schema, but new versions will also support listing the earliest offset of the local log, in addition to the earliest offset of the complete log including the remote log. This protocol will be updated with the changes from KIP-516, but no further changes are required. Request and response versions will be bumped to version 7.

Fetch

We are bumping up the fetch protocol version to handle a new error code; there are no changes in the request and response schemas. When a follower tries to fetch records for an offset that no longer exists in the leader's local storage, the leader returns a new error `OFFSET_MOVED_TO_TIERED_STORAGE`. This is explained in detail here

OFFSET_MOVED_TO_TIERED_STORAGE - when the requested offset is not available in local storage but it is moved to tiered storage.
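
The condition for the new error can be sketched as a simple range check. This is an illustrative assumption about how the check might look, not the broker's actual code: the class name and the offset parameters (`logStartOffset`, `localLogStartOffset`, `logEndOffset`) are hypothetical, and the existing OFFSET_OUT_OF_RANGE error is included only for contrast.

```java
/** Hypothetical sketch of classifying a follower fetch offset. */
final class FetchOffsetCheckSketch {

    enum Result { OK, OFFSET_OUT_OF_RANGE, OFFSET_MOVED_TO_TIERED_STORAGE }

    /**
     * logStartOffset is the earliest offset across local and remote storage,
     * localLogStartOffset is the earliest offset still held on local disk.
     */
    static Result checkFollowerFetch(long fetchOffset, long logStartOffset,
                                     long localLogStartOffset, long logEndOffset) {
        if (fetchOffset < logStartOffset || fetchOffset > logEndOffset) {
            return Result.OFFSET_OUT_OF_RANGE; // not in the log at all
        }
        if (fetchOffset < localLogStartOffset) {
            return Result.OFFSET_MOVED_TO_TIERED_STORAGE; // only available remotely
        }
        return Result.OK; // served from local storage
    }
}
```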

Public Interfaces

Compacted topics will not have remote storage support. 

Configs

System-Wide

remote.log.storage.system.enable - Whether to enable tiered storage functionality in a broker or not. Valid values are `true` or `false` and the default value is false. This property provides backward compatibility. When it is true, the broker starts all the services required for tiered storage. 

remote.log.storage.manager.class.name - This is mandatory if the remote.log.storage.system.enable is set as true.

remote.log.metadata.manager.class.name(optional) - This is an optional property. If this is not configured, Kafka uses an inbuilt metadata manager backed by an internal topic.

RemoteStorageManager

(These configs are dependent on remote storage manager implementation)

remote.log.storage.*

RemoteLogMetadataManager

(These configs are dependent on remote log metadata manager implementation)

remote.log.metadata.*

Remote log manager related configuration.

remote.log.index.file.cache.total.size.mb
The total size of the space allocated to store index files fetched from remote storage in the local storage.
Default value: 1024

remote.log.manager.thread.pool.size
Remote log thread pool size, which is used in scheduling tasks to copy segments, and clean up remote log segments.
Default value: 4

remote.log.manager.task.interval.ms
The interval at which the remote log manager runs the scheduled tasks like copy segments, and clean up remote log segments.
Default value: 30,000

Remote log manager tasks are retried with the exponential backoff algorithm mentioned here.

remote.log.manager.task.retry.backoff.ms
The amount of time in milliseconds to wait before attempting the initial retry of a failed remote storage request.
Default value: 500

remote.log.manager.task.retry.backoff.max.ms
The maximum amount of time in milliseconds to wait before attempting to retry a failed remote storage request.
Default value: 30,000

remote.log.manager.task.retry.jitter
Random jitter amount applied to the `remote.log.manager.task.retry.backoff.ms` for computing the resultant backoff interval. This will avoid reconnection storms.
Default value: 0.2
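
The interaction of the three retry settings can be sketched as below. This is an assumption about the usual exponential backoff shape (doubling per attempt, capped at the maximum, scaled by a random jitter factor), not a statement of the exact algorithm the broker uses; `RetryBackoffSketch` and its method names are hypothetical.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical sketch of exponential backoff with jitter and an upper bound. */
final class RetryBackoffSketch {

    /**
     * @param initialMs value of remote.log.manager.task.retry.backoff.ms
     * @param maxMs     value of remote.log.manager.task.retry.backoff.max.ms
     * @param jitter    value of remote.log.manager.task.retry.jitter, e.g. 0.2
     * @param attempt   number of failed attempts so far, starting at 0
     * @param random01  a random number in [0, 1), passed in to keep the sketch testable
     */
    static long backoffMs(long initialMs, long maxMs, double jitter, int attempt, double random01) {
        int exponent = Math.min(attempt, 30);                  // keeps the power bounded
        double base = initialMs * Math.pow(2, exponent);       // 500, 1000, 2000, ...
        double factor = 1.0 + jitter * (2.0 * random01 - 1.0); // in [1 - jitter, 1 + jitter)
        return (long) Math.min((double) maxMs, base * factor);
    }

    /** Convenience overload that draws the random number itself. */
    static long backoffMs(long initialMs, long maxMs, double jitter, int attempt) {
        return backoffMs(initialMs, maxMs, jitter, attempt, ThreadLocalRandom.current().nextDouble());
    }
}
```

With the defaults listed above (500 ms initial, 30,000 ms maximum, 0.2 jitter), the un-jittered delays in this sketch would be 500, 1000, 2000, 4000 ms and so on until the cap is reached.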

remote.log.reader.threads
Remote log reader thread pool size, which is used in scheduling tasks to fetch data from remote storage.  
Default value: 5

remote.log.reader.max.pending.tasks
Maximum remote log reader thread pool task queue size. If the task queue is full, broker will stop reading remote log segments.
Default value: 100
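
One way to picture the reader pool and its bounded task queue is a standard `ThreadPoolExecutor` with a fixed number of threads and a capacity-limited queue. The sketch below is an assumption for illustration only; `RemoteReaderPoolSketch` and its methods are hypothetical and do not describe the broker's internal classes.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of a reader pool with a bounded pending-task queue. */
final class RemoteReaderPoolSketch {

    /**
     * @param threads         plays the role of remote.log.reader.threads
     * @param maxPendingTasks plays the role of remote.log.reader.max.pending.tasks
     */
    static ThreadPoolExecutor newReaderPool(int threads, int maxPendingTasks) {
        return new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(maxPendingTasks));
    }

    /** Returns false when all threads are busy and the queue is full. */
    static boolean trySubmit(ThreadPoolExecutor pool, Runnable remoteRead) {
        try {
            pool.execute(remoteRead);
            return true;
        } catch (RejectedExecutionException e) {
            return false; // caller would have to stop or delay reading from remote storage
        }
    }
}
```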

Per Topic Configuration

Users can set the remote.storage.enable property for a topic; the default value is false. To enable tiered storage for a topic, set remote.storage.enable to true. This config cannot be disabled once it is enabled. Disabling it will be supported in future versions.

The retention configs below are similar to the log retention configs. They determine how long the log segments are retained in the local storage. The existing retention.* configs apply to the topic partition as a whole, covering both local and remote storage. 

local.retention.ms
The number of milliseconds to keep a local log segment before it gets deleted. If not set, the value in `log.retention.ms` is used. The effective value should always be less than or equal to the retention.ms value.

local.retention.bytes
The maximum size that the local log segments of a partition can grow to before the oldest segments are deleted. If not set, the value in `log.retention.bytes` is used. The effective value should always be less than or equal to the retention.bytes value.
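
The fallback and the upper bound described for the local retention settings can be sketched as follows. This is a simplified illustration: `LocalRetentionSketch` is a hypothetical name, an unset value is modelled with `OptionalLong`, and special values such as -1 for "no limit" are deliberately not handled.

```java
import java.util.OptionalLong;

/** Hypothetical sketch of resolving the effective local retention value. */
final class LocalRetentionSketch {

    /**
     * @param localRetention the configured local.retention.ms or local.retention.bytes, if set
     * @param retention      the overall retention.ms or retention.bytes of the topic
     * @return the effective local retention, which never exceeds the overall retention
     */
    static long effectiveLocalRetention(OptionalLong localRetention, long retention) {
        long effective = localRetention.orElse(retention); // fall back when not set
        if (effective > retention) {
            throw new IllegalArgumentException(
                "local retention " + effective + " must not exceed overall retention " + retention);
        }
        return effective;
    }
}
```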

Remote Storage Manager

`RemoteStorageManager` is an interface to provide the lifecycle of remote log segments and indexes. More details about how we arrived at this interface are discussed in the document. We will provide a simple implementation of RSM to get a better understanding of the APIs. HDFS and S3 implementation are planned to be hosted in external repos and these will not be part of Apache Kafka repo. This is in line with the approach taken for Kafka connectors.

Copying and deleting APIs are expected to be idempotent, so plugin implementations can retry safely, overwriting any partially copied content and not failing when content is already deleted.
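
The idempotency expectation can be illustrated with a toy in-memory store. This is not a `RemoteStorageManager` implementation; `IdempotentSegmentStoreSketch` and its methods are hypothetical and only show the two properties named above: a retried copy overwrites earlier content, and deleting missing content is not an error.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory store showing idempotent copy and delete. */
final class IdempotentSegmentStoreSketch {
    private final Map<String, byte[]> objects = new ConcurrentHashMap<>();

    /** Copies the data; a retry simply replaces whatever an earlier attempt left behind. */
    void copySegment(String segmentId, byte[] data) {
        objects.put(segmentId, data.clone());
    }

    /** Deletes the data; succeeds even if the segment was already deleted. */
    void deleteSegment(String segmentId) {
        objects.remove(segmentId);
    }

    /** Returns a copy of the stored bytes, or null when the segment is absent. */
    byte[] read(String segmentId) {
        byte[] stored = objects.get(segmentId);
        return stored == null ? null : stored.clone();
    }
}
```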


Code Block
languagejava
titleRemoteStorageManager
package org.apache.kafka.server.log.remote.storage;
...
/**
 * RemoteStorageManager provides the lifecycle of remote log segments that includes copy, fetch, and delete from remote
 * storage.
 * <p>
 * Each upload or copy of a segment is initiated with {@link RemoteLogSegmentMetadata} containing {@link RemoteLogSegmentId}
 * which is universally unique even for the same topic partition and offsets.
 * <p>
 * RemoteLogSegmentMetadata is stored in {@link RemoteLogMetadataManager} before and after copy/delete operations on
 * RemoteStorageManager with the respective {@link RemoteLogSegmentState}. {@link RemoteLogMetadataManager} is
 * responsible for storing and fetching metadata about the remote log segments in a strongly consistent manner.
 * This allows RemoteStorageManager to store segments even in eventually consistent manner as the metadata is already
 * stored in a consistent store.
 * <p>
 * All these APIs are still evolving.
 */
@InterfaceStability.Unstable
public interface RemoteStorageManager extends Configurable, Closeable {


    /**
     * Type of the index file.
     */
    enum IndexType {
        /**
         * Represents offset index.
         */
        Offset,

        /**
         * Represents timestamp index.
         */
        Timestamp,

    

Remote Storage Manager

`RemoteStorageManager` is an interface to provide the lifecycle of remote log segments and indexes. More details about how we arrived at this interface are discussed in the document. We will provide a simple implementation of RSM to get a better understanding of the APIs. HDFS and S3 implementation are planned to be hosted in external repos and these will not be part of Apache Kafka repo. This is in line with the approach taken for Kafka connectors.

Code Block
languagejava
titleRemoteStorageManager


/**
 * RemoteStorageManager provides the lifecycle of remote log segments that includes copy, fetch, and delete from remote
 * storage.
 * <p>
 * Each upload or copy of a segment is initiated with {@link RemoteLogSegmentMetadata} containing {@link RemoteLogSegmentId}
 * which is universally unique even for the same topic partition and offsets.
 * <p>
 * RemoteLogSegmentMetadata is stored in {@link RemoteLogMetadataManager} before and after copy/delete operations on
 * RemoteStorageManager with the respective {@link RemoteLogSegmentMetadata.State}. {@link RemoteLogMetadataManager} is
 * responsible for storing and fetching metadata about the remote log segments in a strongly consistent manner.
 * This allows RemoteStorageManager to store segments even in eventually consistent manner as the metadata is already
 * stored in a consistent store.
 * <p>
 * All these APIs are still evolving.
 */
@InterfaceStability.Unstable
public interface RemoteStorageManager extends Configurable, Closeable {

    /**
     * Copies LogSegmentData provided for the given {@code remoteLogSegmentMetadata}.
     * <p>
     * Invoker of this API should always send a unique id as part of {@link RemoteLogSegmentMetadata#remoteLogSegmentId()}
     * even when it retries to invoke this method for the same log segment data.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment.
     * @param logSegmentData           data to be copied to tiered storage.
     * @throws RemoteStorageException if there are any errors in storing the data of the segment.
     */
    void copyLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata, LogSegmentData logSegmentData)
            throws RemoteStorageException;

    /**
     * Returns the remote log segment data file/object as InputStream for the given RemoteLogSegmentMetadata starting
     * from the given startPosition. The stream will end at the smaller of endPosition and the end of the remote log
     * segment data file/object.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment.
     * @param startPosition            start position of log segment to be read, inclusive.
     * @param endPosition              end position of log segment to be read, inclusive.
     * @return input stream of the requested log segment data.
     * @throws RemoteStorageException if there are any errors while fetching the desired segment.
     */
    InputStream fetchLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
                                    Long startPosition, Long endPosition) throws RemoteStorageException;

    /**
     * Returns the offset index for the respective log segment of {@link RemoteLogSegmentMetadata}.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment.
     * @return input stream of the requested  offset index.
     * @throws RemoteStorageException if there are any errors while fetching the index.
     */
    InputStream fetchOffsetIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;

    /**
     * Returns the timestamp index for the respective log segment of {@link RemoteLogSegmentMetadata}.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment.
     * @return input stream of the requested  timestamp index.
     * @throws RemoteStorageException if there are any errors while fetching the index.
     */
    InputStream fetchTimestampIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;

    /**
     * Returns the transaction index for the respective log segment of {@link RemoteLogSegmentMetadata}.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment.
     * @return input stream of the requested transaction index.
     * @throws RemoteStorageException if there are any errors while fetching the index.
     */
    InputStream fetchTransactionIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;

    /**
     * Returns the producer snapshot index for the respective log segment of {@link RemoteLogSegmentMetadata}.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment.
     * @return input stream of the producer snapshot.
     * @throws RemoteStorageException if there are any errors while fetching the index.
     */
    InputStream fetchProducerSnapshotIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;

    /**
     * Returns the leader epoch index for the respective log segment of {@link RemoteLogSegmentMetadata}.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment.
     * @return input stream of the leader epoch index.
     * @throws RemoteStorageException if there are any errors while fetching the index.
     */
    InputStream fetchLeaderEpochIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;

    /**
     * Deletes the resources associated with the given {@code remoteLogSegmentMetadata}. Deletion is considered as
     * successful if this call returns successfully without any errors. It will throw {@link RemoteStorageException} if
     * there are any errors in deleting the file.
     * <p>
     * {@link RemoteResourceNotFoundException} is thrown when there are no resources associated with the given
     * {@code remoteLogSegmentMetadata}.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment to be deleted.
     * @throws RemoteResourceNotFoundException if the requested resource is not found
     * @throws RemoteStorageException          if there are any storage related errors occurred.
     */
    void deleteLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;

}

/**
 * This represents a universally unique identifier associated to a topic partition's log segment. This will be
 * regenerated for every attempt of copying a specific log segment in {@link RemoteStorageManager#copyLogSegment(RemoteLogSegmentMetadata, LogSegmentData)}.
 */
public class RemoteLogSegmentId implements Comparable<RemoteLogSegmentId>, Serializable {
    private static final long serialVersionUID = 1L;

    private final TopicIdPartition topicIdPartition;
    private final UUID id;

    public RemoteLogSegmentId(TopicIdPartition topicIdPartition, UUID id) {
        this.topicIdPartition = requireNonNull(topicIdPartition);
        this.id = requireNonNull(id);
    }

    /**
     * Returns TopicIdPartition of this remote log segment.
     * 
     * @return
     */
    public TopicIdPartition topicIdPartition() {
        return topicIdPartition;
    }

    /**
     * Returns Universally Unique Id of this remote log segment.
     *
     * @return
     */
    public UUID id() {
        return id;
    }
...
}


/**
 * It describes the metadata about the log segment in the remote storage.
 */
public class RemoteLogSegmentMetadata implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * Universally unique remote log segment id.
     */
    private final RemoteLogSegmentId remoteLogSegmentId;

    /**
     * Start offset of this segment.
     */
    private final long startOffset;

    /**
     * End offset of this segment.
     */
    private final long endOffset;

    /**
     * Leader epoch of the broker.
     */
    private final int leaderEpoch;

    /**
     * Maximum timestamp in the segment
     */
    private final long maxTimestamp;

    /**
     * Epoch time at which the respective {@link #state} is set.
     */
    private final long eventTimestamp;

    /**
     * LeaderEpoch vs offset for messages within this segment.
     */
    private final Map<Integer, Long> segmentLeaderEpochs;

    /**
     * Size of the segment in bytes.
     */
    private final int segmentSizeInBytes;

    /**
     * It indicates the state in which the action is executed on this segment.
     */
    private final RemoteLogState state;

    /**
     * @param remoteLogSegmentId  Universally unique remote log segment id.
     * @param startOffset         Start offset of this segment.
     * @param endOffset           End offset of this segment.
     * @param maxTimestamp        maximum timestamp in this segment
     * @param leaderEpoch         Leader epoch of the broker.
     * @param eventTimestamp      Epoch time at which the remote log segment is copied to the remote tier storage.
     * @param segmentSizeInBytes  size of this segment in bytes.
     * @param state               The respective segment of remoteLogSegmentId is marked for deletion.
     * @param segmentLeaderEpochs leader epochs occurred within this segment
     */
    public RemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId, long startOffset, long endOffset,
                                    long maxTimestamp, int leaderEpoch, long eventTimestamp,
                                    int segmentSizeInBytes, RemoteLogState state, Map<Integer, Long> segmentLeaderEpochs) {
        this.remoteLogSegmentId = remoteLogSegmentId;
        this.startOffset = startOffset;
        this.endOffset = endOffset;
        this.leaderEpoch = leaderEpoch;
        this.maxTimestamp = maxTimestamp;
        this.eventTimestamp = eventTimestamp;
        this.segmentLeaderEpochs = segmentLeaderEpochs;
        this.state = state;
        this.segmentSizeInBytes = segmentSizeInBytes;
    }

    /**
     * @return unique id of this segment.
     */
    public RemoteLogSegmentId remoteLogSegmentId() {
        return remoteLogSegmentId;
    }

    /**
     * @return Start offset of this segment (inclusive).
     */
    public long startOffset() {
        return startOffset;
    }

    /**
     * @return End offset of this segment (inclusive).
     */
    public long endOffset() {
        return endOffset;
    }

    /**
     * @return Leader or controller epoch of the broker from where this event occurred.
     */
    public int brokerEpoch() {
        return brokerEpoch;
    }

    /**
     * @return Epoch time at which this event occurred.
     */
    public long eventTimestamp() {
        return eventTimestamp;
    }

    /**
     * @return Size of the segment in bytes.
     */
    public int segmentSizeInBytes() {
        return segmentSizeInBytes;
    }

    public RemoteLogState state() {
        return state;
    }

    public long maxTimestamp() {
        return maxTimestamp;
    }

    public Map<Integer, Long> segmentLeaderEpochs() {
        return segmentLeaderEpochs;
    }

...
}

public class LogSegmentData {

    private final File logSegment;
    private final File offsetIndex;
    private final File timeIndex;
    private final File txnIndex;
    private final File producerIdSnapshotIndex;
    private final ByteBuffer leaderEpochIndex;

    public LogSegmentData(File logSegment, File offsetIndex, File timeIndex, File txnIndex, File producerIdSnapshotIndex,
                          ByteBuffer leaderEpochIndex) {
        this.logSegment = logSegment;
        this.offsetIndex = offsetIndex;
        this.timeIndex = timeIndex;
        this.txnIndex = txnIndex;
        this.producerIdSnapshotIndex = producerIdSnapshotIndex;
        this.leaderEpochIndex = leaderEpochIndex;
    }

    public File logSegment() {
        return logSegment;
    }

    public File offsetIndex() {
        return offsetIndex;
    }

    public File timeIndex() {
        return timeIndex;
    }

    public File txnIndex() {
        return txnIndex;
    }

    public File producerIdSnapshotIndex() {
        return producerIdSnapshotIndex;
    }

    public ByteBuffer leaderEpochIndex() {
        return leaderEpochIndex;
    }
...
}

...
        /**
         * Represents producer snapshot index.
         */
        ProducerSnapshot,

        /**
         * Represents transaction index.
         */
        Transaction,

        /**
         * Represents leader epoch index.
         */
        LeaderEpoch,
    }

    /**
     * Copies the given {@link LogSegmentData} provided for the given {@code remoteLogSegmentMetadata}. This includes
     * log segment and its auxiliary indexes like offset index, time index, transaction index, leader epoch index, and
     * producer snapshot index.
     * <p>
     * Invoker of this API should always send a unique id as part of {@link RemoteLogSegmentMetadata#remoteLogSegmentId()}
     * even when it retries to invoke this method for the same log segment data.
     * <p>
     * This operation is expected to be idempotent. If a copy operation is retried and there is existing content already written,
     * it should be overwritten, and do not throw {@link RemoteStorageException}
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment.
     * @param logSegmentData           data to be copied to tiered storage.
     * @throws RemoteStorageException if there are any errors in storing the data of the segment.
     */
    void copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
                            LogSegmentData logSegmentData)
            throws RemoteStorageException;

    /**
     * Returns the remote log segment data file/object as InputStream for the given {@link RemoteLogSegmentMetadata}
     * starting from the given startPosition. The stream will end at the end of the remote log segment data file/object.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment.
     * @param startPosition            start position of log segment to be read, inclusive.
     * @return input stream of the requested log segment data.
     * @throws RemoteStorageException          if there are any errors while fetching the desired segment.
     * @throws RemoteResourceNotFoundException the requested log segment is not found in the remote storage.
     */
    InputStream fetchLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
                                int startPosition) throws RemoteStorageException;

RemoteLogMetadataManager

`RemoteLogMetadataManager` is an interface to provide the lifecycle of metadata about remote log segments with strongly consistent semantics. There is a default implementation that uses an internal topic. Users can plug in their own implementation if they intend to use another system to store remote log segment metadata.

Code Block
languagejava
titleRemoteLogMetadataManager

/**
 * This interface provides storing and fetching remote log segment metadata with strongly consistent semantics.
 * <p>
 * This class can be plugged in to Kafka cluster by adding the implementation class as
 * <code>remote.log.metadata.manager.class.name</code> property value. There is an inbuilt implementation backed by
 * topic storage in the local cluster. This is used as the default implementation if
 * remote.log.metadata.manager.class.name is not configured.
 * </p>
 * <p>
 * <code>remote.log.metadata.manager.class.path</code> property is about the class path of the RemoteLogMetadataManager
 * implementation. If specified, the RemoteLogMetadataManager implementation and its dependent libraries will be loaded
 * by a dedicated classloader which searches this class path before the Kafka broker class path. The syntax of this
 * parameter is the same as the standard Java class path string.
 * </p>
 * <p>
 * <code>remote.log.metadata.manager.listener.name</code> property is about listener name of the local broker to which
 * it should get connected if needed by RemoteLogMetadataManager implementation. When this is configured all other
 * required properties can be passed as properties with prefix of 'remote.log.metadata.manager.listener.'
 * </p>
 * "cluster.id", "broker.id" and all the properties prefixed with "remote.log.metadata." are passed when
 * {@link #configure(Map)} is invoked on this instance.
 * <p>
 * <p>
 * <p>
 * All these APIs are still evolving.
 * <p>
 */
@InterfaceStability.Unstable
public interface RemoteLogMetadataManager extends Configurable, Closeable {

    /**
     * Stores RemoteLogSegmentMetadata with the containing RemoteLogSegmentId into RemoteLogMetadataManager.
     * <p>
     * RemoteLogSegmentMetadata is identified by RemoteLogSegmentId.
     */**
     * Returns the remote log segment data file/object as InputStream for the given {@link RemoteLogSegmentMetadata}
     * starting from the given startPosition. The stream will end at the smaller of endPosition and the end of the
     * remote log segment data file/object.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment.
     * @param startPosition            start position of log segment to be read, inclusive.
     * @param endPosition              end position of log segment to be read, inclusive.
     * @return input stream of the requested log segment data.
     * @throws RemoteStorageException          if there are any errors while fetching the desired segment.
     * @throws RemoteResourceNotFoundException the requested log segment is not found in the remote storage.
     */
    InputStream fetchLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
                                int startPosition,
                                int endPosition) throws RemoteStorageException;

    /**
     * @paramReturns remoteLogSegmentMetadatathe metadataindex aboutfor the remoterespective log segment toof be{@link deletedRemoteLogSegmentMetadata}.
     * @throws<p>
 RemoteStorageException if there are any* storageIf relatedthe errorsindex occurred.
is not present (e.g. Transaction */
index may not exist voidbecause putRemoteLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;

segments create prior to
     /**
     * Explain detail what all states this can be applied with.* version 2.8.0 will not have transaction index associated with them.),
     * throws {@link RemoteResourceNotFoundException}
     *
     * @param remoteLogSegmentMetadataUpdate
remoteLogSegmentMetadata metadata about the remote * @throws RemoteStorageExceptionlog segment.
     */
 @param indexType  void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate) throws RemoteStorageException;

    /**
     * Fetchestype RemoteLogSegmentMetadata if it exists for of the givenindex topicto partition containing offset and leader-epochbe fetched for the offset, segment.
     * else@return returns {@link Optional#empty()}input stream of the requested index.
     *
 @throws RemoteStorageException   * @param topicIdPartition topic partition
   if there *are @paramany offseterrors while fetching the index.
     offset
* @throws RemoteResourceNotFoundException the requested *index @paramis epochForOffsetnot leaderfound epoch forin the givenremote offsetstorage.
     * @return The caller of this function are encouraged to re-create the requestedindexes remotefrom logthe segment metadata.
     * @throwsas RemoteStorageExceptionthe ifsuggested thereway areof anyhandling storage related errors occurredthis error.
     */
    Optional<RemoteLogSegmentMetadata>InputStream remoteLogSegmentMetadatafetchIndex(TopicIdPartitionRemoteLogSegmentMetadata topicIdPartition, long offset, int epochForOffset)
remoteLogSegmentMetadata,
                           IndexType indexType) throws RemoteStorageException;

    /**
     * ReturnsDeletes earliestthe logresources offset if there are segments in the remote storage for the given topic partition andassociated with the given {@code remoteLogSegmentMetadata}. Deletion is considered as
     * leadersuccessful epochif elsethis call returns successfully without any errors. It will throw {@link Optional#empty()}.RemoteStorageException} if
     *
 there are any errors *in @paramdeleting topicIdPartition topic partitionthe file.
     * <p>
   @param leaderEpoch * This operation leaderis epoch
expected to be idempotent. If resources *are @returnnot thefound, earliestit logis offsetnot ifexpected exists.to
     */
 throw {@link RemoteResourceNotFoundException} Optional<Long>as earliestLogOffset(TopicIdPartition topicIdPartition, int leaderEpoch) throws RemoteStorageException;

    /**it may be already removed from a previous attempt.
     *
  Returns highest log offset* of@param topicremoteLogSegmentMetadata partitionmetadata forabout the givenremote leaderlog epochsegment into remotebe storage. This is used bydeleted.
     * remote@throws logRemoteStorageException management subsystem to know upto which offset the segments haveif beenthere copiedare toany remotestorage storagerelated errors foroccurred.
     * a given leader epoch.
     *
     * @param topicIdPartition topic partition
     * @param leaderEpoch    leader epoch/
    void deleteLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;
  }


package org.apache.kafka.common;
...
public class TopicIdPartition {

    private final UUID topicId;
    private final TopicPartition topicPartition;

    public TopicIdPartition(UUID topicId, TopicPartition topicPartition) {
        Objects.requireNonNull(topicId, "topicId can not be null");
        Objects.requireNonNull(topicPartition, "topicPartition can not be null");
        this.topicId = topicId;
        this.topicPartition = topicPartition;
    }

    public UUID topicId() {
        return topicId;
    }

    public TopicPartition topicPartition() {
        return topicPartition;
    }

...
}


package org.apache.kafka.server.log.remote.storage;
...
/**
 * This represents a universally unique identifier associated to a topic partition's log segment. This will be
 * regenerated for every attempt of copying a specific log segment in {@link RemoteStorageManager#copyLogSegment(RemoteLogSegmentMetadata, LogSegmentData)}.
 */
public class RemoteLogSegmentId implements Comparable<RemoteLogSegmentId>, Serializable {
    private static final long serialVersionUID = 1L;

    private final TopicIdPartition topicIdPartition;
    private final UUID id;

    public RemoteLogSegmentId(TopicIdPartition topicIdPartition, UUID id) {
        this.topicIdPartition = requireNonNull(topicIdPartition);
        this.id = requireNonNull(id);
    }

    /**
     * Returns the TopicIdPartition of this remote log segment.
     *
     * @return the TopicIdPartition of this remote log segment.
     */
    public TopicIdPartition topicIdPartition() {
        return topicIdPartition;
    }

    /**
     * Returns the universally unique id of this remote log segment.
     *
     * @return the universally unique id of this remote log segment.
     */
    public UUID id() {
        return id;
    }
...
}

package org.apache.kafka.server.log.remote.storage;
...
/**
 * It describes the metadata about the log segment in the remote storage.
 */
public class RemoteLogSegmentMetadata implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * Universally unique remote log segment id.
     */
    private final RemoteLogSegmentId remoteLogSegmentId;

    /**
     * Start offset of this segment.
     */
    private final long startOffset;

    /**
     * End offset of this segment.
     */
    private final long endOffset;

    /**
     * Leader epoch of the broker.
     */
    private final int leaderEpoch;

    /**
     * Maximum timestamp in the segment.
     */
    private final long maxTimestamp;

    /**
     * Epoch time at which the respective {@link #state} is set.
     */
    private final long eventTimestamp;

    /**
     * Leader epoch vs. offset for messages within this segment.
     */
    private final Map<Integer, Long> segmentLeaderEpochs;

    /**
     * Size of the segment in bytes.
     */
    private final int segmentSizeInBytes;

    /**
     * It indicates the state in which the action is executed on this segment.
     */
    private final RemoteLogSegmentState state;

    /**
     * @param remoteLogSegmentId  Universally unique remote log segment id.
     * @param startOffset         Start offset of this segment.
     * @param endOffset           End offset of this segment.
     * @param maxTimestamp        Maximum timestamp in this segment.
     * @param leaderEpoch         Leader epoch of the broker.
     * @param eventTimestamp      Epoch time at which the remote log segment is copied to the remote tier storage.
     * @param segmentSizeInBytes  Size of this segment in bytes.
     * @param state               State of the respective segment of remoteLogSegmentId.
     * @param segmentLeaderEpochs Leader epochs that occurred within this segment.
     */
    public RemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId, long startOffset, long endOffset,
                                    long maxTimestamp, int leaderEpoch, long eventTimestamp,
                                    int segmentSizeInBytes, RemoteLogSegmentState state, Map<Integer, Long> segmentLeaderEpochs) {
        this.remoteLogSegmentId = remoteLogSegmentId;
        this.startOffset = startOffset;
        this.endOffset = endOffset;
        this.leaderEpoch = leaderEpoch;
        this.maxTimestamp = maxTimestamp;
        this.eventTimestamp = eventTimestamp;
        this.segmentLeaderEpochs = segmentLeaderEpochs;
        this.state = state;
        this.segmentSizeInBytes = segmentSizeInBytes;
    }

    /**
     * @return unique id of this segment.
     */
    public RemoteLogSegmentId remoteLogSegmentId() {
        return remoteLogSegmentId;
    }

    /**
     * @return Start offset of this segment (inclusive).
     */
    public long startOffset() {
        return startOffset;
    }

    /**
     * @return End offset of this segment (inclusive).
     */
    public long endOffset() {
        return endOffset;
    }

    /**
     * @return Leader epoch of the broker from where this event occurred.
     */
    public int leaderEpoch() {
        return leaderEpoch;
    }

    /**
     * @return Epoch time at which this event occurred.
     */
    public long eventTimestamp() {
        return eventTimestamp;
    }

    /**
     * @return Size of this segment in bytes.
     */
    public int segmentSizeInBytes() {
        return segmentSizeInBytes;
    }

    public RemoteLogSegmentState state() {
        return state;
    }

    public long maxTimestamp() {
        return maxTimestamp;
    }

    public Map<Integer, Long> segmentLeaderEpochs() {
        return segmentLeaderEpochs;
    }

...
}

package org.apache.kafka.server.log.remote.storage;
...
public class LogSegmentData {

    private final File logSegment;
    private final File offsetIndex;
    private final File timeIndex;
    private final File txnIndex;
    private final File producerIdSnapshotIndex;
    private final ByteBuffer leaderEpochIndex;

    public LogSegmentData(File logSegment, File offsetIndex, File timeIndex, File txnIndex, File producerIdSnapshotIndex,
                          ByteBuffer leaderEpochIndex) {
        this.logSegment = logSegment;
        this.offsetIndex = offsetIndex;
        this.timeIndex = timeIndex;
        this.txnIndex = txnIndex;
        this.producerIdSnapshotIndex = producerIdSnapshotIndex;
        this.leaderEpochIndex = leaderEpochIndex;
    }

    public File logSegment() {
        return logSegment;
    }

    public File offsetIndex() {
        return offsetIndex;
    }

    public File timeIndex() {
        return timeIndex;
    }

    public File txnIndex() {
        return txnIndex;
    }

    public File producerIdSnapshotIndex() {
        return producerIdSnapshotIndex;
    }

    public ByteBuffer leaderEpochIndex() {
        return leaderEpochIndex;
    }
...
}


New Metrics

The following new metrics will be added:

...

kafka.server:type=BrokerTopicMetrics, name=RemoteBytesOutPerSec, topic=([-.\w]+)

...

RemoteLogMetadataManager

`RemoteLogMetadataManager` is an interface for managing the lifecycle of metadata about remote log segments with strongly consistent semantics. There is a default implementation that uses an internal topic. Users can plug in their own implementation if they intend to use another system to store remote log segment metadata.
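
To make the segment lifecycle concrete, here is a minimal sketch of how an implementation might validate state changes, following the transitions drawn in the `updateRemoteLogSegmentMetadata` Javadoc. The names `SegmentLifecycleSketch`, `SegmentState` and `isValidTransition` are assumptions made for this example; they are not part of the proposed API, and the KIP does not say whether implementations must enforce these transitions.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

public class SegmentLifecycleSketch {

    // Hypothetical local stand-in for RemoteLogSegmentState so the example compiles on its own.
    enum SegmentState {
        COPY_SEGMENT_STARTED,
        COPY_SEGMENT_FINISHED,
        DELETE_SEGMENT_STARTED,
        DELETE_SEGMENT_FINISHED
    }

    // Allowed target states for each source state, taken from the state diagram.
    private static final Map<SegmentState, Set<SegmentState>> ALLOWED = new EnumMap<>(SegmentState.class);

    static {
        ALLOWED.put(SegmentState.COPY_SEGMENT_STARTED,
                EnumSet.of(SegmentState.COPY_SEGMENT_FINISHED, SegmentState.DELETE_SEGMENT_STARTED));
        ALLOWED.put(SegmentState.COPY_SEGMENT_FINISHED,
                EnumSet.of(SegmentState.DELETE_SEGMENT_STARTED));
        ALLOWED.put(SegmentState.DELETE_SEGMENT_STARTED,
                EnumSet.of(SegmentState.DELETE_SEGMENT_FINISHED));
        // DELETE_SEGMENT_FINISHED is terminal: no outgoing transitions.
        ALLOWED.put(SegmentState.DELETE_SEGMENT_FINISHED, EnumSet.noneOf(SegmentState.class));
    }

    // Returns true if the diagram contains an edge from 'from' to 'to'.
    static boolean isValidTransition(SegmentState from, SegmentState to) {
        return ALLOWED.get(from).contains(to);
    }

    public static void main(String[] args) {
        // A copy that has started may finish.
        System.out.println(isValidTransition(
                SegmentState.COPY_SEGMENT_STARTED, SegmentState.COPY_SEGMENT_FINISHED));
        // A deleted segment cannot be copied again under the same id.
        System.out.println(isValidTransition(
                SegmentState.DELETE_SEGMENT_FINISHED, SegmentState.COPY_SEGMENT_STARTED));
    }
}
```

A check like this would sit naturally in front of whatever store backs the metadata, so that an out-of-order update is rejected before it is persisted.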


Code Block
languagejava
titleRemoteLogMetadataManager
package org.apache.kafka.server.log.remote.storage;
...
/**
 * This interface provides storing and fetching remote log segment metadata with strongly consistent semantics.
 * <p>
 * This class can be plugged in to Kafka cluster by adding the implementation class as
 * <code>remote.log.metadata.manager.class.name</code> property value. There is an inbuilt implementation backed by
 * topic storage in the local cluster. This is used as the default implementation if
 * remote.log.metadata.manager.class.name is not configured.
 * </p>
 * <p>
 * <code>remote.log.metadata.manager.class.path</code> property is the class path of the RemoteLogMetadataManager
 * implementation. If specified, the RemoteLogMetadataManager implementation and its dependent libraries will be loaded
 * by a dedicated classloader which searches this class path before the Kafka broker class path. The syntax of this
 * parameter is the same as the standard Java class path string.
 * </p>
 * <p>
 * <code>remote.log.metadata.manager.listener.name</code> property is the listener name of the local broker to which
 * it should get connected if needed by the RemoteLogMetadataManager implementation. When this is configured, all other
 * required properties can be passed as properties with the prefix 'remote.log.metadata.manager.listener.'.
 * </p>
 * <p>
 * "cluster.id", "broker.id" and all other properties prefixed with "remote.log.metadata." are passed when
 * {@link #configure(Map)} is invoked on this instance.
 * </p>
 */
@InterfaceStability.Evolving
public interface RemoteLogMetadataManager extends Configurable, Closeable {

    /**
     * Asynchronously adds {@link RemoteLogSegmentMetadata} with the containing {@link RemoteLogSegmentId} into {@link RemoteLogMetadataManager}.
     * <p>
     * RemoteLogSegmentMetadata is identified by RemoteLogSegmentId and it should have the initial state which is {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}.
     * <p>
     * {@link #updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate)} should be used to update an existing RemoteLogSegmentMetadata.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment.
     * @throws RemoteStorageException   if any storage-related error occurs.
     * @throws IllegalArgumentException if the given metadata instance does not have the state as {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}
     * @return a Future which will complete once this operation is finished.
     */
    Future<Void> addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;

    /**
     * Asynchronously updates the {@link RemoteLogSegmentMetadata}. Currently, only the state can be updated, and it
     * follows the life cycle of the segment through the state transitions below.
     * <p>
     * <pre>
     * +---------------------+            +----------------------+
     * |COPY_SEGMENT_STARTED |----------->|COPY_SEGMENT_FINISHED |
     * +-------------------+-+            +--+-------------------+
     *                     |                 |
     *                     |                 |
     *                     v                 v
     *                  +--+-----------------+-+
     *                  |DELETE_SEGMENT_STARTED|
     *                  +-----------+----------+
     *                              |
     *                              |
     *                              v
     *                  +-----------+-----------+
     *                  |DELETE_SEGMENT_FINISHED|
     *                  +-----------------------+
     * </pre>
     * <p>
     * {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED} - This state indicates that the segment copying to remote storage is started but not yet finished.
     * {@link RemoteLogSegmentState#COPY_SEGMENT_FINISHED} - This state indicates that the segment copying to remote storage is finished.
     * <br>
     * The leader broker copies the log segments to the remote storage and puts the remote log segment metadata with the
     * state as “COPY_SEGMENT_STARTED” and updates the state as “COPY_SEGMENT_FINISHED” once the copy is successful.
     * <p></p>
     * {@link RemoteLogSegmentState#DELETE_SEGMENT_STARTED} - This state indicates that the segment deletion is started but not yet finished.
     * {@link RemoteLogSegmentState#DELETE_SEGMENT_FINISHED} - This state indicates that the segment is deleted successfully.
     * <br>
     * Leader partitions publish both the above delete segment events when remote log retention is reached for the
     * respective segments. Remote Partition Removers also publish these events when a segment is deleted as part of
     * the remote partition deletion.
     *
     * @param remoteLogSegmentMetadataUpdate update of the remote log segment metadata.
     * @throws RemoteStorageException          if any storage-related error occurs.
     * @throws RemoteResourceNotFoundException when there are no resources associated with the given remoteLogSegmentMetadataUpdate.
     * @throws IllegalArgumentException        if the given metadata instance has the state as {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}
     * @return a Future which will complete once this operation is finished.
     */
    Future<Void> updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate)
            throws RemoteStorageException;

    /**
     * Returns {@link RemoteLogSegmentMetadata} if it exists for the given topic partition containing the offset with
     * the given leader-epoch for the offset, else returns {@link Optional#empty()}.
     *
     * @param topicIdPartition topic partition
     * @param epochForOffset   leader epoch for the given offset
     * @param offset           offset
     * @return the requested remote log segment metadata if it exists.
     * @throws RemoteStorageException if any storage-related error occurs.
     */
    Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(TopicIdPartition topicIdPartition,
                                                                int epochForOffset,
                                                                long offset)
            throws RemoteStorageException;

    /**
     * Returns the highest log offset of topic partition for the given leader epoch in remote storage. This is used by
     * remote log management subsystem to know up to which offset the segments have been copied to remote storage for
     * a given leader epoch.
     *
     * @param topicIdPartition topic partition
     * @param leaderEpoch      leader epoch
     * @return the requested highest log offset if exists.
     * @throws RemoteStorageException if any storage-related error occurs.
     */
    Optional<Long> highestOffsetForEpoch(TopicIdPartition topicIdPartition,
                                         int leaderEpoch) throws RemoteStorageException;

    /**
     * This method is used to update the metadata about remote partition delete event asynchronously. Currently, it allows updating the
     * state ({@link RemotePartitionDeleteState}) of a topic partition in remote metadata storage. Controller invokes
     * this method with {@link RemotePartitionDeleteMetadata} having state as {@link RemotePartitionDeleteState#DELETE_PARTITION_MARKED}.
     * So, remote partition removers can act on this event to clean the respective remote log segments of the partition.
     * <p><br>
     * In the case of default RLMM implementation, remote partition remover processes {@link RemotePartitionDeleteState#DELETE_PARTITION_MARKED}
     * <ul>
     * <li> sends an event with state as {@link RemotePartitionDeleteState#DELETE_PARTITION_STARTED}
     * <li> gets all the remote log segments and deletes them.
     * <li> sends an event with state as {@link RemotePartitionDeleteState#DELETE_PARTITION_FINISHED} once all the remote log segments are
     * deleted.
     * </ul>
     *
     * @param remotePartitionDeleteMetadata update on delete state of a partition.
     * @throws RemoteStorageException          if any storage-related error occurs.
     * @throws RemoteResourceNotFoundException when there are no resources associated with the given remotePartitionDeleteMetadata.
     * @return a Future which will complete once this operation is finished.
     */
    Future<Void> putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata)
            throws RemoteStorageException;

    /**
     * Returns all the remote log segment metadata of the given topicIdPartition.
     * <p>
     * Remote Partition Removers use this method to fetch all the segments for a given topic partition, so that they
     * can delete them.
     *
     * @return Iterator of all the remote log segment metadata for the given topic partition.
     */
    Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition)
            throws RemoteStorageException;

    /**
     * Returns iterator of remote log segment metadata, sorted by {@link RemoteLogSegmentMetadata#startOffset()} in
     * ascending order which contains the given leader epoch. This is used by remote log retention management subsystem
     * to fetch the segment metadata for a given leader epoch.
     *
     * @param topicIdPartition topic partition
     * @param leaderEpoch      leader epoch
     * @return Iterator of remote segments, sorted by start offset in ascending order.
     */
    Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition,
                                                             int leaderEpoch) throws RemoteStorageException;

    /**
     * This method is invoked only when there are changes in leadership of the topic partitions that this broker is
     * responsible for.
     *
     * @param leaderPartitions   partitions that have become leaders on this broker.
     * @param followerPartitions partitions that have become followers on this broker.
     */
    void onPartitionLeadershipChanges(Set<TopicIdPartition> leaderPartitions,
                                      Set<TopicIdPartition> followerPartitions);

    /**
     * This method is invoked only when the topic partitions are stopped on this broker. This can happen when a
     * partition is moved to another broker or a partition is deleted.
     *
     * @param partitions topic partitions that have been stopped.
     */
    void onStopPartitions(Set<TopicIdPartition> partitions);
}
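
The lookup performed by `remoteLogSegmentMetadata(topicIdPartition, epochForOffset, offset)` can be pictured with a small in-memory index. The sketch below is only an illustration under simplifying assumptions: `SegmentLookupSketch`, `Segment`, `add` and `find` are hypothetical names, it covers a single topic partition, and each segment is registered under one leader epoch, whereas a real segment may span several epochs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

public class SegmentLookupSketch {

    // Hypothetical, simplified view of a remote segment; both offsets are inclusive.
    static final class Segment {
        final long startOffset;
        final long endOffset;

        Segment(long startOffset, long endOffset) {
            this.startOffset = startOffset;
            this.endOffset = endOffset;
        }
    }

    // For each leader epoch, the segments of that epoch keyed by their start offset.
    private final Map<Integer, TreeMap<Long, Segment>> segmentsByEpoch = new HashMap<>();

    void add(int leaderEpoch, Segment segment) {
        segmentsByEpoch.computeIfAbsent(leaderEpoch, e -> new TreeMap<>())
                .put(segment.startOffset, segment);
    }

    // Returns the segment of the given epoch that contains the offset, if there is one.
    Optional<Segment> find(int epochForOffset, long offset) {
        TreeMap<Long, Segment> segments = segmentsByEpoch.get(epochForOffset);
        if (segments == null) {
            return Optional.empty();
        }
        // The only candidate is the segment with the greatest start offset <= offset.
        Map.Entry<Long, Segment> candidate = segments.floorEntry(offset);
        if (candidate == null || candidate.getValue().endOffset < offset) {
            return Optional.empty();
        }
        return Optional.of(candidate.getValue());
    }

    public static void main(String[] args) {
        SegmentLookupSketch index = new SegmentLookupSketch();
        index.add(0, new Segment(0L, 99L));
        index.add(0, new Segment(100L, 199L));
        index.add(1, new Segment(200L, 299L));

        System.out.println(index.find(0, 150L).isPresent()); // offset 150 lies in [100, 199] of epoch 0
        System.out.println(index.find(0, 250L).isPresent()); // epoch 0 has no segment covering 250
    }
}
```

Keying by start offset keeps the lookup logarithmic in the number of segments per epoch; returning `Optional.empty()` rather than throwing mirrors the contract described in the Javadoc above.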
package org.apache.kafka.server.log.remote.storage;
...
/**
 * It describes an update to the metadata about the log segment in the remote storage.
 */
public class RemoteLogSegmentMetadataUpdate implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * Universally unique remote log segment id.
     */
    private final RemoteLogSegmentId remoteLogSegmentId;

    /**
     * Epoch time at which the respective {@link #state} is set.
     */
    private final long eventTimestamp;

    /**
     * Leader epoch of the broker from where this event occurred.
     */
    private final int leaderEpoch;

    /**
     * It indicates the state in which the action is executed on this segment.
     */
    private final RemoteLogSegmentState state;

    /**
     * @param remoteLogSegmentId  Universally unique remote log segment id.
     * @param eventTimestamp      Epoch time at which the respective state is set.
     * @param leaderEpoch         Leader epoch of the broker from where this event occurred.
     * @param state               state of the remote log segment.
     */
    public RemoteLogSegmentMetadataUpdate(RemoteLogSegmentId remoteLogSegmentId,
                                          long eventTimestamp,
                                          int leaderEpoch,
                                          RemoteLogSegmentState state) {
        this.remoteLogSegmentId = remoteLogSegmentId;
        this.eventTimestamp = eventTimestamp;
        this.leaderEpoch = leaderEpoch;
        this.state = state;
    }

    public RemoteLogSegmentId remoteLogSegmentId() {
        return remoteLogSegmentId;
    }

    public long eventTimestamp() {
        return eventTimestamp;
    }

    public RemoteLogSegmentState state() {
        return state;
    }
    
    public int leaderEpoch() {
        return leaderEpoch;
    }
...
}

package org.apache.kafka.server.log.remote.storage;
...
public class RemotePartitionDeleteMetadata {

    private final TopicIdPartition topicPartition;
    private final RemotePartitionDeleteState state;
    private final long eventTimestamp;
    private final int epoch;

    public RemotePartitionDeleteMetadata(TopicIdPartition topicPartition, RemotePartitionDeleteState state, long eventTimestamp, int epoch) {
        Objects.requireNonNull(topicPartition);
        Objects.requireNonNull(state);
        if(state != RemotePartitionDeleteState.DELETE_PARTITION_MARKED && state != RemotePartitionDeleteState.DELETE_PARTITION_STARTED
                && state != RemotePartitionDeleteState.DELETE_PARTITION_FINISHED) {
            throw new IllegalArgumentException("state should be one of the delete partition states");
        }
        this.topicPartition = topicPartition;
        this.state = state;
        this.eventTimestamp = eventTimestamp;
        this.epoch = epoch;
    }

    public TopicIdPartition topicPartition() {
        return topicPartition;
    }

    public RemotePartitionDeleteState state() {
        return state;
    }

    public long eventTimestamp() {
        return eventTimestamp;
    }

    public int epoch() {
        return epoch;
    }

...
}


package org.apache.kafka.server.log.remote.storage;
...
/**
 * It indicates the deletion state of the remote topic partition. This will be based on the action executed on this
 * partition by the remote log service implementation.
 * <p>
 */
public enum RemotePartitionDeleteState {

    /**
     * This is used when a topic/partition is deleted by controller.
     * This partition is marked for delete by controller. That means, all its remote log segments are eligible for
     * deletion so that remote partition removers can start deleting them.
     */
    DELETE_PARTITION_MARKED((byte) 0),

    /**
     * This state indicates that the partition deletion is started but not yet finished.
     */
    DELETE_PARTITION_STARTED((byte) 1),

    /**
     * This state indicates that the partition is deleted successfully.
     */
    DELETE_PARTITION_FINISHED((byte) 2);

    private static final Map<Byte, RemotePartitionDeleteState> STATE_TYPES = Collections.unmodifiableMap(
            Arrays.stream(values()).collect(Collectors.toMap(RemotePartitionDeleteState::id, Function.identity())));

    private final byte id;

    RemotePartitionDeleteState(byte id) {
        this.id = id;
    }

    public byte id() {
        return id;
    }

    public static RemotePartitionDeleteState forId(byte id) {
        return STATE_TYPES.get(id);
    }
...
}

package org.apache.kafka.server.log.remote.storage;
...
/**
 * It indicates the state of the remote log segment. This will be based on the action executed on this
 * segment by the remote log service implementation.
 * <p>
 */
public enum RemoteLogSegmentState {

    /**
     * This state indicates that the segment copying to remote storage is started but not yet finished.
     */
    COPY_SEGMENT_STARTED((byte) 0),

    /**
     * This state indicates that the segment copying to remote storage is finished.
     */
    COPY_SEGMENT_FINISHED((byte) 1),

    /**
     * This state indicates that the segment deletion is started but not yet finished.
     */
    DELETE_SEGMENT_STARTED((byte) 2),

    /**
     * This state indicates that the segment is deleted successfully.
     */
    DELETE_SEGMENT_FINISHED((byte) 3);

    private static final Map<Byte, RemoteLogSegmentState> STATE_TYPES = Collections.unmodifiableMap(
            Arrays.stream(values()).collect(Collectors.toMap(RemoteLogSegmentState::id, Function.identity())));

    private final byte id;

    RemoteLogSegmentState(byte id) {
        this.id = id;
    }

    public byte id() {
        return id;
    }

    public static RemoteLogSegmentState forId(byte id) {
        return STATE_TYPES.get(id);
    }
...
}
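
The byte ids declared on the state enums above let a state be written compactly and looked up again through forId. The following is a minimal, self-contained sketch of that round trip. SegmentStateIdDemo and the nested SegmentState are hypothetical local stand-ins that mirror RemoteLogSegmentState; they are not the KIP's classes. Because the lookup is a plain map get, an unknown id yields null rather than an exception, so callers would need to handle that case.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

final class SegmentStateIdDemo {

    // Hypothetical stand-in that mirrors the RemoteLogSegmentState enum listed above.
    enum SegmentState {
        COPY_SEGMENT_STARTED((byte) 0),
        COPY_SEGMENT_FINISHED((byte) 1),
        DELETE_SEGMENT_STARTED((byte) 2),
        DELETE_SEGMENT_FINISHED((byte) 3);

        // Reverse index from id to state, built once when the enum is initialized.
        private static final Map<Byte, SegmentState> STATE_TYPES = Collections.unmodifiableMap(
                Arrays.stream(values()).collect(Collectors.toMap(SegmentState::id, Function.identity())));

        private final byte id;

        SegmentState(byte id) {
            this.id = id;
        }

        byte id() {
            return id;
        }

        // Returns null when no state is registered for the given id.
        static SegmentState forId(byte id) {
            return STATE_TYPES.get(id);
        }
    }

    public static void main(String[] args) {
        for (SegmentState state : SegmentState.values()) {
            System.out.println(state + " -> id " + state.id() + " -> " + SegmentState.forId(state.id()));
        }
        // An id outside 0..3 is not mapped to any state.
        System.out.println("id 9 -> " + SegmentState.forId((byte) 9));
    }
}
```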


New Metrics

The following new metrics will be added:

MBean | Description
kafka.server:type=BrokerTopicMetrics, name=RemoteReadRequestsPerSec, topic=([-.\w]+) | Number of remote storage read requests per second.
kafka.server:type=BrokerTopicMetrics, name=RemoteBytesInPerSec, topic=([-.\w]+) | Number of bytes read from remote storage per second.
kafka.server:type=BrokerTopicMetrics, name=RemoteReadErrorPerSec, topic=([-.\w]+) | Number of remote storage read errors per second.
kafka.log.remote:type=RemoteStorageThreadPool, name=RemoteLogReaderTaskQueueSize | Number of remote storage read tasks pending for execution.
kafka.log.remote:type=RemoteStorageThreadPool, name=RemoteLogReaderAvgIdlePercent | Average idle percent of the remote storage reader thread pool.
kafka.log.remote:type=RemoteLogManager, name=RemoteLogManagerTasksAvgIdlePercent | Average idle percent of RemoteLogManager thread pool.
kafka.server:type=BrokerTopicMetrics, name=RemoteBytesOutPerSec, topic=([-.\w]+) | Number of bytes copied to remote storage per second.
kafka.server:type=BrokerTopicMetrics, name=RemoteWriteErrorPerSec, topic=([-.\w]+) | Number of remote storage write errors per second.

Some of these metrics have been renamed as part of KIP-930.
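
As an illustration of how the per-topic MBean names above are composed, the following sketch builds a JMX ObjectName for one of the BrokerTopicMetrics entries. RemoteMetricNames and topicMetric are hypothetical helper names introduced only for this example. The sketch assumes the topic name contains only the characters matched by the topic pattern in the table, so no quoting is needed. It constructs the name only and does not connect to a broker.

```java
import java.util.regex.Pattern;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

final class RemoteMetricNames {

    // Same character class as the topic pattern shown in the metrics table.
    private static final Pattern TOPIC = Pattern.compile("[-.\\w]+");

    // Builds the MBean name of a per-topic BrokerTopicMetrics metric, e.g. RemoteBytesInPerSec.
    static ObjectName topicMetric(String metricName, String topic) {
        if (!TOPIC.matcher(topic).matches()) {
            throw new IllegalArgumentException("Unexpected characters in topic name: " + topic);
        }
        try {
            return new ObjectName(
                    "kafka.server:type=BrokerTopicMetrics,name=" + metricName + ",topic=" + topic);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        ObjectName name = topicMetric("RemoteBytesInPerSec", "orders.v1");
        System.out.println("domain = " + name.getDomain());
        System.out.println("name   = " + name.getKeyProperty("name"));
        System.out.println("topic  = " + name.getKeyProperty("topic"));
    }
}
```

An ObjectName built this way could then be passed to a JMX connection to read the metric from a running broker.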

Upgrade

Follow the steps mentioned in Kafka upgrade to reach the state where all brokers are running on the latest binaries with the respective "inter.broker.protocol" and "log.message.format" versions. Tiered storage requires the message format to be > 0.11.

To enable the tiered storage subsystems, perform a rolling restart with "remote.log.storage.system.enable" enabled on all brokers.

You can enable tiered storage by setting "remote.storage.enable" to true on the desired topics. Before enabling tiered storage, make sure that producer snapshots are built for all the segments of that topic on all followers. To ensure this, wait until log retention has occurred for all the older segments so that all the remaining segments have producer snapshots. This is required because follower replicas of topics with tiered storage enabled need the respective producer snapshot for each segment to reconcile their state, as mentioned in the earlier follower fetch protocol section.
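
The two settings named in the steps above can be summarized as follows. This is only a sketch that collects the property names and values mentioned in this section into java.util.Properties objects. TieredStorageEnableSketch and its methods are hypothetical names, and how the properties are actually applied (broker configuration file, topic configuration tooling) is outside the scope of the example.

```java
import java.util.Properties;

final class TieredStorageEnableSketch {

    // Broker-level setting applied on every broker during the rolling restart.
    static Properties brokerOverrides() {
        Properties props = new Properties();
        props.setProperty("remote.log.storage.system.enable", "true");
        return props;
    }

    // Topic-level setting applied afterwards on each topic that should be tiered.
    static Properties topicOverrides() {
        Properties props = new Properties();
        props.setProperty("remote.storage.enable", "true");
        return props;
    }

    public static void main(String[] args) {
        System.out.println("broker: " + brokerOverrides());
        System.out.println("topic:  " + topicOverrides());
    }
}
```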

Downgrade

Downgrade to earlier versions (> 2.1) is possible, but data that exists only in remote storage will not be available. A few files created in the remote index cache directory ($log.dir/remote-log-index-cache) and other remote log segment metadata cache files need to be cleaned up by the user. We may provide a script to clean up the cache files created by tiered storage. Users have to manually delete the data in remote storage based on the bucket or directory configured with tiered storage.

Limitations

  • Once tiered storage is enabled for a topic, it cannot be disabled. We will add this feature in future versions. One possible workaround is to create a new topic, copy the data from the desired offset, and delete the old topic. Another possible workaround is to set log.local.retention.ms to the same value as retention.ms and wait until local retention catches up with the complete log retention, which makes all the data available locally. After that, set remote.storage.enable to false to disable tiered storage on the topic.
  • Multiple Log dirs on a broker are not supported (JBOD related features).
  • Tiered storage is not supported for compacted topics.

Integration and System tests

For integration tests, we use a file-based (LocalTieredStorage) RemoteStorageManager (RSM). For system tests, we plan to have a single-node HDFS cluster in one of the containers and use the HDFS RSM implementation.

Feature Test

Feature test cases and test results are documented in this google spreadsheet.

Performance Test Results

We have tested the performance of the initial implementation of this proposal.

The cluster configuration:

  1. 5 brokers
  2. 20 CPU cores, 256GB RAM (each broker)
  3. 2TB * 22 hard disks in RAID0 (each broker)
  4. Hardware RAID card with NV-memory write cache
  5. 20Gbps network
  6. snappy compression
  7. 6300 topic-partitions with 3 replicas
  8. remote storage uses HDFS

Each test case is tested under 2 types of workload (acks=all and acks=1)


 | Workload-1 (at-least-once, acks=all) | Workload-2 (acks=1)
Producers | 10 producers; 30MB / sec / broker (leader); ~62K messages / sec / broker (leader) | 10 producers; 55MB / sec / broker (leader); ~120K messages / sec / broker (leader)
In-sync Consumers | 10 consumers; 120MB / sec / broker; ~250K messages / sec / broker | 10 consumers; 220MB / sec / broker; ~480K messages / sec / broker

Test case 1 (Normal case):

Normal traffic as described above.



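Workload | Metric | With tiered storage | Without tiered storage
Workload-1 (acks=all, low traffic) | Avg P99 produce latency | 25ms | 21ms
Workload-1 (acks=all, low traffic) | Avg P95 produce latency | 14ms | 13ms
Workload-2 (acks=1, high traffic) | Avg P99 produce latency | 9ms | 9ms
Workload-2 (acks=1, high traffic) | Avg P95 produce latency | 4ms | 4ms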

There is a small overhead when tiered storage is turned on. This is expected, as the brokers have to ship segments to remote storage and sync the remote segment metadata between brokers. With at-least-once (acks=all) produce, the produce latency increases slightly when tiered storage is turned on (P99 from 21ms to 25ms). With acks=1 produce, the produce latency is essentially unchanged.

Test case 2 (out-of-sync consumers catching up):

In addition to the normal traffic, 9 out-of-sync consumers consume 180MB/s per broker (or 900MB/s in total) of old data.

With tiered storage, the old data is read from HDFS. Without tiered storage, the old data is read from local disk.



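Workload | Metric | With tiered storage | Without tiered storage
Workload-1 (acks=all, low traffic) | Avg P99 produce latency | 42ms | 60ms
Workload-1 (acks=all, low traffic) | Avg P95 produce latency | 18ms | 30ms
Workload-2 (acks=1, high traffic) | Avg P99 produce latency | 10ms | 10ms
Workload-2 (acks=1, high traffic) | Avg P95 produce latency | 5ms | 4ms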

Consuming old data has a significant performance impact on acks=all producers. Without tiered storage, the P99 produce latency rises from 21ms to 60ms, which is almost 1.5 times the 42ms observed with tiered storage. With tiered storage, the performance impact is lower because remote storage reads do not compete with produce requests for local hard disk bandwidth.

Consuming old data has little impact to acks=1 producers.
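
The relative impact on acks=all producers can be worked out from the Avg P99 figures reported for test case 1 (normal traffic) and test case 2 (catch-up reads). The sketch below only performs that arithmetic on the numbers in the tables. CatchUpLatencyImpact and ratio are hypothetical names introduced for the example.

```java
import java.util.Locale;

final class CatchUpLatencyImpact {

    // Ratio of a latency under catch-up reads to a baseline latency.
    static double ratio(double catchUpMs, double baselineMs) {
        return catchUpMs / baselineMs;
    }

    public static void main(String[] args) {
        // Avg P99 produce latency for Workload-1 (acks=all), taken from the tables above.
        double withoutTieredNormal = 21, withoutTieredCatchUp = 60;
        double withTieredNormal = 25, withTieredCatchUp = 42;

        System.out.println(String.format(Locale.ROOT,
                "without tiered storage: %.2fx increase over normal traffic",
                ratio(withoutTieredCatchUp, withoutTieredNormal)));
        System.out.println(String.format(Locale.ROOT,
                "with tiered storage:    %.2fx increase over normal traffic",
                ratio(withTieredCatchUp, withTieredNormal)));
        System.out.println(String.format(Locale.ROOT,
                "without vs with tiered storage during catch-up: %.2fx",
                ratio(withoutTieredCatchUp, withTieredCatchUp)));
    }
}
```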

Test case 3 (rebuild broker):

Under the normal traffic, stop a broker, remove all the local data, and rebuild it without replication throttling. This case simulates replacing a broken broker server.



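Workload | Metric | With tiered storage | Without tiered storage
Workload-1 (acks=all, 12TB data per broker) | Max avg P99 produce latency | 56ms | 490ms
Workload-1 (acks=all, 12TB data per broker) | Max avg P95 produce latency | 23ms | 290ms
Workload-1 (acks=all, 12TB data per broker) | Duration | 2min | 230min
Workload-2 (acks=1, 34TB data per broker) | Max avg P99 produce latency | 12ms | 10ms
Workload-2 (acks=1, 34TB data per broker) | Max avg P95 produce latency | 6ms | 5ms
Workload-2 (acks=1, 34TB data per broker) | Duration | 4min | 520min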

With tiered storage, the rebuilding broker only needs to fetch the latest data that has not been shipped to remote storage. Without tiered storage, the rebuilding broker has to fetch all the data that has not expired from the other brokers. With the same log retention time, tiered storage reduced the rebuilding time by a factor of more than 100 (2min versus 230min for Workload-1, and 4min versus 520min for Workload-2).

Without tiered storage, the rebuilding broker has to read a large amount of data from the local hard disks of the leaders. This competes for page cache and local disk bandwidth with the normal traffic and dramatically increases the acks=all produce latency.
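
The "more than 100 times" figure follows directly from the durations reported for test case 3. A minimal sketch of the arithmetic, using only those durations (RebuildSpeedup and speedup are hypothetical names for the example):

```java
final class RebuildSpeedup {

    // How many times faster the rebuild completes with tiered storage.
    static double speedup(double withoutTieredMinutes, double withTieredMinutes) {
        return withoutTieredMinutes / withTieredMinutes;
    }

    public static void main(String[] args) {
        // Durations from the test case 3 results.
        System.out.println("Workload-1 (acks=all): " + speedup(230, 2) + "x"); // 115.0x
        System.out.println("Workload-2 (acks=1):   " + speedup(520, 4) + "x"); // 130.0x
    }
}
```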

Future work

  • Enhance RLMM local file-based cache with RocksDB to avoid loading the whole cache in memory.
  • Enhance RLMM implementation based on topic based storage pointing to a target Kafka cluster instead of using a system level topic within the cluster.
  • Improve default RLMM implementation with a less chatty protocol.
  • Support disabling tiered storage for a topic. 
  • Add a system level config to enable tiered storage for all the topics in a cluster.
  • Recovery mechanism in case of broker or cluster failure.
    • This is to be done by fetching the remote log metadata from RemoteStorageManager.
  • Recovering from truncation of remote log metadata topic partitions.
  • Extract RPMM as a separate task and allow any RLMM implementation to reuse the task for deletion of remote segments and complete the remote partition deletion.

Alternatives considered

Following alternatives were considered:

  1. Replace all local storage with remote storage - Instead of using local storage on Kafka brokers, only remote storage is used for storing log segments and offset index files. While this has the benefit of reducing local storage, it has the problem of not leveraging the OS page cache and local disk for efficient reads of the latest data, as done in Kafka today.
  2. Implement Kafka API on another store - This is an approach taken by some vendors, where the Kafka API is implemented on a different distributed, scalable storage system (for example, HDFS). Such an option does not leverage Kafka other than for API compliance and requires the much riskier option of replacing the entire Kafka cluster with another system.
  3. Client directly reads remote log segments from the remote storage - The log segments on the remote storage can be read directly by the client instead of being served by the Kafka broker. This reduces Kafka broker changes and has the benefit of removing an extra hop. However, this bypasses Kafka security completely, increases Kafka client library complexity and footprint, and causes compatibility issues with the existing Kafka client libraries, and hence is not considered.
  4. Store all remote segment metadata in remote storage. This approach works with storage systems that provide strongly consistent metadata, such as HDFS, but does not work with S3 and GCS. Frequently calling the LIST API on S3 or GCS also incurs huge costs. So, we choose to store metadata in a Kafka topic in the default implementation but allow users to use other methods with their own RLMM implementations.
  5. Cache all remote log indexes in local storage. Store remote log segment information in local storage. 



Meeting Notes

(Notes by Kowshik)

  • Notes
    • Discussed the downgrade path, KIP will be updated with that.
    • Discussed the limitation of not allowing disable tiered storage on a topic.
    • All agreed that the KIP is ready for voting.

  • Notes
    • Discussed the latest review comments from the mail thread.
    • Manikumar will review and provide comments.

  • Notes
    • Satish discussed the edge cases around the upgrade path with KIP-516 updates. Jun clarified how topic-id is received after IBP is updated on all brokers.
    • Jun suggested to update the KIP with more details on Remote Partition Remover.

    • RLMM flat file format was discussed and Jun asked to clarify the header section.

    • Kowshik and Jun will provide Log layer refactoring writeup.

  • Notes
    • Discussed producer snapshot fix missing in 2.7
    • Satish discussed memory growth due to the RLMM cache, and it looks to be practically negligible. The proposal is to use an in-memory cache and checkpoint that to disk.
    • Satish will update the KIP with the upgrade path.
    • Kowshik and Jun will look into Log refactoring.

  •  Discussion Recording
  • Notes

    1. Tiered storage upgrade path discussion:

    • Details need to be documented in the KIP.
    • Current upgrade path plan is based on IBP bump.
    • Enabling of the remote log components may not mean all topics are eligible for tiering at the same time.
    • Should tiered storage be enabled on all brokers before enabling it on any brokers?
    • Is there any replication path dependency for enabling tiered storage?


    2. RLMM persistence format:

    • We agreed to document the persistence format for the materialized state of default RLMM implementation (topic-based).
    • (carry over from earlier discussion) For the file-based design, we don't know yet the % of increase in memory, assuming the majority of segments are in remote storage. It will be useful to document an estimation for this.


    3. Topic deletion lifecycle discussion:

    • Under topic deletion lifecycle, step (4) it would be useful to mention how the RemotePartitionRemover (RPRM) gets the list of segments to be deleted, and whether it has any dependency with the RLMM topic.


    4. Log layer discussion:

    • We discussed the complexities surrounding making code changes to Log layer (Log.scala).
    • Today, the Log class holds attributes and behavior related with local log. In the future, we would have to change the Log layer such that it would also contain the logic for the tiered portion of the log. This addition can pose a maintenance challenge.
    • Some of the existing attributes in the Log layer such as LeaderEpochCache and ProducerStateManager can be related with global view of the log too (i.e. global log is local log + tiered log). It can be therefore useful to think about preparatory refactoring, to see whether we can separate responsibilities related with the local log from the tiered log, and, perhaps provide a global view of the log that combines together both as and when required. The global view of the log could manage the lifecycle of LeaderEpochCache and ProducerStateManager.

    Follow-ups:
    • KIP-405 updates (upgrade path, RLMM file format and topic deletion)
    • Log layer changes


    (Notes taken by Kowshik)


  •  Discussion Recording
  • Notes
  • Satish discussed KIP-405 updates:

    1. Addressed some of the outstanding review comments from previous weeks.
    2. Remote log manager (RLM) cache configuration was added.
    3. Updated default values in the KIP for certain configuration parameters.
    4. RLMM committed offsets are stored in separate files.
    5. Initial version: go ahead with in-memory RLMM materializer implementation for now. Future switch to RocksDB seems feasible since it is an internal change only to RLMM cache.
    6. Yet to update the KIP with KIP-516 (topic ID) changes.
    7. Tiered storage upgrade path details are a work-in-progress. Will be added to the KIP.
    RLMM cache choice: RocksDB-based vs file-based:
    1. Harsha/Satish didn't see significant improvement in performance when they tried RocksDB in their prototype.
    2. Other advantages of RocksDB were discussed - snapshots, tooling, checksums etc.
    3. As for file-based design, we don't know yet the % of increase in memory, assuming the majority of segments are in remote storage.
    4. Currently a single file-based implementation for the whole broker is considered. But this may have some issues, so it can be useful to consider a file per partition.
    5. More details needed to be added to the KIP on file management, metadata operations, persisted data format and estimates on memory usage.
    Topic ID:
    1. KIP-516 PR may land by end of the year, so we should be able to use it in KIP-405.
    2. Satish to update KIP with details.
    (Notes taken by Kowshik)


...

  • Notes

  •  Discussion Recording
  • Notes
    • Discussed that we can have a producer id snapshot for each log segment, which will be copied to remote storage. There is already a PR for KAFKA-9393 which addresses similar requirements.
    • Discussed whether it is possible to recover the state from remote storage when the local data is not available on brokers.
    • We will update the KIP by early next week with
      • Topic deletion design proposed/discussed in the earlier meeting. This includes the schemas of remote log segment metadata events stored in the topic. 
      • Producerid snapshot for each segment discussion.
      • ListOffsets API version bump to support offset for the earliest local timestamp.
      • Justifying the rationale behind keeping RLMM and local leader epoch as the source of truth. 
      • RocksDB instances as cache for remote log segment metadata.
      • Any other missing updates planned earlier. 

...

  • Discussion Recording
  • Notes
    • KIP is updated with the follower fetch protocol and is ready to be reviewed.
    • Satish to capture schema of internal metadata topic in the KIP
    • We will update the KIP with details of different cases
    • Test plan will be captured in a doc and added to the KIP.
    • Add a section "Limitations" to capture the capabilities that will be introduced with this KIP and what will not be covered in this KIP.

Other associated KIPs

KIP-852: Optimize calculation of size for log in remote tier

KIP-917: Additional custom metadata for remote log segment