...

Code Block
                      earlier offsets
                          +---+
     visible change ----> |   |
                          +---+       last stable offset
     visible change ----> |   | <---- standby controllers'
                          +---+       in-memory state
     pending change ----> |   |
                          +---+
     pending change ----> |   |
                          +---+
     pending change ----> |   |
                          +---+
     pending change ----> |   | <---- active controller's
                          +---+       in-memory state
                      latest offset

...

The active controller makes changes to the metadata by appending delta records to the log.  Each delta record is a sequence of bytes which serves as the concrete representation of a metadata change.

Like RPCs, delta records have specific types.  The type of the delta record determines what information it contains.  For example, deltas of type IsrChangeDelta contain ISR changes for a specific partition.

As time goes on, the number of deltas will grow and grow, even if the total size of the metadata stays constant.  Therefore, periodically, we need to consolidate all the metadata deltas into a KIP-630 snapshot.

The snapshot will contain records as well.  However, these records will not be deltas.  Instead, they will fully describe the state of entities like topics, partitions, and so on.  For example, PartitionSnapshot will contain information about a specific partition.

KIP-500 records will have null keys, and values containing the following:

  1. a signed varint specifying the type.
  2. an unsigned varint specifying the version
  3. the payload in Kafka RPC format

Delta records will have non-negative types.  Snapshot records will start at -1 and have negative types.


...

has a null key, and this format for its value:

  1. an unsigned varint specifying the record type.
  2. an unsigned varint specifying the record version
  3. the payload in Kafka RPC format

For example, if we wanted to encode a TopicRecord, we might have 1 encoded as a varint, followed by 0 as the version, followed by the serialized topic data.

The record type and version will typically only take one byte each, for a total header size of two bytes.
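As an illustration of the framing described above, here is a minimal sketch of how a record value could be assembled and parsed. It assumes the same base-128 unsigned varint encoding used elsewhere in the Kafka protocol. The function names are hypothetical, and the payload is a stand-in byte string rather than a real serialized TopicRecord.

```python
def encode_unsigned_varint(value: int) -> bytes:
    """Encode a non-negative integer as a base-128 varint, low-order group first."""
    if value < 0:
        raise ValueError("an unsigned varint cannot encode a negative value")
    out = bytearray()
    while True:
        group = value & 0x7F
        value >>= 7
        if value:
            out.append(group | 0x80)  # high bit set: more groups follow
        else:
            out.append(group)
            return bytes(out)


def frame_record_value(record_type: int, version: int, payload: bytes) -> bytes:
    """Hypothetical helper: type varint, then version varint, then the payload."""
    return (encode_unsigned_varint(record_type)
            + encode_unsigned_varint(version)
            + payload)


def parse_record_value(value: bytes):
    """Hypothetical helper: split a framed value into (type, version, payload)."""
    def read_varint(pos):
        result = 0
        shift = 0
        while True:
            byte = value[pos]
            pos += 1
            result |= (byte & 0x7F) << shift
            if not byte & 0x80:
                return result, pos
            shift += 7

    record_type, pos = read_varint(0)
    version, pos = read_varint(pos)
    return record_type, version, value[pos:]


# A TopicRecord (type 1) at version 0 gets a two-byte header.
framed = frame_record_value(1, 0, b"<serialized topic data>")
header = framed[:2]  # b"\x01\x00"
```

Types and versions below 128 fit in a single byte each, which is where the two-byte header size comes from.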

Record Format Versions

There are two ways to evolve the format of a KIP-500 record.  One is to add KIP-482 optional tagged fields.  These will be ignored by older software, but can contain additional data for new software to handle.  The other choice is to bump the version of the record.

In the pre-KIP-500 world, we had the inter-broker protocol (IBP) setting to control what RPC versions the controller used to communicate with the brokers.  This allowed us to evolve the inter-broker RPC format over time.  We also used it to gate many other features, such as metadata format changes.  In the post-KIP-500 world, the analogous setting is the metadata.format KIP-584 feature flag. This setting controls the snapshot and delta formats which the controller will use.

Snapshot Implementation

As time goes on, the number of records will grow and grow, even if the total size of the metadata stays constant.  Therefore, periodically, we need to consolidate all the metadata deltas into a snapshot.

Like the metadata log, the snapshot is made up of records.  However, unlike the log, in which there may be multiple records describing a single entity, the snapshot will only contain the minimum number of records needed to describe all the entities.

As described in KIP-630, snapshots are local to each replica.  For example, replica A may have a snapshot at offset 100, and deltas up to offset 150, whereas replica B may have a snapshot at 125 and deltas up to offset 150.  Any snapshot must be usable as a starting point for loading the entire state of metadata.  In other words, a new controller node must be able to load a snapshot, apply all the edits which follow it, and come up to date.
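The property that any snapshot is a valid starting point can be sketched with a toy model. This is only an illustration: records are modeled as hypothetical (entity, value) pairs where a later record for an entity replaces the earlier one, deletions are not modeled, and the offsets are much smaller than in the example above.

```python
def replay(state, records):
    """Apply records in order; a later record for an entity replaces the earlier one."""
    state = dict(state)
    for entity, value in records:
        state[entity] = value
    return state


def snapshot_at(log, offset):
    """Return one record per entity describing the state after log[0:offset]."""
    return sorted(replay({}, log[:offset]).items())


# Toy metadata log: several records may describe the same entity.
log = [
    ("partition-0", {"isr": [1, 2, 3]}),
    ("partition-1", {"isr": [1, 2]}),
    ("partition-0", {"isr": [1, 2]}),
    ("partition-0", {"isr": [1]}),
    ("partition-1", {"isr": [2]}),
]

# Replaying the whole log from the beginning.
full_state = replay({}, log)

# Replica A snapshotted at offset 2, replica B at offset 4.
# Each loads its own snapshot and then applies the records that follow it.
state_a = replay({}, snapshot_at(log, 2) + log[2:])
state_b = replay({}, snapshot_at(log, 4) + log[4:])
```

Both replicas end in the same state as a full replay, even though their snapshots were taken at different offsets. The snapshot at offset 4 contains two records, although four log records precede it.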

The currently active controller will monitor the offset of the latest snapshot made by all replicas, including itself.  The snapshotting state of each node is considered soft state: it is not persisted anywhere in the log, but purely communicated by heartbeats and stored in memory by the active controller.

When the active controller decides that a standby controller should start a snapshot, it will communicate that information in its response to the periodic heartbeat sent by that node.  When the active controller decides that it should create a snapshot itself, it will first try to give up the leadership of the Raft quorum.

Because the snapshots are centrally coordinated by the active controller, we can avoid initiating more than one snapshot at once.  The active controller will not tell a node to begin snapshotting if it is aware that another node is snapshotting or otherwise unavailable.  The controller will also snapshot less frequently when too many members of the quorum have fallen behind.  Specifically, if losing a node would probably impact availability, we will use a separate set of configurations for determining when to snapshot.
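One way to picture the "at most one snapshot at a time" rule is the following sketch. It is not the proposed implementation: the NodeState fields, the max_lag threshold, and the choice of the node with the oldest snapshot are all assumptions made for illustration, and the separate configuration used when availability is at risk is not modeled.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class NodeState:
    """Soft state the active controller might keep per node (hypothetical fields)."""
    node_id: int
    snapshot_offset: int       # offset of the node's latest snapshot
    snapshotting: bool = False
    available: bool = True


def pick_node_to_snapshot(nodes: List[NodeState],
                          log_end_offset: int,
                          max_lag: int) -> Optional[int]:
    """Return the id of the node that should snapshot next, or None."""
    # Do not start a snapshot while any node is snapshotting or unavailable.
    if any(n.snapshotting or not n.available for n in nodes):
        return None
    # Assumed policy: a node is due once its snapshot trails the log by more than max_lag.
    due = [n for n in nodes if log_end_offset - n.snapshot_offset > max_lag]
    if not due:
        return None
    # Assumed tie-break: prefer the node with the oldest snapshot.
    return min(due, key=lambda n: n.snapshot_offset).node_id


nodes = [NodeState(1, 100), NodeState(2, 125), NodeState(3, 140)]
choice = pick_node_to_snapshot(nodes, log_end_offset=150, max_lag=20)

# While node 2 is busy snapshotting, nobody else is asked to start.
busy = [NodeState(1, 100), NodeState(2, 125, snapshotting=True), NodeState(3, 140)]
blocked = pick_node_to_snapshot(busy, log_end_offset=150, max_lag=20)
```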

...

The controller will return NOT_CONTROLLER if it is not active.  Brokers will always return NOT_CONTROLLER for these RPCs.

Snapshot Format

Delta Format

Metrics

Compatibility, Deprecation, and Migration Plan

...

Record Formats

BrokerRecord

Code Block
{
  "apiKey": 0,
  "type": "metadataRecord",
  "name": "BrokerRecord",
  "validVersions": "0",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+",
      "about": "The broker id." },
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+",
      "about": "The broker epoch." },
    { "name": "EndPoints", "type": "[]BrokerEndpoint", "versions": "0+",
      "about": "The endpoints that can be used to communicate with this broker.", "fields": [
        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
          "about": "The name of the endpoint." },
        { "name": "Host", "type": "string", "versions": "0+",
          "about": "The hostname." },
        { "name": "Port", "type": "int16", "versions": "0+",
          "about": "The port." },
        { "name": "SecurityProtocol", "type": "int16", "versions": "0+",
          "about": "The security protocol." }
    ]},
    { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The broker rack." }
  ]
}

TopicRecord

Code Block
{
  "apiKey": 1,
  "type": "metadataRecord",
  "name": "TopicRecord",
  "validVersions": "0",
  "fields": [
        { "name": "Name", "type": "string", "versions": "0+",
          "about": "The topic name." },
        { "name": "TopicId", "type": "uuid", "versions": "0+",
          "about": "The unique ID of this topic." },
        { "name": "Deleting", "type": "boolean", "versions": "0+",
          "about": "True if this topic is in the process of being deleted." }
  ]
}

PartitionRecord

Code Block
{
  "apiKey": 2,
  "type": "metadataRecord",
  "name": "PartitionRecord",
  "validVersions": "0",
  "fields": [
    { "name": "PartitionId", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The partition id." },
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The unique ID of this topic." },
    { "name": "Replicas", "type":  "[]int32", "versions":  "0+",
      "about": "The replicas of this partition, sorted by preferred order." },
    { "name": "Isr", "type":  "[]int32", "versions":  "0+",
      "about": "The in-sync replicas of this partition" },
    { "name": "RemovingReplicas", "type":  "[]int32", "versions":  "0+",
      "about": "The replicas that we are in the process of removing." },
    { "name": "AddingReplicas", "type":  "[]int32", "versions":  "0+",
      "about": "The replicas that we are in the process of adding." },
    { "name": "Leader", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The lead replica, or -1 if there is no leader." },
    { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
      "about": "An epoch that gets incremented each time we change the ISR." }
  ]
}

ConfigRecord

Code Block
{         
  "apiKey": 3,
  "type": "metadataRecord",
  "name": "ConfigRecord",
  "validVersions": "0",
  "fields": [
    { "name": "ResourceType", "type": "int8", "versions": "0+",
      "about": "The type of resource this configuration applies to." },
    { "name": "ResourceName", "type": "string", "versions": "0+",
      "about": "The name of the resource this configuration applies to." },         
    { "name": "Name", "type": "string", "versions": "0+",
      "about": "The name of the configuration key." },                  
    { "name": "Value", "type": "string", "versions": "0+",     
      "about": "The value of the configuration." }
  ]           
} 

IsrChangeRecord

Code Block
{
  "apiKey": 4,
  "type": "metadataRecord",
  "name": "IsrChangeRecord",
  "validVersions": "0",
  "fields": [
    { "name": "PartitionId", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The partition id." },
    { "name": "TopicId", "type": "uuid", "versions": "0+",
      "about": "The unique ID of this topic." },
    { "name": "Isr", "type":  "[]int32", "versions":  "0+",
      "about": "The in-sync replicas of this partition" },
    { "name": "Leader", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The lead replica, or -1 if there is no leader." },
    { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
      "about": "An epoch that gets incremented each time we change the ISR." }
  ]
} 
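To show how an IsrChangeRecord relates to a PartitionRecord, here is a hedged sketch of replaying one. The field names come from the schemas above. The in-memory layout (a dict keyed by TopicId and PartitionId), the function name, and the rule that only Isr, Leader, and LeaderEpoch are overwritten are assumptions for illustration. The topic ID is a made-up placeholder string.

```python
def apply_isr_change(partitions, change):
    """Return a new partition map with the ISR change applied.

    Assumption: the change overwrites Isr, Leader and LeaderEpoch on the
    matching partition and leaves the other PartitionRecord fields untouched.
    """
    key = (change["TopicId"], change["PartitionId"])
    if key not in partitions:
        raise KeyError(f"no PartitionRecord has been replayed for {key}")
    updated = dict(partitions[key])
    updated["Isr"] = list(change["Isr"])
    updated["Leader"] = change["Leader"]
    updated["LeaderEpoch"] = change["LeaderEpoch"]
    result = dict(partitions)
    result[key] = updated
    return result


topic_id = "example-topic-id"  # placeholder, not a real uuid

# State built from an earlier PartitionRecord.
partitions = {
    (topic_id, 0): {
        "PartitionId": 0, "TopicId": topic_id,
        "Replicas": [1, 2, 3], "Isr": [1, 2, 3],
        "RemovingReplicas": [], "AddingReplicas": [],
        "Leader": 1, "LeaderEpoch": 5,
    }
}

# Broker 1 drops out: the ISR shrinks and leadership moves to broker 2.
change = {
    "PartitionId": 0, "TopicId": topic_id,
    "Isr": [2, 3], "Leader": 2, "LeaderEpoch": 6,
}

after = apply_isr_change(partitions, change)
```

Because the IsrChangeRecord carries only the fields that change, it is smaller than rewriting the whole PartitionRecord for every ISR update.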

New Metrics

| Full Name | Description |
| --- | --- |
| kafka.controller:type=KafkaController,name=MetadataLag | The offset delta between the latest metadata record this controller has replayed and the last stable offset of the metadata topic. |
| kafka.controller:type=KafkaServer,name=MetadataLag | The offset delta between the latest metadata record this broker has replayed and the last stable offset of the metadata topic. |
| kafka.controller:type=KafkaController,name=MetadataCommitLatencyMs | The latency of committing a message to the metadata topic.  Relevant on the active controller. |
| kafka.controller:type=KafkaController,name=MetadataCommitRate | The number of metadata messages per second committed to the metadata topic. |
| kafka.controller:type=KafkaController,name=MetadataSnapshotLag | The offset delta between the latest stable offset of the metadata topic and the offset of the last snapshot (or 0 if there are no snapshots). |
| kafka.controller:type=KafkaController,name=ControllerRequestsRate | The number of controller requests per second processed. |

Unused Metrics in KIP-500 Mode

We will deprecate these metrics as soon as legacy mode is deprecated.  For now, they will be unused in KIP-500 mode.

| Full Name | Description |
| --- | --- |
| kafka.server:type=SessionExpireListener,name=ZooKeeperExpiresPerSec | No longer needed when running in KIP-500 mode because we won't have any ZK sessions. |

Compatibility, Deprecation, and Migration Plan

As described above, this KIP outlines a new mode that the broker can run in, KIP-500 mode.  For now, this mode will be experimental, and there will be no way to migrate existing clusters from legacy mode to KIP-500 mode.  We plan on outlining how this upgrade process will work in a follow-on KIP.  We do plan on deprecating legacy mode eventually, but we are not quite ready to do it yet in this KIP.

Rejected Alternatives

Support Automatic Broker ID Assignment

...