Status

Current state: Accepted

Discussion thread: https://lists.apache.org/thread/8dqvfhzcyy87zyy12837pxx9lgsdhvft 

Vote thread: https://lists.apache.org/thread/4pqjp8r7n94lnymv3xc689mfw33lz3mj

JIRA: KAFKA-14127

...

The format sub-command in the kafka-storage.sh tool already supports formatting more than one log directory: it expects a list of configured log.dirs and formats only the ones that need it. A new property, directory.id, will be included in meta.properties to identify each log directory with a UUID. The UUID is randomly generated for each log directory.

meta.properties

A new property, directory.id, will be expected in the meta.properties file in each log directory configured under log.dirs. The property indicates the UUID for the log directory where the file is located. If any of the meta.properties files does not contain directory.id, one will be randomly generated and the file will be updated upon Broker startup. The kafka-storage.sh tool will be extended to generate this property as described in the previous section.

If the log directory that holds the cluster metadata topic is configured separately to a different path, using metadata.log.dir, then this log directory does not get a UUID assigned.

Footnote

The broker cannot run if this particular log directory is unavailable, and when configured separately it cannot host any user partitions, so there's no point in identifying it in the Controller.

Reserved UUIDs

The following UUIDs are excluded from the random pool when generating a log directory UUID:

  • UUID.UNASSIGNED_DIR – new Uuid(0L, 0L) – used to identify new or unknown assignments.
  • UUID.LOST_DIR – new Uuid(0L, 1L) – used to represent unspecified offline directories.
  • UUID.MIGRATING_DIR – new Uuid(0L, 2L) – used when transitioning from a previous state where directory assignment was not available, to designate that some directory was previously selected to host a partition, but we're not sure which one yet.

The first 100 UUIDs, minus the three listed above, are also reserved for future use.
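As an illustration of the exclusion rule above, here is a minimal Python sketch. It assumes that "the first 100 UUIDs" means a most significant half of zero and a least significant half below 100, and that IDs are rendered as URL-safe base64 without padding. The function names are hypothetical and are not part of Kafka.

```python
import base64
import uuid
from typing import Tuple

# Reserved values from the list above, as (most significant, least significant) halves.
UNASSIGNED_DIR = (0, 0)
LOST_DIR = (0, 1)
MIGRATING_DIR = (0, 2)
RESERVED_COUNT = 100  # assumption: reserved range is (0, 0) .. (0, 99)


def is_reserved(msb: int, lsb: int) -> bool:
    """True if (msb, lsb) falls inside the reserved range."""
    return msb == 0 and 0 <= lsb < RESERVED_COUNT


def split_halves(u: uuid.UUID) -> Tuple[int, int]:
    """Split a 128-bit UUID into two unsigned 64-bit halves."""
    return u.int >> 64, u.int & 0xFFFFFFFFFFFFFFFF


def random_directory_id() -> uuid.UUID:
    """Draw random UUIDs until one falls outside the reserved range."""
    while True:
        candidate = uuid.uuid4()
        if not is_reserved(*split_halves(candidate)):
            return candidate


def encode_directory_id(u: uuid.UUID) -> str:
    """Render the 16 bytes as URL-safe base64 without padding (22 characters)."""
    return base64.urlsafe_b64encode(u.bytes).rstrip(b"=").decode("ascii")
```

A version 4 UUID always has a non-zero most significant half, so in this sketch the retry loop is purely defensive.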

Metadata records

RegisterBrokerRecord and BrokerRegistrationChangeRecord will both have a new LogDirs field:

{ "name": "LogDirs", "type": "[]uuid", "versions": "3+", "taggedVersions": "3+", "tag": "0",
  "about": "Log directories configured in this broker which are available." }

PartitionRecord and PartitionChangeRecord will both have a new Directories field:

{ "name": "Directories", "type": "[]uuid", "versions": "1+",
  "about": "The log directory hosting each replica, sorted in the same exact order as the Replicas field." }

Although not explicitly specified in the schema, the default value for Directory is Uuid.UNASSIGNED_DIR (Uuid.ZERO), as that's the default default value for UUID types.

Footnote

Yes, double default, not a typo. The default setting, for the default value of the field.

A directory assignment to Uuid.UNASSIGNED_DIR conveys that the log directory is not yet known. The hosting Broker will eventually determine the hosting log directory and use AssignReplicasToDirs to update the assignment.

RPC requests

BrokerRegistrationRequest will include the following new field:

{ "name": "LogDirs", "type": "[]uuid", "versions": "2+",
  "about": "Log directories configured in this broker which are available." }

BrokerHeartbeatRequest will include the following new field:

{ "name": "OfflineLogDirs", "type": "[]uuid", "versions": "1+", "taggedVersions": "1+", "tag": "0",
  "about": "Log directories that failed and went offline." }

A new RPC named AssignReplicasToDirs will be introduced with the following request and response:

{
  "apiKey": <TBD>,
  "type": "request",
  "listeners": ["controller"],
  "name": "AssignReplicasToDirsRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
      "about": "The ID of the requesting broker" },
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
      "about": "The epoch of the requesting broker" },
    { "name": "Directories", "type": "[]DirectoryData", "versions": "0+", "fields": [
      { "name": "Id", "type": "uuid", "versions": "0+", "about": "The ID of the directory" },
      { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
        { "name": "TopicId", "type": "uuid", "versions": "0+",
          "about": "The ID of the assigned topic" },
        { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
          { "name": "PartitionIndex", "type": "int32", "versions": "0+",
            "about": "The partition index" }
        ]}
      ]}
    ]}
  ]
}

{
  "apiKey": <TBD>,
  "type": "response",
  "name": "AssignReplicasToDirsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top level response error code" },
    { "name": "Directories", "type": "[]DirectoryData", "versions": "0+", "fields": [
      { "name": "Id", "type": "uuid", "versions": "0+", "about": "The ID of the directory" },
      { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
        { "name": "TopicId", "type": "uuid", "versions": "0+",
          "about": "The ID of the assigned topic" },
        { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
          { "name": "PartitionIndex", "type": "int32", "versions": "0+",
            "about": "The partition index" },
          { "name": "ErrorCode", "type": "int16", "versions": "0+",
            "about": "The partition level error code" }
        ]}
      ]}
    ]}
  ]
}

An AssignReplicasToDirs request that includes an assignment to Uuid.LOST_DIR conveys that the Broker wants to correct a replica assignment into an offline log directory which cannot be identified.

This request is authorized with CLUSTER_ACTION on CLUSTER.
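To make the nesting of the request concrete (directories, then topics, then partitions), the following Python sketch groups a flat list of assignments into that shape. The dictionary keys mirror the schema field names. The function name and the tuple layout of its input are hypothetical and chosen only for illustration.

```python
from collections import defaultdict


def build_assign_replicas_request(broker_id, broker_epoch, assignments):
    """Group flat assignments into the Directories -> Topics -> Partitions nesting.

    assignments: iterable of (directory_id, topic_id, partition_index) tuples
    (a hypothetical input shape).
    """
    grouped = defaultdict(lambda: defaultdict(list))
    for directory_id, topic_id, partition_index in assignments:
        grouped[directory_id][topic_id].append(partition_index)

    return {
        "BrokerId": broker_id,
        "BrokerEpoch": broker_epoch,
        "Directories": [
            {
                "Id": directory_id,
                "Topics": [
                    {
                        "TopicId": topic_id,
                        "Partitions": [
                            {"PartitionIndex": index} for index in sorted(partitions)
                        ],
                    }
                    for topic_id, partitions in topics.items()
                ],
            }
            for directory_id, topics in grouped.items()
        ],
    }
```

Grouping by directory first means each directory UUID is sent once per request, however many partitions are assigned to it.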

Proposed changes

Metrics

MBean name: kafka.server:type=KafkaServer,name=QueuedReplicaToDirAssignments

Description: The number of replicas hosted by the broker that are either missing a log directory assignment in the cluster metadata or are currently found in a different log directory and are queued to be sent to the controller in an AssignReplicasToDirs request.

Configuration

The following configuration option is introduced:

Name: log.dir.failure.timeout.ms

Description: If the broker is unable to successfully communicate to the controller that some log directory has failed for longer than this time, and there's at least one partition with leadership on that directory, the broker will fail and shut down.

Default: 30000 (30 seconds)

Valid Values: [1, …]

Priority: low


Storage format command

The format subcommand will be updated to ensure each log directory has an assigned UUID and it will persist a new property directory.id in the meta.properties  file. The value is base64 encoded, like the cluster UUID.

The meta.properties  version field will stay set to 1, to allow for a downgrade after an upgrade on a non JBOD KRaft cluster.

Footnote

If an existing, non JBOD KRaft cluster is upgraded to the first version that includes the changes described in this KIP, which write these new fields, and is later downgraded, the meta.properties file needs to still be readable. There's currently a hard check on the version number which would fail for a new version number. 

The UUIDs for each log directory are automatically generated by the tool if there isn't one assigned already in an existing meta.properties  file.

Having a persisted UUID at the root of each log directory allows the broker to identify the log directory regardless of the mount path.

Example

Given the following server.properties:

(... other non interesting properties omitted ...)
process.roles=broker
node.id=8
metadata.log.dir=/var/lib/kafka/metadata
log.dirs=/mnt/d1,/mnt/d2

The command ./bin/kafka-storage.sh format -c /tmp/server.properties --cluster-id 41QSStLtR3qOekbX4ZlbHA  would generate three meta.properties  files that could look like the following:

/var/lib/kafka/metadata/meta.properties:

#
#Thu Aug 18 15:23:07 BST 2022
node.id=8
version=1
cluster.id=41QSStLtR3qOekbX4ZlbHA
directory.id=e6umYSUsQyq7jUUzL9iXMQ

/mnt/d1/meta.properties:

#
#Thu Aug 18 15:23:07 BST 2022
node.id=8
version=1
cluster.id=41QSStLtR3qOekbX4ZlbHA
directory.id=b4d9ExdORgaQq38CyHwWTA

/mnt/d2/meta.properties:

#
#Thu Aug 18 15:23:07 BST 2022
node.id=8
version=1
cluster.id=41QSStLtR3qOekbX4ZlbHA
directory.id=P2aL9r4sSqqyt7bC0uierg

Each directory, including the directory that holds the cluster metadata topic (metadata.log.dir), has its own distinct directory ID.

In the example above, we can identify the following directory mapping:

  • /var/lib/kafka/metadata  has log directory UUID e6umYSUsQyq7jUUzL9iXMQ 
  • /mnt/d1  has log directory UUID b4d9ExdORgaQq38CyHwWTA 
  • /mnt/d2 has log directory UUID P2aL9r4sSqqyt7bC0uierg 
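The mapping above can be derived mechanically from the file contents. The Python sketch below parses only the simple key=value subset of the properties format used in the example (it does not handle escapes or alternative separators), and the helper names are hypothetical.

```python
def parse_meta_properties(text):
    """Parse key=value lines, skipping blank lines and '#' comments."""
    props = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue  # e.g. the timestamp comment written at the top of the file
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def directory_mapping(files):
    """files: dict of log directory path -> meta.properties text."""
    return {
        path: parse_meta_properties(text)["directory.id"]
        for path, text in files.items()
    }


HEADER = "#\n#Thu Aug 18 15:23:07 BST 2022\nnode.id=8\nversion=1\ncluster.id=41QSStLtR3qOekbX4ZlbHA\n"
EXAMPLE_FILES = {
    "/var/lib/kafka/metadata": HEADER + "directory.id=e6umYSUsQyq7jUUzL9iXMQ\n",
    "/mnt/d1": HEADER + "directory.id=b4d9ExdORgaQq38CyHwWTA\n",
    "/mnt/d2": HEADER + "directory.id=P2aL9r4sSqqyt7bC0uierg\n",
}
```

Calling directory_mapping(EXAMPLE_FILES) yields the same three path to UUID pairs listed above.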

Brokers

Broker lifecycle management

When the broker starts up and initializes LogManager, it will load the UUID for each log directory (directory.id ) by reading the meta.properties file at the root of each of them.

  • If there are any two log directories with the same UUID, the Broker will fail at startup.
  • If there are any meta.properties files missing directory.id, a new UUID is generated, and assigned to that directory by updating the meta.properties file.
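The two startup rules above could be sketched as follows. This is an illustration in Python over in-memory dictionaries rather than real files; the function, the exception type and the ID generator are all hypothetical names, and writing the updated meta.properties back to disk is left out.

```python
import base64
import uuid


class DuplicateDirectoryIdError(Exception):
    """Raised when two log directories carry the same directory.id."""


def new_directory_id():
    """Random UUID rendered as URL-safe base64 without padding (an assumption)."""
    return base64.urlsafe_b64encode(uuid.uuid4().bytes).rstrip(b"=").decode("ascii")


def load_directory_ids(meta_by_dir, generate=new_directory_id):
    """meta_by_dir: dict of log dir path -> dict of meta.properties entries.

    Fails on duplicate IDs, then fills in missing IDs by mutating the dicts.
    Returns a dict of path -> directory ID.
    """
    seen = {}
    for path, props in meta_by_dir.items():
        dir_id = props.get("directory.id")
        if dir_id is None:
            continue
        if dir_id in seen:
            raise DuplicateDirectoryIdError(
                f"{path} and {seen[dir_id]} share directory.id {dir_id}"
            )
        seen[dir_id] = path

    for path, props in meta_by_dir.items():
        if "directory.id" not in props:
            dir_id = generate()
            while dir_id in seen:  # defensive: never hand out an ID already in use
                dir_id = generate()
            props["directory.id"] = dir_id
            seen[dir_id] = path

    return {path: props["directory.id"] for path, props in meta_by_dir.items()}
```

Checking for duplicates before generating anything means a misconfigured broker fails without modifying any file.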

The set of all loaded log directory UUIDs is sent along in the broker registration request to the controller as the LogDirs field. 

Metadata caching

Currently, Replicas are considered offline if the hosting broker is offline. Additionally, replicas will also be considered offline if the replica references a log directory UUID (in the new field partitionRecord.Directories) that is not present in the hosting Broker's latest registration under LogDirs and either:

  • the log directory UUID is UUID.LOST_DIR
  • the hosting broker's registration indicates multiple online log directories. i.e. brokerRegistration.LogDirs.length > 1

If neither of the above conditions is true, we assume that the broker is configured with a single log directory: all replicas live in the same directory, and neither log directory assignments nor log directory failures are communicated to the Controller.
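The offline rule described in this section can be written as a small predicate. The Python sketch below is one reading of that rule; the function and parameter names are hypothetical, and directory IDs are modelled with Python's uuid.UUID rather than Kafka's Uuid class.

```python
import uuid

UNASSIGNED_DIR = uuid.UUID(int=0)  # corresponds to new Uuid(0L, 0L)
LOST_DIR = uuid.UUID(int=1)        # corresponds to new Uuid(0L, 1L)


def is_replica_offline(broker_online, registered_log_dirs, replica_dir):
    """Decide whether a replica should be treated as offline.

    registered_log_dirs: the LogDirs list from the broker's latest registration.
    replica_dir: the directory UUID recorded for the replica in the partition record.
    """
    if not broker_online:
        return True  # existing rule: an offline broker means an offline replica
    if replica_dir in registered_log_dirs:
        return False
    # The directory is not registered as online: offline only if it is LOST_DIR
    # or the broker registered more than one log directory.
    return replica_dir == LOST_DIR or len(registered_log_dirs) > 1
```

With a single registered directory, a replica that points at an unrecognised directory is still treated as online, which matches the single-directory assumption stated above.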

Handling log directory failures

When multiple log directories are configured, and some (but not all) of them become offline, the broker will communicate this change using the new OfflineLogDirs field in the BrokerHeartbeat request, which indicates the UUIDs of the new offline log directories. The UUIDs for the accumulated failed log directories are included in every BrokerHeartbeat request until the broker restarts. If the Broker is configured with a single log directory, this field isn't used, as the current behavior of the broker is to shut down when no log directories are online.

Log directory failure notifications are queued and batched together in all future broker heartbeat requests.

If the Broker repeatedly fails to communicate a log directory failure, or a replica assignment into a failed directory, for longer than a configurable amount of time (log.dir.failure.timeout.ms), and it is the leader for any replicas in the failed log directory, the broker will shut down, as that is the only other way to guarantee that the controller will elect a new leader for those partitions.
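The interaction between accumulated failures and the timeout can be sketched as a small tracker. This Python example is only an illustration under simplifying assumptions: time is passed in explicitly as milliseconds, a single successful heartbeat acknowledges all pending failures, and the class and method names are hypothetical.

```python
class LogDirFailureTracker:
    """Tracks failed log directories and how long they have gone unacknowledged."""

    def __init__(self, timeout_ms):
        self.timeout_ms = timeout_ms      # value of log.dir.failure.timeout.ms
        self.offline_dirs = []            # accumulated until the broker restarts
        self.unacked_since_ms = None      # time of the oldest unacknowledged failure

    def on_failure(self, dir_id, now_ms):
        if dir_id not in self.offline_dirs:
            self.offline_dirs.append(dir_id)
            if self.unacked_since_ms is None:
                self.unacked_since_ms = now_ms

    def heartbeat_offline_log_dirs(self):
        """Value for OfflineLogDirs: every failed directory, on every heartbeat."""
        return list(self.offline_dirs)

    def on_heartbeat_success(self):
        self.unacked_since_ms = None

    def should_shut_down(self, now_ms, leads_replicas_in_failed_dirs):
        """True once the failure has gone unacknowledged for longer than the timeout."""
        if self.unacked_since_ms is None or not leads_replicas_in_failed_dirs:
            return False
        return now_ms - self.unacked_since_ms > self.timeout_ms
```

Note that the failed directories stay in the heartbeat payload after a successful response; only the timeout clock is reset.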

Replica management

When configured with multiple log.dirs, as the broker catches up with metadata, and sees the partitions which it should be hosting, it will check the associated log directory UUID for each partition (partitionRecord.Directories).

  • If the partition is not assigned to a log directory (refers to Uuid.UNASSIGNED_DIR)
    • If the partition already exists, the broker uses the new RPC — AssignReplicasToDirs — to notify the controller to change the metadata assignment to the actual log directory.
    • If the partition does not exist, the broker selects a log directory and uses the new RPC — AssignReplicasToDirs — to notify the controller to create the metadata assignment to the actual log directory.
  • If the partition is assigned to an online log directory
    • If the partition does not exist it is created in the indicated log directory.
    • If the partition already exists in the indicated log directory and no future replica exists, then no action is taken.
    • If the partition already exists in the indicated log directory, and there is a future replica in another log directory, then the broker starts the process to replicate the current replica to the future replica.
    • If the partition already exists in another online log directory and is a future replica in the log directory indicated by the metadata, the broker will replace the current replica with the future replica after making sure that the future replica is fully caught up with the current replica.
    • If the partition already exists in another online log directory, the broker uses the new AssignReplicasToDirs RPC to notify the controller to change the metadata assignment to the actual log directory. The partition might have been moved to a different log directory whilst the broker was offline.
  • If the partition is assigned to an unknown log directory or refers to Uuid.LOST_DIR
    • If there are offline log directories, no action is taken: the assignment refers to a log directory which may be offline, and we don't want to fill the remaining online log directories with replicas that existed in the offline ones.
    • If there are no offline directories, the broker selects a log directory and uses the new RPC — AssignReplicasToDirs — to notify the controller to create the metadata assignment to the actual log directory.
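The decision list above can be condensed into a single function. The Python sketch below is one possible encoding of those cases for a broker with multiple log directories; the Action names, the function signature and the use of None for "no such replica" are all illustrative assumptions.

```python
import uuid
from enum import Enum

UNASSIGNED_DIR = uuid.UUID(int=0)
LOST_DIR = uuid.UUID(int=1)


class Action(Enum):
    NONE = "no action"
    CREATE_IN_ASSIGNED_DIR = "create the partition in the indicated directory"
    REPORT_ACTUAL_DIR = "send AssignReplicasToDirs with the actual directory"
    SELECT_DIR_AND_REPORT = "select a directory, then send AssignReplicasToDirs"
    CONTINUE_MOVE_TO_FUTURE = "replicate the current replica to the future replica"
    PROMOTE_FUTURE_WHEN_CAUGHT_UP = "replace the current replica with the future one"


def decide(assigned_dir, online_dirs, any_offline_dirs, current_dir=None, future_dir=None):
    """current_dir / future_dir: where the replica and its future replica live, or None."""
    if assigned_dir == UNASSIGNED_DIR:
        if current_dir is not None:
            return Action.REPORT_ACTUAL_DIR
        return Action.SELECT_DIR_AND_REPORT

    if assigned_dir in online_dirs:
        if current_dir is None:
            return Action.CREATE_IN_ASSIGNED_DIR
        if current_dir == assigned_dir:
            if future_dir is None:
                return Action.NONE
            return Action.CONTINUE_MOVE_TO_FUTURE
        if future_dir == assigned_dir:
            return Action.PROMOTE_FUTURE_WHEN_CAUGHT_UP
        return Action.REPORT_ACTUAL_DIR

    # Unknown directory or LOST_DIR: avoid refilling online directories
    # with replicas that may still exist in an offline one.
    if any_offline_dirs:
        return Action.NONE
    return Action.SELECT_DIR_AND_REPORT
```

The order of the checks matters: the future-replica case is tested before the generic "exists in another directory" case, so an in-progress move is not mistaken for a stale assignment.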

If instead, a single entry is configured under log.dirs or log.dir, then the AssignReplicasToDirs RPC is only sent to correct assignments to UUID.LOST_DIR, as described above.

If the broker is configured with multiple log directories it remains FENCED until it can verify that all partitions are assigned to the correct log directories in the cluster metadata. This excludes the log directory that hosts the cluster metadata topic, if it is configured separately to a different path — using metadata.log.dir.

Assignments to be sent via AssignReplicasToDirs are queued and batched together, handled by a log directory event manager that also handles log directory failure notifications.

Intra-broker replica movement

Support for replica movement between directories was introduced in KIP-113. This functionality is maintained, but altered slightly so that the controller remains correctly informed of the log directory for any moving replica.

The existing AlterReplicaLogDirs RPC is sent directly to the broker in question, which starts moving the replicas using ReplicaAlterLogDirsThread – this remains unchanged. But when the future replica first catches up with the main replica, instead of immediately promoting the future replica, the broker will:

  1. Asynchronously communicate the log directory change to the controller using the new RPC – AssignReplicasToDirs.
  2. Keep the ReplicaAlterLogDirsThread going. The future replica is still the future replica, and it continues to copy from the main replica – which is still in the original log directory – as new records are appended.

Once the broker receives confirmation of the metadata change – indicated by a successful response to AssignReplicasToDirs – then it will:

  1. Block appends to the main (old) replica and wait for the future replica to fully catch up once again.
  2. Make the switch, promoting the future replica to main replica and cleaning up the old one.

By delaying the metadata change until the future replica has caught up we minimize the chance of a log directory failure happening with an incorrect replica to log directory assignment in the metadata.
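The happy path of this sequence can be viewed as a small state machine. The Python sketch below is only an illustration: the state and event names are hypothetical, and directory failures during the move are not modelled.

```python
from enum import Enum, auto


class MoveState(Enum):
    COPYING = auto()                  # future replica is catching up for the first time
    AWAITING_ASSIGNMENT_ACK = auto()  # AssignReplicasToDirs sent, copying continues
    FINAL_CATCH_UP = auto()           # appends blocked, future replica catches up again
    PROMOTED = auto()                 # future replica is now the main replica


TRANSITIONS = {
    (MoveState.COPYING, "future_caught_up"): MoveState.AWAITING_ASSIGNMENT_ACK,
    (MoveState.AWAITING_ASSIGNMENT_ACK, "assignment_acked"): MoveState.FINAL_CATCH_UP,
    (MoveState.FINAL_CATCH_UP, "future_caught_up"): MoveState.PROMOTED,
}


def next_state(state, event):
    """Return the next state, or the same state if the event does not apply."""
    return TRANSITIONS.get((state, event), state)


def appends_blocked(state):
    """Appends to the old replica are blocked only during the final catch-up."""
    return state is MoveState.FINAL_CATCH_UP


def copier_running(state):
    """The copying thread keeps running until the future replica is promoted."""
    return state is not MoveState.PROMOTED
```

The same "future_caught_up" event appears twice on purpose: the first occurrence triggers the metadata update, and only the second one, after the acknowledgement, leads to promotion.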

The diagram below illustrates the sequence of steps involved in moving a replica between log directories.

[Diagram: sequence of steps involved in moving a replica between log directories]


In the diagram above, notice that if dir1 fails after the AssignReplicasToDirs RPC is sent, but before the future replica is promoted, then the controller will not know to update leadership and ISR for the partition. If the destination directory has failed, it won't be possible to promote the future replica, and the Broker needs to revert the assignment (cancelled locally if still queued). If the source directory has failed, then the future replica might not catch up, and the Controller might not update leadership and ISR for the partition. In this exceptional case, the broker issues an AssignReplicasToDirs RPC to the Controller to assign the replica to UUID.LOST_DIR, which lets the Controller know that it needs to update leadership and ISR for this partition too.

Controller

Replica placement

For any new partitions, the active controller will use Uuid.UNASSIGNED_DIR as the initial value for log directory UUID for each replica – this is the default (empty) value for the tagged field. Each broker with multiple log.dirs hosting replicas then assigns a log directory UUID and communicates it back to the active controller using the new RPC AssignReplicasToDirs so that cluster metadata can be updated with the log directory assignment. Brokers that are configured with a single log directory do not send this RPC.

Handling log directory failures

When a controller receives a BrokerHeartbeat request from a broker that indicates any UUIDs under the new OfflineLogDirs field, it will:

  • Persist a BrokerRegistrationChange record, with the new list of online log directories.
  • Update the Leader and ISR for all the replicas assigned to the failed log directories, persisting PartitionChangeRecords, in a similar way to how leadership and ISR is updated when a broker becomes fenced, unregistered or shuts down.

If any of the listed log directory UUIDs is not a registered log directory, then the call fails with error 57 (LOG_DIR_NOT_FOUND).
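A rough Python sketch of the controller-side handling described above follows. It assumes that "registered" means any directory the broker has listed in its registration, including ones already marked offline, so that repeated heartbeats carrying the same failed directories remain valid. The function name and the input shapes are hypothetical, and the actual metadata records are reduced to plain return values.

```python
LOG_DIR_NOT_FOUND = 57


class LogDirNotFoundError(Exception):
    error_code = LOG_DIR_NOT_FOUND


def handle_offline_log_dirs(registered_dirs, online_dirs, offline_log_dirs, replica_dirs):
    """Process the OfflineLogDirs field of a heartbeat.

    registered_dirs: every directory the broker has registered (assumption, see above).
    online_dirs: directories currently recorded as online for the broker.
    replica_dirs: dict of (topic, partition) -> directory hosting the broker's replica.
    Returns (new online directory list, partitions needing a leader/ISR update).
    """
    unknown = [d for d in offline_log_dirs if d not in registered_dirs]
    if unknown:
        raise LogDirNotFoundError(f"unregistered log directories: {unknown}")

    failed = set(offline_log_dirs)
    new_online = [d for d in online_dirs if d not in failed]
    affected = sorted(p for p, d in replica_dirs.items() if d in failed)
    return new_online, affected
```

The first return value corresponds to the content of the BrokerRegistrationChange record, and the second to the partitions for which PartitionChangeRecords would be written.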

Handling replica assignments

The controller accepts the AssignReplicasToDirs RPC and persists the assignment into metadata records.

If the indicated log directory UUID is not one of the Broker's online log directories, then the replica is considered offline and the leader and ISR are updated accordingly, the same as when the BrokerHeartbeat indicates a new offline log directory.

Broker registration

Upon a broker registration request the controller will persist the broker registration as cluster metadata including the online log directory list and offline log directories flag for that broker. The controller may receive a new list of online directories and offline log directories flag — different from what was previously persisted in the cluster metadata for the requesting broker.

  • If there are no indicated online log directory UUIDs the request is invalid and the controller replies with an error 42 – INVALID_REQUEST.
  • If multiple log directories are registered, the broker will remain fenced until the controller learns of all the partition to log directory placements in that broker, i.e. no remaining replicas assigned to Uuid.UNASSIGNED_DIR. The broker will indicate these using the AssignReplicasToDirs RPC.
    • The broker remains fenced by not requesting to be unfenced in heartbeat requests until the number of mismatching replica to log directory assignments is zero. This number is represented by the new metric QueuedReplicaToDirAssignments.
  • If multiple log directories are registered and some of them are new (not present in the previous registration), then these log directories are assumed to be empty. If they are not, the broker will use the AssignReplicasToDirs RPC to correct the assignment and choose not to become UNFENCED before the metadata is correct.
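The registration checks above reduce to two small predicates, sketched here in Python. The function names are hypothetical, the value 0 is used as a stand-in for "no error", and the queued count is assumed to be the value reported by the QueuedReplicaToDirAssignments metric.

```python
NO_ERROR = 0          # stand-in for a successful result (assumption)
INVALID_REQUEST = 42


def registration_error(log_dirs):
    """Controller side: a registration without any online log directory is invalid."""
    return INVALID_REQUEST if not log_dirs else NO_ERROR


def wants_to_be_unfenced(log_dirs, queued_replica_to_dir_assignments):
    """Broker side: with multiple log directories, stay fenced while any
    replica-to-directory assignment is still queued for the controller."""
    if len(log_dirs) <= 1:
        return True
    return queued_replica_to_dir_assignments == 0
```

Keeping the unfencing decision on the broker side means the controller does not need a separate signal: the broker simply does not ask to be unfenced until its queue is empty.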

Configuration

The following configuration option is introduced

...

Name

...

Description

...

Default

...

Valid Values

...

log.dir.failure.timeout.ms

...

If the broker is unable to successfully communicate to the controller that some log directory has failed for longer than this time, the broker will fail and shut down.

...

30000 (30 seconds)

...

[1, …]

...

Storage format command

The format subcommand will be updated to ensure each log directory has an assigned UUID and it will persist a new property directory.id in the meta.properties  file. The value is base64 encoded, like the cluster UUID.

The meta.properties  version field will stay set to 1, to allow for a downgrade after an upgrade on a non JBOD KRaft cluster.

Footnote

If an existing, non JBOD KRaft cluster is upgraded to the first version that includes the changes described in this KIP, which write these new fields, and is later downgraded, the meta.properties file needs to still be readable. There's currently a hard check on the version number which would fail for a new version number. 

The UUIDs for each log directory are automatically generated by the tool if there isn't one assigned already in an existing meta.properties  file.

Having a persisted UUID at the root of each log directory allows the broker to identify the log directory regardless of the mount path.

Example

Given the following server.properties:

...

The command ./bin/kafka-storage.sh format -c /tmp/server.properties --cluster-id 41QSStLtR3qOekbX4ZlbHA  would generate three meta.properties  files that could look like the following:

/var/lib/kafka/metadata/meta.properties :

#
#Thu Aug 18 15:23:07 BST 2022
node.id=8
version=1
cluster.id=41QSStLtR3qOekbX4ZlbHA
directory.id=e6umYSUsQyq7jUUzL9iXMQ
/mnt/d1/meta.properties :
#
#Thu Aug 18 15:23:07 BST 2022
node.id=8
version=1
cluster.id=41QSStLtR3qOekbX4ZlbHA
directory.id=b4d9ExdORgaQq38CyHwWTA
/mnt/d2/meta.properties :
#
#Thu Aug 18 15:23:07 BST 2022
node.id=8
version=1
cluster.id=41QSStLtR3qOekbX4ZlbHA
directory.id=P2aL9r4sSqqyt7bC0uierg

Each directory, including the directory that holds the cluster metadata topic — metadata.log.dir  — has a different and respective value as the directory ID.

In the example above, we can identify the following directory mapping:

  • /var/lib/kafka/metadata  has log directory UUID e6umYSUsQyq7jUUzL9iXMQ 
  • /mnt/d1  has log directory UUID b4d9ExdORgaQq38CyHwWTA 
  • /mnt/d2 has log directory UUID P2aL9r4sSqqyt7bC0uierg 

Brokers

Broker lifecycle management

When the broker starts up and initializes LogManager, it will load the UUID for each log directory (directory.id ) by reading the meta.properties file at the root of each of them.

  • If there are any two log directories with the same UUID, the Broker will fail at startup
  • If there are any meta.properties files missing directory.id, a new UUID is generated, and assigned to that directory by updating the meta.properties file.

The set of all loaded log directory UUIDs is sent along in the broker registration request to the controller as the OnlineLogDirs field. If any configured log directories is unavailable, OfflineLogDirs is set to true.

Metadata caching

Currently, Replicas are considered offline if the hosting broker is offline. Additionally, replicas will also be considered offline if the replica references a log directory UUID (in the new field partitionRecord.Assignment.Directory) that is not present in the hosting Broker's latest registration under OnlineLogDirs and either:

  • the hosting broker's latest registration indicates multiple online log directories. i.e. brokerRegistration.OnlineLogDirs.length > 1
  • the hosting broker's latest registration indicates that there are offline directories. i.e. brokerRegistration.OfflineLogDirs == true

If neither of the above conditions are true, we assume that there is only one log directory configured, the broker is not configured with multiple log directories, replicas all live in the same directory and neither log directory assignments nor log directory failures shall be communicated to the Controller. 

Handling log directory failures

When multiple log directories are configured, and some (but not all) of them become offline, the broker will communicate this change using the new field OfflineLogDirs  in the BrokerHeartbeat  request — indicating the UUIDs of the new offline log directories. The UUIDs for the newly failed log directories are included in the BrokerHeartbeat  request until the broker receives a successful response. If the Broker is configured with a single log directory, this field isn't used, as the current behavior of the broker is to shutdown when no log directories are online.

If there are any queued partition-to-directory assignments to send to the controller, those that are respective to any of the newly failed log directories (i.e. assignments that are either into or out-of these directories) are prioritized and sent first. The broker retries these until it receives a successful reply, which conveys that the metadata change has been successfully persisted. This ensures that the Controller is in sync with regards to partition-to-directory assignments and can reliably determine which partitions need leadership and ISR update.

If the Broker repeatedly cannot communicate fails to communicate a log directory failure after a configurable amount of time — log.dir.failure.timeout.ms — and it is the leader for any replicas in the failed log directory the broker will shutdown, as that is the only other way to guarantee that the controller will elect a new leader for those partitions.

Replica management

When configured with multiple log.dirs, as the broker catches up with metadata, and sees the partitions which it should be hosting, it will check the associated log directory UUID for each partition (partitionRecord.Assignment.Directory).

  • If the partition is not assigned to a log directory (refers to Uuid.UnknownDir)
    • If the partition already exists, the broker uses the new RPC — AssignReplicasToDirs — to notify the controller to change the metadata assignment to the actual log directory.
    • If the partition does not exist, the broker selects a log directory and uses the new RPC — AssignReplicasToDirs — to notify the controller to create the metadata assignment to the actual log directory.
  • If the partition is assigned to an online log directory
    • If the partition does not exist it is created in the indicated log directory.
    • If the partition already exists in the indicated log directory and no future replica exists, then no action is taken.
    • If the partition already exists in the indicated log directory, and there is a future replica in another log directory, then the broker starts the process to replicate the current replica to the future replica.
    • If the partition already exists in another online log directory and is a future replica in the log directory indicated by the metadata, the broker will replace the current replica with the future replica after making sure that the future replica is fully caught up with the current replica.
    • If the partition already exists in another online log directory, the broker uses the new RPC — AssignReplicasToDirs — to the controller to change the metadata assignment to the actual log directory. The partition might have been moved to a different log directory whilst the broker was offline. 
  • If the partition is assigned to an unknown log directory or refers to Uuid.OfflineDir
    • If there are offline log directories, no action is taken — the assignment refers to a a log directory which may be offline, we don't want to fill the remaining online log directories with replicas that existed in the offline ones.
    • If there are no offline directories, the broker selects a log directory and uses the new RPC — AssignReplicasToDirs — to notify the controller to create the metadata assignment to the actual log directory.
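The decision list above can be sketched as a single pure function. This is a minimal illustration, not the broker's actual implementation: the class DirectoryReconciler, the Action enum, the string placeholders for the reserved UUIDs, and all parameter names are hypothetical, and log directory UUIDs are modelled as plain strings to keep the example self-contained.

```java
import java.util.Optional;
import java.util.Set;

// Hypothetical sketch: none of these names come from the Kafka code base.
final class DirectoryReconciler {

    enum Action {
        NONE,                        // nothing to do
        CREATE_IN_ASSIGNED_DIR,      // create the partition where the metadata says
        ASSIGN_TO_ACTUAL_DIR,        // send AssignReplicasToDirs with the directory actually in use
        SELECT_DIR_AND_ASSIGN,       // pick a directory, then send AssignReplicasToDirs
        RESUME_FUTURE_REPLICA_COPY,  // keep copying the current replica to the future replica
        PROMOTE_FUTURE_REPLICA       // swap in the future replica once it is fully caught up
    }

    // Placeholders standing in for the reserved UUIDs (assumed values).
    static final String UNKNOWN_DIR = "UnknownDir";
    static final String OFFLINE_DIR = "OfflineDir";

    static Action decide(String assignedDir,
                         Set<String> onlineDirs,
                         boolean hasOfflineDirs,
                         Optional<String> currentReplicaDir,
                         Optional<String> futureReplicaDir) {
        // Case 1: the metadata has no directory for this replica yet.
        if (UNKNOWN_DIR.equals(assignedDir)) {
            return currentReplicaDir.isPresent()
                    ? Action.ASSIGN_TO_ACTUAL_DIR
                    : Action.SELECT_DIR_AND_ASSIGN;
        }
        // Case 2: the metadata points at one of this broker's online directories.
        if (onlineDirs.contains(assignedDir)) {
            if (!currentReplicaDir.isPresent()) {
                return Action.CREATE_IN_ASSIGNED_DIR;
            }
            if (currentReplicaDir.get().equals(assignedDir)) {
                return futureReplicaDir.isPresent()
                        ? Action.RESUME_FUTURE_REPLICA_COPY
                        : Action.NONE;
            }
            // The current replica lives in a different online directory.
            if (futureReplicaDir.isPresent() && futureReplicaDir.get().equals(assignedDir)) {
                return Action.PROMOTE_FUTURE_REPLICA;
            }
            return Action.ASSIGN_TO_ACTUAL_DIR;
        }
        // Case 3: OfflineDir, or a directory this broker does not recognise.
        return hasOfflineDirs ? Action.NONE : Action.SELECT_DIR_AND_ASSIGN;
    }
}
```

Keeping the decision free of side effects makes each branch of the list easy to check in isolation; sending the RPC or creating the log would happen in whatever code consumes the returned Action.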

If instead, a single entry is configured under log.dirs or log.dir, then the Assignment.Directory field is always ignored, and AssignReplicasToDirs is never sent.

If the broker is configured with multiple log directories it remains FENCED until it can verify that all partitions are assigned to the correct log directories in the cluster metadata. This excludes the log directory that hosts the cluster metadata topic, if it is configured separately to a different path — using metadata.log.dir.

Intra-broker replica movement

Support for replica movement between directories was introduced in KIP-113. This functionality is maintained, but altered slightly so that the controller remains correctly informed of the log directory for any moving replica.

The existing AlterReplicaLogDirs RPC is sent directly to the broker in question, which starts moving the replicas using AlterReplicaLogDirs threads – this remains unchanged. But when the future replica first catches up with the main replica, instead of immediately promoting the future replica, the broker will:

  1. Asynchronously communicate the log directory change to the controller using the new RPC – AssignReplicasToDirs.
  2. Keep the AlterReplicaLogDirs thread going. The future replica is still the future replica, and it continues to copy from the main replica – which is still in the original log directory – as new records are appended.

Once the broker receives confirmation of the metadata change – indicated by a successful response to AssignReplicasToDirs – then it will:

  1. Block appends to the main (old) replica and wait for the future replica to fully catch up once again.
  2. Make the switch, promoting the future replica to main replica and cleaning up the old one.

By delaying the metadata change until the future replica has caught up we minimize the chance of a log directory failure happening with an incorrect replica to log directory assignment in the metadata.
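The two phases described above can be read as a small state machine. The sketch below is illustrative only: the ReplicaMove class, its states, and its callback names are hypothetical, and the assumption that appends resume once the future replica is promoted is ours rather than something this section states.

```java
// Hypothetical sketch of the move sequence; not the broker's actual classes.
final class ReplicaMove {

    enum State {
        COPYING,                  // future replica is catching up for the first time
        AWAITING_ASSIGNMENT_ACK,  // AssignReplicasToDirs sent, copy thread still running
        FINAL_CATCH_UP,           // appends blocked, waiting for the last records
        PROMOTED                  // future replica is now the main replica
    }

    private State state = State.COPYING;
    private boolean appendsBlocked = false;

    State state() {
        return state;
    }

    boolean appendsBlocked() {
        return appendsBlocked;
    }

    // Called whenever the future replica has caught up with the main replica.
    void onFutureReplicaCaughtUp() {
        switch (state) {
            case COPYING:
                // First catch-up: notify the controller asynchronously, keep copying.
                state = State.AWAITING_ASSIGNMENT_ACK;
                break;
            case FINAL_CATCH_UP:
                // Appends were blocked, so the replicas are identical: make the switch.
                state = State.PROMOTED;
                appendsBlocked = false; // assumption: appends resume on the promoted replica
                break;
            default:
                // Catching up again while waiting for the controller changes nothing.
                break;
        }
    }

    // Called on a successful AssignReplicasToDirs response.
    void onAssignmentAcknowledged() {
        if (state == State.AWAITING_ASSIGNMENT_ACK) {
            appendsBlocked = true;
            state = State.FINAL_CATCH_UP;
        }
    }
}
```

In this sketch the promotion can only happen after the controller has acknowledged the new assignment, which is the ordering the section relies on.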

The diagram below illustrates the sequence of steps involved in moving a replica between log directories.

[Image: sequence diagram of a replica move between log directories]

In the diagram above, notice that if dir1 fails after the AssignReplicasToDirs RPC is sent, but before the future replica is promoted, then the controller will not know to update leadership and ISR for the partition. If the destination directory has failed, it won't be possible to promote the future replica, and the Broker needs to revert the assignment (or cancel it locally if it is still queued). If the source directory has failed, then the future replica might not catch up, and the Controller might not update leadership and ISR for the partition. In this exceptional case, the broker issues an AssignReplicasToDirs RPC to the Controller to assign the replica to Uuid.OfflineDir, which lets the Controller know that it needs to update leadership and ISR for this partition too.

Controller

Replica placement

For any new partitions, the active controller will use Uuid.UnknownDir as the initial value for log directory UUID for each replica – this is the default (empty) value for the tagged field. Each broker with multiple log.dirs hosting replicas then assigns a log directory UUID and communicates it back to the active controller using the new RPC AssignReplicasToDirs so that cluster metadata can be updated with the log directory assignment. Brokers that are configured with a single log directory do not send this RPC.

Handling log directory failures

When a controller receives a BrokerHeartbeat request from a broker that indicates any UUIDs under the new OfflineLogDirs field, it will:

  • Persist a BrokerRegistrationChange record, with the new list of online log directories and update the offline log directories flag.
  • Update the Leader and ISR for all the replicas assigned to the failed log directories, persisting PartitionChangeRecords, in a similar way to how leadership and ISR is updated when a broker becomes fenced, unregistered or shuts down.

If any of the listed log directory UUIDs is not a registered log directory, then the call fails with error 57 (LOG_DIR_NOT_FOUND).
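The validation and bookkeeping described above could look roughly like the following. It is a sketch under stated assumptions: OfflineDirsHandler, Outcome, and the method signature are hypothetical, UUIDs are modelled as strings, and the persistence of BrokerRegistrationChange and PartitionChangeRecords is left out entirely. Only the error code value 57 is taken from the text.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of how a controller might process OfflineLogDirs.
final class OfflineDirsHandler {

    static final short NONE = 0;
    static final short LOG_DIR_NOT_FOUND = 57;

    static final class Outcome {
        final short errorCode;
        final Set<String> onlineDirs;

        Outcome(short errorCode, Set<String> onlineDirs) {
            this.errorCode = errorCode;
            this.onlineDirs = onlineDirs;
        }
    }

    /**
     * @param registeredDirs  every log directory the broker has registered
     * @param onlineDirs      directories currently recorded as online
     * @param reportedOffline UUIDs listed under OfflineLogDirs in the heartbeat
     */
    static Outcome handle(Set<String> registeredDirs,
                          Set<String> onlineDirs,
                          List<String> reportedOffline) {
        // Reject the whole request if any reported directory was never registered.
        for (String dir : reportedOffline) {
            if (!registeredDirs.contains(dir)) {
                return new Outcome(LOG_DIR_NOT_FOUND, onlineDirs);
            }
        }
        // Otherwise the new online list is the old one minus the failed directories.
        Set<String> remaining = new LinkedHashSet<>(onlineDirs);
        remaining.removeAll(reportedOffline);
        return new Outcome(NONE, Collections.unmodifiableSet(remaining));
    }
}
```

A real controller would follow a successful outcome by persisting the new online list and updating leadership and ISR for the replicas assigned to the failed directories.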

Handling replica assignments

The controller accepts the AssignReplicasToDirs RPC and persists the assignment into metadata records.

If the indicated log directory UUID is not one of the Broker's online log directories, then the replica is considered offline and the leader and ISR are updated accordingly, in the same way as when the BrokerHeartbeat indicates a new offline log directory.

Broker registration

Upon a broker registration request the controller will persist the broker registration as cluster metadata including the online log directory list and offline log directories flag for that broker. The controller may receive a new list of online directories and offline log directories flag — different from what was previously persisted in the cluster metadata for the requesting broker.

  • If there are no indicated online log directory UUIDs the request is invalid and the controller replies with an error 42 – INVALID_REQUEST.
  • If multiple log directories are registered the broker will remain fenced until the controller learns of all the partition to log directory placements in that broker - i.e. no remaining replicas assigned to Uuid.UnknownDir. The broker will indicate these using the AssignReplicasToDirs RPC.
    • The broker remains fenced by not asking to be unfenced in its heartbeat requests until the number of mismatching replica to log directory assignments is zero. This number is represented by the new metric NumMismatchingReplicaToLogDirAssignments.
  • If multiple log directories are registered and some of them are new (not present in previous registration) then these log directories are assumed to be empty. If they are not, the broker will use the AssignReplicasToDirs RPC to correct the assignment and choose not to become UNFENCED before the metadata is correct.
  • In the special case where previous broker registration indicates a single online log directory and no offline log directories, and the inbound broker registration request indicates more than one log directory, and one of the indicated log directories is the same one previously registered, then a logical update to all partitions in that broker takes place, assigning the replica's directory to the single directory previously registered – i.e. it is assumed that all replicas are still in the same directory, and this transition to JBOD avoids creating partition change records. This same logic is considered in every node while consuming and caching metadata changes.
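The special case in the last bullet above reduces to a simple predicate over the previous and the new registration. The sketch below is illustrative: JbodTransition and implicitDirectory are hypothetical names, and directory UUIDs are modelled as strings.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of the single-directory to JBOD transition check.
final class JbodTransition {

    /**
     * Returns the directory that all of the broker's replicas are assumed to
     * live in, or empty if the special case does not apply.
     */
    static Optional<String> implicitDirectory(List<String> previousOnlineDirs,
                                              boolean previousHadOfflineDirs,
                                              List<String> newOnlineDirs) {
        boolean wasSingleDir = previousOnlineDirs.size() == 1 && !previousHadOfflineDirs;
        boolean isNowMultiDir = newOnlineDirs.size() > 1;
        if (wasSingleDir && isNowMultiDir && newOnlineDirs.contains(previousOnlineDirs.get(0))) {
            // All replicas are assumed to still be in the one directory known before.
            return Optional.of(previousOnlineDirs.get(0));
        }
        return Optional.empty();
    }
}
```

Because every node can evaluate the same predicate while replaying broker registrations, the assignment can be derived locally without writing a partition change record per replica.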

Brokers whose registration indicates that multiple log directories are configured remain FENCED until all log directory assignments for that broker are learnt by the active controller and persisted into metadata.

...

  • As per KIP-866, a separate Controller quorum is set up first, and only then the existing brokers are reconfigured and upgraded.
  • When configured for the migration and while still in ZK mode, brokers will:
    • update meta.properties to generate and include directory.id;
    • send BrokerRegistrationRequest including the log directory UUIDs;
    • shut down if any directory fails;
    • send assignments via the AssignReplicasToDirs RPC;
    • notify the controller of log directory failures via BrokerHeartbeatRequest.
  • During the migration, the controller:
    • persists log directories indicated in broker registration requests in the cluster metadata;
    • relies on heartbeat requests to detect log directory failure instead of monitoring the ZK znode for notifications;
    • still uses full LeaderAndIsr requests to process log directory failures for any brokers still running in ZK mode;
    • persists directory assignments received via the AssignReplicasToDirs RPC.
  • The brokers restarting into KRaft mode will want to stay fenced until their log directory assignments for all hosted partitions are persisted in the cluster metadata.
  • The active controller will also ensure that any given broker stays fenced until it learns of all partition to log directory assignments in that specific broker via the new AssignReplicasToDirs RPC.
  • During the migration, existing replicas are assumed and assigned to log directory Uuid.UnknownDir until MIGRATING_DIR until the actual log directory is learnt by the active controller from a broker running in KRaft mode.

...