...

RLMM maintains its metadata cache by subscribing to the respective remote log metadata topic partitions. Whenever a topic partition is reassigned to a new broker and RLMM on that broker is not yet subscribed to the corresponding remote log metadata topic partition, it subscribes to that partition and adds all of its entries to the cache. So, in the worst case, RLMM on a broker may be consuming from most of the remote log metadata topic partitions. In the initial version, we will have a file-based cache of all the messages already consumed by this instance, loaded into memory whenever RLMM is started. This cache is maintained in a separate file for each topic partition, which allows us to commit the offsets of the partitions that have already been read. Committed offsets can be stored in a local file to avoid re-reading the messages when a broker is restarted. We can later improve this with a RocksDB-based cache to avoid a high memory footprint on the broker.
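
As a rough illustration of the committed-offset file mentioned above, the sketch below persists the last consumed offset of each remote log metadata topic partition to a local properties file and reloads it on startup. The class and file layout here are hypothetical and only illustrate the idea; they are not part of the proposed implementation.

Code Block
title: committed_offsets_file_sketch (Java)
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class CommittedOffsetsFile {
    private final Path path;

    public CommittedOffsetsFile(Path path) {
        this.path = path;
    }

    // Write the last consumed offset of each metadata topic partition, e.g. "3=1042".
    public synchronized void write(Map<Integer, Long> offsets) throws IOException {
        Properties props = new Properties();
        offsets.forEach((partition, offset) -> props.setProperty(partition.toString(), offset.toString()));
        try (OutputStream out = Files.newOutputStream(path)) {
            props.store(out, "committed offsets of remote log metadata topic partitions");
        }
    }

    // Load previously committed offsets, if any, so consumption can resume from them after a restart.
    public synchronized Map<Integer, Long> read() throws IOException {
        Map<Integer, Long> offsets = new HashMap<>();
        if (Files.exists(path)) {
            Properties props = new Properties();
            try (InputStream in = Files.newInputStream(path)) {
                props.load(in);
            }
            props.forEach((k, v) -> offsets.put(Integer.parseInt((String) k), Long.parseLong((String) v)));
        }
        return offsets;
    }
}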

RLMM segment overhead:

Topic partition's topic-id : uuid : 2 longs.

remoteLogSegmentId : uuid : 2 longs.

remoteLogSegmentMetadata : 5 longs + 1 int + 1 byte + ~3 leader epochs (approximate average).

The leader epochs kept in memory add much less than the rest.

On average: ~10 longs : 10 * 8 = 80 bytes, times ~1.25 for other overhead ≈ 100 bytes per entry.

Assume a segment is rolled on a broker every second; with a retention of 30 days that is 60 * 60 * 24 * 30 ≈ 2.6 million segments.

~2.6 million segments would take ~260MB. (This is 1% in our production env.)

This overhead is not that significant as brokers may be using several GBs of memory. 
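
For reference, the arithmetic above can be checked with the short sketch below; the inputs are just the assumptions stated above (10 longs per entry, ~1.25x overhead, one segment per second, 30-day retention), not measurements.

Code Block
title: rlmm_cache_size_estimate (Java)
public class RlmmCacheSizeEstimate {
    public static void main(String[] args) {
        long bytesPerEntry = (long) (10 * 8 * 1.25); // ~10 longs per entry plus ~25% object overhead = 100 bytes
        long segments = 60L * 60 * 24 * 30;          // one segment rolled per second for 30 days = 2,592,000 (~2.6 million)
        long totalBytes = segments * bytesPerEntry;  // 259,200,000 bytes, i.e. ~260MB
        System.out.printf("%d segments -> ~%d MB%n", segments, totalBytes / 1_000_000);
    }
}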

We can also take a lazy-load approach by keeping only minimal entries in memory, such as offset, epoch, uuid, and the entry's position in the file. When an entry is needed, it can be read from the file using that position, as sketched below.
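
A minimal sketch of such a lazy-loaded entry is shown below, assuming the full metadata stays in the per-partition flat file and only a small fixed set of fields is kept in memory; the class and field names are illustrative only.

Code Block
title: lazy_segment_index_entry_sketch (Java)
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.UUID;

public class LazySegmentIndexEntry {
    final long startOffset;   // start offset of the remote log segment
    final int leaderEpoch;    // leader epoch that created this segment instance
    final UUID segmentId;     // remote log segment id
    final long filePosition;  // byte position of the serialized entry in the flat file
    final int entryLength;    // length of the serialized entry in bytes

    LazySegmentIndexEntry(long startOffset, int leaderEpoch, UUID segmentId, long filePosition, int entryLength) {
        this.startOffset = startOffset;
        this.leaderEpoch = leaderEpoch;
        this.segmentId = segmentId;
        this.filePosition = filePosition;
        this.entryLength = entryLength;
    }

    // Read the full serialized entry from the flat file only when it is actually needed.
    byte[] readSerializedEntry(RandomAccessFile flatFile) throws IOException {
        byte[] data = new byte[entryLength];
        flatFile.seek(filePosition);
        flatFile.readFully(data);
        return data;
    }
}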

Message Format

The RLMM instance on a broker publishes messages to the topic with a null key and a value in the format below.

...
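
As a minimal, hypothetical sketch of how an RLMM instance could publish such a record; the topic name, partition selection, and serialized payload shown here are assumptions for illustration, not the prescribed implementation.

Code Block
title: rlmm_publish_sketch (Java)
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class RemoteLogMetadataPublisher {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", ByteArraySerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());

        // Serialized metadata entry in the message format described in this section.
        byte[] serializedEntry = new byte[0];

        // The metadata topic partition is derived from the user topic partition,
        // e.g. hash(user topic partition) % (no. of metadata topic partitions).
        int metadataPartition = 0;

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            // Key is null; only the value carries the serialized metadata.
            producer.send(new ProducerRecord<>("__remote_log_metadata", metadataPartition, null, serializedEntry));
            producer.flush();
        }
    }
}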

RLMM stores the remote log metadata messages and builds materialized instances in a flat-file store for each user topic partition.

...


Code Block
title: flat_file_format
<magic><topic-id><metadata-topic-offset><sequence-of-serialized-entries>

magic: version of this file format
topic-id: uuid of topic
metadata-topic-offset: offset in the remote log metadata topic partition from which this topic partition's remote log metadata is fetched.
serialized-entry: a serialized entry as defined below; more types can be added later if needed.

An entry is serialized as described below. This is very similar to the message format described earlier for storing into the metadata topic.

length    : unsigned var int, length of this entry, which is the sum of the sizes of type, version, and data.
type      : unsigned var int, represents the value type. This value is the 'apiKey' as mentioned in the schema.
version   : unsigned var int, the 'version' number of the type as mentioned in the schema.
data      : record payload in Kafka protocol message format; the schema is given below.

Both type and version are written before the data is serialized into the record value. The schema can be evolved by adding a new version with the respective changes. A new type can also be supported by adding the respective type and its version.
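
A small, self-contained sketch of this framing is given below, assuming the payload bytes are produced by the schema-generated serializer; the varint helpers are written out here for illustration rather than taken from Kafka's internal utilities.

Code Block
title: entry_framing_sketch (Java)
import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class EntryFraming {

    // Encode an int as an unsigned varint: 7 bits per byte, high bit set on continuation bytes.
    static void writeUnsignedVarint(int value, ByteArrayOutputStream out) {
        while ((value & 0xFFFFFF80) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    static int sizeOfUnsignedVarint(int value) {
        int bytes = 1;
        while ((value & 0xFFFFFF80) != 0) {
            bytes++;
            value >>>= 7;
        }
        return bytes;
    }

    // Frame a serialized record as <length><type><version><data>,
    // where length is the sum of the sizes of type, version, and data.
    static byte[] frame(int type, int version, byte[] data) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int length = sizeOfUnsignedVarint(type) + sizeOfUnsignedVarint(version) + data.length;
        writeUnsignedVarint(length, out);
        writeUnsignedVarint(type, out);
        writeUnsignedVarint(version, out);
        out.write(data);
        return out.toByteArray();
    }
}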


{
  "apiKey": 0,
  "type": "data",
  "name": "RemoteLogSegmentMetadataRecordStored",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    {
      "name": "segmentId",
      "type": "uuid",
      "versions": "0+",
      "about": "Unique identifier of the log segment"
    },
    {
      "name": "StartOffset",
      "type": "int64",
      "versions": "0+",
      "about": "Start offset  of the segment."
    },
    {
      "name": "endOffset",
      "type": "int64",
      "versions": "0+",
      "about": "End offset  of the segment."
    },
    {
      "name": "LeaderEpoch",
      "type": "int32",
      "versions": "0+",
      "about": "Leader epoch from which this segment instance is created or updated"
    },
    {
      "name": "MaxTimestamp",
      "type": "int64",
      "versions": "0+",
      "about": "Maximum timestamp with in this segment."
    },
    {
      "name": "EventTimestamp",
      "type": "int64",
      "versions": "0+",
      "about": "Event timestamp of this segment."
    },
    {
      "name": "SegmentLeaderEpochs",
      "type": "[]SegmentLeaderEpochEntry",
      "versions": "0+",
      "about": "Event timestamp of this segment.",
      "fields": [
        {
          "name": "LeaderEpoch",
          "type": "int32",
          "versions": "0+",
          "about": "Leader epoch"
        },
        {
          "name": "Offset",
          "type": "int64",
          "versions": "0+",
          "about": "Start offset for the leader epoch"
        }
      ]
    },
    {
      "name": "SegmentSizeInBytes",
      "type": "int32",
      "versions": "0+",
      "about": "Segment size in bytes"
    },
    {
      "name": "RemoteLogState",
      "type": "int8",
      "versions": "0+",
      "about": "State of the segment"
    }
  ]
}


{
  "apiKey": 1,
  "type": "data",
  "name": "DeletePartitionStateRecord",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    {
      "name": "epoch",
      "type": "int32",
      "versions": "0+",
      "about": "Epoch (controller or leader) from which this event is created. DELETE_PARTITION_MARKED is sent by the controller. DELETE_PARTITION_STARTED and DELETE_PARTITION_FINISHED are sent by remote log metadata topic partition leader."
    },
    {
      "name": "EventTimestamp",
      "type": "int64",
      "versions": "0+",
      "about": "Event timestamp of this segment."
    },
    {
      "name": "RemotePartitionState",
      "type": "int8",
      "versions": "0+",
      "about": "State of the remote partition"
    }
  ]
}

...