...

In this design, RemoteLogMetadataManager (RLMM) is responsible for storing and fetching remote log segment metadata. It provides

  • Storing remote log segment metadata for a partition, keyed by the respective log segment's offsets.
  • Fetching remote log segment metadata for a given offset and leader epoch.
  • Registering a topic partition to build a cache of remote log segment metadata by reading from the remote log metadata topic.
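The offset-based lookup among these responsibilities can be sketched as a cache keyed by segment start offset, where fetching is a floor search. This is a hypothetical, simplified model: the `SegmentMetadata` class and method names below are illustrative, not the actual RemoteLogMetadataManager API.

```java
import java.util.*;
import java.util.concurrent.ConcurrentSkipListMap;

// Minimal stand-in for remote log segment metadata; illustrative only.
class SegmentMetadata {
    final UUID id;
    final long startOffset;
    final long endOffset;
    final int leaderEpoch;

    SegmentMetadata(UUID id, long startOffset, long endOffset, int leaderEpoch) {
        this.id = id;
        this.startOffset = startOffset;
        this.endOffset = endOffset;
        this.leaderEpoch = leaderEpoch;
    }
}

class InMemoryRemoteLogMetadataCache {
    // Segments indexed by their start offset, so an offset lookup is a floor search.
    private final NavigableMap<Long, SegmentMetadata> segmentsByStartOffset =
            new ConcurrentSkipListMap<>();

    void putSegment(SegmentMetadata metadata) {
        segmentsByStartOffset.put(metadata.startOffset, metadata);
    }

    // Returns the segment covering the given offset for the given leader epoch,
    // or empty if no such segment is known.
    Optional<SegmentMetadata> remoteLogSegmentMetadata(long offset, int leaderEpoch) {
        Map.Entry<Long, SegmentMetadata> entry = segmentsByStartOffset.floorEntry(offset);
        if (entry == null)
            return Optional.empty();
        SegmentMetadata segment = entry.getValue();
        return (offset <= segment.endOffset && segment.leaderEpoch == leaderEpoch)
                ? Optional.of(segment)
                : Optional.empty();
    }
}
```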

RemoteLogMetadataManager (RLMM) mainly consists of the below components

...

RLMM maintains a metadata cache by subscribing to the respective remote log metadata topic partitions. Whenever a topic partition is reassigned to a new broker and the RLMM on that broker is not yet subscribed to the respective remote log metadata topic partition, it subscribes to that partition and adds all of its entries to the cache. So, in the worst case, the RLMM on a broker may be consuming from most of the remote log metadata topic partitions. This requires the cache to be backed by disk storage such as RocksDB to avoid a high memory footprint on the broker. It also allows committing the offsets of partitions that have already been read; committed offsets can be stored in a local file so that messages are not read again when a broker is restarted.
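As a sketch of that last point (illustrative code, not the actual broker implementation), the committed offsets of the consumed metadata topic partitions could be persisted to a small local file, one "partition offset" pair per line, and reloaded on startup:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.*;
import java.util.*;

// Illustrative sketch: persist committed consume offsets of the remote log
// metadata topic partitions locally so a restarted broker resumes from where
// it left off instead of re-reading every message.
class CommittedOffsetsFile {
    private final Path path;

    CommittedOffsetsFile(Path path) {
        this.path = path;
    }

    void write(Map<Integer, Long> committedOffsets) {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<Integer, Long> e : committedOffsets.entrySet())
            lines.add(e.getKey() + " " + e.getValue());
        try {
            // Write to a temporary file first, then move it into place, so a
            // crash mid-write never leaves a truncated offsets file behind.
            Path tmp = path.resolveSibling(path.getFileName() + ".tmp");
            Files.write(tmp, lines);
            Files.move(tmp, path, StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    Map<Integer, Long> read() {
        Map<Integer, Long> committedOffsets = new HashMap<>();
        try {
            if (!Files.exists(path))
                return committedOffsets;
            for (String line : Files.readAllLines(path)) {
                String[] parts = line.split(" ");
                committedOffsets.put(Integer.parseInt(parts[0]), Long.parseLong(parts[1]));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return committedOffsets;
    }
}
```

Writing to a temporary file and moving it into place keeps the previous offsets file intact even if the process dies mid-write; resuming from a slightly stale offset only costs re-reading a few messages.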

Message Format

The RLMM instance on a broker publishes messages to the topic with a null key and a value in the below format.

type    : unsigned var int, represents the value type. This value is the 'apiKey' as mentioned in the schema.
version : unsigned var int, the 'version' number of the type as mentioned in the schema.
data    : record payload in Kafka protocol message format; the schema is given below.

The schema can be evolved by adding a new version with the respective changes. A new type can also be supported by adding it with its own type value and version.
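The envelope layout described above can be sketched as follows. This is illustrative code, not the broker's serializer: the unsigned var-int encoding shown is the standard base-128 scheme used by the Kafka protocol, and the record payload bytes are treated as opaque.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

// Sketch of the message value layout: unsigned var-int apiKey, unsigned
// var-int version, then the serialized record payload.
final class MetadataEnvelope {
    // Base-128 unsigned var-int: emit 7 bits at a time, low bits first,
    // setting the high bit of each byte while more bytes follow.
    static void writeUnsignedVarInt(int value, ByteArrayOutputStream out) {
        while ((value & 0xFFFFFF80) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    static byte[] serialize(int apiKey, int version, byte[] payload) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeUnsignedVarInt(apiKey, out);
        writeUnsignedVarInt(version, out);
        out.write(payload, 0, payload.length);
        return out.toByteArray();
    }
}
```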

Schema:
{
  "apiKey": 0,
  "type": "data",
  "name": "RemoteLogSegmentMetadataRecord",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    {
      "name": "RemoteLogSegmentId",
      "type": "RemoteLogSegmentIdEntry",
      "versions": "0+",
      "about": "Unique representation of the remote log segment.",
      "fields": [
        {
          "name": "topic",
          "type": "string",
          "versions": "0+",
          "about": "Topic name"
        },
        {
          "name": "partition",
          "type": "int32",
          "versions": "0+",
          "about": "Partition number"
        },
        {
          "name": "id",
          "type": "uuid",
          "versions": "0+",
          "about": "unique identifier"
        }
      ]
    },
    {
      "name": "StartOffset",
      "type": "int64",
      "versions": "0+",
      "about": "Start offset of the segment."
    },
    {
      "name": "EndOffset",
      "type": "int64",
      "versions": "0+",
      "about": "End offset of the segment."
    },
    {
      "name": "LeaderEpoch",
      "type": "int32",
      "versions": "0+",
      "about": "Leader epoch from which this segment instance is created or updated"
    },
    {
      "name": "MaxTimestamp",
      "type": "int64",
      "versions": "0+",
      "about": "Maximum timestamp within this segment."
    },
    {
      "name": "EventTimestamp",
      "type": "int64",
      "versions": "0+",
      "about": "Event timestamp of this segment."
    },
    {
      "name": "SegmentLeaderEpochs",
      "type": "[]SegmentLeaderEpochEntry",
      "versions": "0+",
      "about": "Leader epochs of the records in this segment, with the start offset of each epoch.",
      "fields": [
        {
          "name": "LeaderEpoch",
          "type": "int32",
          "versions": "0+",
          "about": "Leader epoch"
        },
        {
          "name": "Offset",
          "type": "int64",
          "versions": "0+",
          "about": "Start offset for the leader epoch"
        }
      ]
    },
    {
      "name": "SegmentSizeInBytes",
      "type": "int64",
      "versions": "0+",
      "about": "Segment size in bytes"
    },
    {
      "name": "State",
      "type": "int8",
      "versions": "0+",
      "about": "State of the segment"
    }
  ]
}


Configs

remote.log.metadata.topic.replication.factor

Replication factor of the topic

Default: 3

remote.log.metadata.topic.partitions

Number of partitions of the topic

Default: 50

remote.log.metadata.topic.retention.ms

Retention of the topic in milliseconds

Default: 365 * 24 * 60 * 60 * 1000 (1 year)

remote.log.metadata.manager.listener.name

Listener name to be used by the RemoteLogMetadataManager implementation on the broker to connect to the local broker. The respective endpoint address is passed with the "bootstrap.servers" property when invoking RemoteLogMetadataManager#configure(Map<String, ?> props).

This is used by Kafka clients created in the RemoteLogMetadataManager implementation.

remote.log.metadata.*

Any other properties should be prefixed with "remote.log.metadata.", and these will be passed to RemoteLogMetadataManager#configure(Map<String, ?> props).

For example, the security configuration needed to connect to the local broker on the configured listener name is passed with these props.
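As an illustrative example (the listener name and all values below are hypothetical; the prefixed property names reuse standard Kafka client SSL configs), a broker whose configured listener uses SSL might set:

```
remote.log.metadata.manager.listener.name=INTERNAL
remote.log.metadata.security.protocol=SSL
remote.log.metadata.ssl.truststore.location=/path/to/truststore.jks
remote.log.metadata.ssl.truststore.password=truststore-secret
```

These prefixed properties would be delivered to RemoteLogMetadataManager#configure along with the "bootstrap.servers" derived from the listener's endpoint address.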

...