...
A new property, directory.id, will be expected in the meta.properties file in each log directory configured under log.dirs. The property indicates the UUID for the log directory where the file is located. If any of the meta.properties files does not contain directory.id, one will be randomly generated and the file will be updated upon Broker startup. The kafka-storage.sh tool will be extended to generate this property as described in the previous section.
If the log directory that holds the cluster metadata topic is configured separately at a different path (using metadata.log.dir), then this log directory does not get a UUID assigned.
Footnote |
---|
The broker cannot run if this particular log directory is unavailable, and when configured separately it cannot host any user partitions, so there's no point in identifying it in the Controller. |
Reserved UUIDs
The following UUIDs are excluded from the random pool when generating a log directory UUID:
...
PartitionRecord and PartitionChangeRecord will both have a new Directories field:
{ "name": "Directories", "type": "[]uuid", "versions": "1+",
"about": "The log directory hosting each replica, sorted in the same exact order as the Replicas field."}
Although not explicitly specified in the schema, the default value for Directory is Uuid.UNASSIGNED_DIR (Uuid.ZERO), as that's the default default value for UUID types.
Footnote |
---|
Yes, double default, not a typo. The default setting, for the default value of the field. |
...
{
"apiKey": <TBD>,
"type": "request",
"listeners": ["controller"],
"name": "AssignReplicasToDirsRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
"about": "The ID of the requesting broker" },
{ "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
"about": "The epoch of the requesting broker" },
{ "name": "Directories", "type": "[]DirectoryData", "versions": "0+", "fields": [
{ "name": "Id", "type": "uuid", "versions": "0+", "about": "The ID of the directory" },
{ "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The ID of the assigned topic" },
{ "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index" }
]}
]}
]}
]
}
{
"apiKey": <TBD>,
"type": "response",
"name": "AssignReplicasToDirsResponse",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The top level response error code" },
{ "name": "Directories", "type": "[]DirectoryData", "versions": "0+", "fields": [
{ "name": "Id", "type": "uuid", "versions": "0+", "about": "The ID of the directory" },
{ "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The ID of the assigned topic" },
{ "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index" },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The partition level error code" }
]}
]}
]}
]
}
An AssignReplicasToDirs request including an assignment to Uuid.LOST_DIR conveys that the Broker wants to correct a replica assignment into an offline log directory, which cannot be identified.
This request is authorized with CLUSTER_ACTION on CLUSTER.
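As a rough illustration of the nesting in the request schema above, the sketch below groups a flat list of replica assignments into the Directories → Topics → Partitions shape. The function name and the use of plain dictionaries are hypothetical, and the topic key is assumed to be TopicId; this is not the broker's actual serialization code.

```python
from collections import defaultdict


def build_assign_replicas_request(broker_id, broker_epoch, assignments):
    """Group (directory_id, topic_id, partition_index) tuples into the
    nested shape of AssignReplicasToDirsRequest.

    Hypothetical helper for illustration; plain dicts stand in for the
    generated message classes.
    """
    by_dir = defaultdict(lambda: defaultdict(list))
    for directory_id, topic_id, partition_index in assignments:
        by_dir[directory_id][topic_id].append(partition_index)

    return {
        "BrokerId": broker_id,
        "BrokerEpoch": broker_epoch,
        "Directories": [
            {
                "Id": directory_id,
                "Topics": [
                    {
                        "TopicId": topic_id,  # key name is an assumption
                        "Partitions": [
                            {"PartitionIndex": p} for p in sorted(partitions)
                        ],
                    }
                    for topic_id, partitions in topics.items()
                ],
            }
            for directory_id, topics in by_dir.items()
        ],
    }
```

Grouping by directory first keeps each directory UUID on the wire once, however many partitions are assigned to it.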
Proposed changes
Metrics
MBean name | Description |
---|---|
kafka.server:type=KafkaServer,name=QueuedReplicaToDirAssignments | The number of replicas hosted by the broker that are either missing a log directory assignment in the cluster metadata or are currently found in a different log directory and are queued to be sent to the controller in an AssignReplicasToDirs request. |
...
In the example above, we can identify the following directory mapping:
- /var/lib/kafka/metadata has log directory UUID e6umYSUsQyq7jUUzL9iXMQ
- /mnt/d1 has log directory UUID b4d9ExdORgaQq38CyHwWTA
- /mnt/d2 has log directory UUID P2aL9r4sSqqyt7bC0uierg
Brokers
Broker lifecycle management
When the broker starts up and initializes LogManager
, it will load the UUID for each log directory (directory.id
) by reading the meta.properties
file at the root of each of them.
- If there are any two log directories with the same UUID, the Broker will fail at startup.
- If there are any meta.properties files missing directory.id, a new UUID is generated and assigned to that directory by updating the meta.properties file.
The set of all loaded log directory UUIDs is sent along in the broker registration request to the controller as the LogDirs
field.
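The startup steps above can be sketched roughly as follows. This is a simplified illustration, not the LogManager implementation: the function names are hypothetical, the reserved set is left empty as a placeholder, a missing meta.properties file is treated like an empty one, and the 22-character URL-safe base64 encoding is assumed from the example UUIDs in this document.

```python
import base64
import os
import uuid


def new_directory_id(reserved=frozenset()):
    """Random 128-bit ID as URL-safe base64 without padding (22 chars).
    `reserved` is a placeholder for the excluded UUIDs."""
    while True:
        raw = uuid.uuid4().bytes
        candidate = base64.urlsafe_b64encode(raw).decode("ascii").rstrip("=")
        if candidate not in reserved:
            return candidate


def read_properties(path):
    """Parse simple key=value lines; a missing file yields no properties."""
    props = {}
    if os.path.exists(path):
        with open(path, encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if line and not line.startswith("#") and "=" in line:
                    key, value = line.split("=", 1)
                    props[key.strip()] = value.strip()
    return props


def load_directory_ids(log_dirs, reserved=frozenset()):
    """Return {log_dir: directory.id}, generating missing IDs and
    failing if two directories share the same ID."""
    ids = {}
    for log_dir in log_dirs:
        path = os.path.join(log_dir, "meta.properties")
        props = read_properties(path)
        if "directory.id" not in props:
            props["directory.id"] = new_directory_id(reserved)
            # Simplification: rewrites the file and drops any comments.
            with open(path, "w", encoding="utf-8") as f:
                for key, value in props.items():
                    f.write(f"{key}={value}\n")
        directory_id = props["directory.id"]
        if directory_id in ids.values():
            raise RuntimeError(
                f"duplicate directory.id {directory_id} in {log_dir}")
        ids[log_dir] = directory_id
    return ids
```

The values of the returned mapping correspond to what would be sent as LogDirs in the broker registration request.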
Metadata caching
Currently, replicas are considered offline if the hosting broker is offline. Additionally, replicas will also be considered offline if the replica references a log directory UUID (in the new field partitionRecord.Directories) that is not present in the hosting Broker's latest registration under LogDirs and either:
- the log directory UUID is Uuid.LOST_DIR
- the hosting broker's registration indicates multiple online log directories, i.e. brokerRegistration.LogDirs.length > 1
If neither of the above conditions is true, we assume that the broker is configured with a single log directory, that all replicas live in that directory, and that neither log directory assignments nor log directory failures need to be communicated to the Controller.
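The offline rule above can be written as a small predicate. This is a sketch only: the function name is hypothetical, directory IDs are plain strings, and LOST_DIR is a placeholder sentinel rather than the real reserved UUID value.

```python
LOST_DIR = "LOST_DIR"  # placeholder sentinel, not the actual reserved UUID


def is_replica_offline(broker_online, directory_id, registered_log_dirs):
    """Apply the metadata-cache rule literally as described.

    registered_log_dirs is the LogDirs list from the hosting broker's
    latest registration.
    """
    if not broker_online:
        return True  # existing behaviour: offline broker, offline replica
    if directory_id in registered_log_dirs:
        return False  # directory is registered as online
    # Unregistered directory: offline only if it is LOST_DIR, or if the
    # broker registered more than one log directory.
    return directory_id == LOST_DIR or len(registered_log_dirs) > 1
```

For instance, a replica referencing an unregistered directory on a broker that registered a single log directory is still treated as online, which matches the single-directory assumption stated above.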
Handling log directory failures
When multiple log directories are configured, and some (but not all) of them become offline, the broker will communicate this change using the new field OfflineLogDirs in the BrokerHeartbeat request, indicating the UUIDs of the new offline log directories. The UUIDs for the accumulated failed log directories are included in every BrokerHeartbeat request until the broker restarts. If the Broker is configured with a single log directory, this field isn't used, as the current behavior of the broker is to shut down when no log directories are online.
Log directory failure notifications are queued and batched together in all future broker heartbeat requests.
If the Broker repeatedly fails to communicate a log directory failure after a configurable amount of time (log.dir.failure.timeout.ms), and it is the leader for any replicas in the failed log directory, the broker will shut down, as that is the only other way to guarantee that the controller will elect a new leader for those partitions.
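To make the failure-reporting behaviour above concrete, here is a minimal sketch of the broker-side bookkeeping. The class and method names are hypothetical, time is passed in explicitly as milliseconds, and the leadership check is supplied by the caller; the real broker logic may be structured differently.

```python
class LogDirFailureTracker:
    """Tracks failed log directories for BrokerHeartbeat requests."""

    def __init__(self, timeout_ms):
        self.timeout_ms = timeout_ms   # stands in for log.dir.failure.timeout.ms
        self.offline_dirs = []         # accumulated until the broker restarts
        self.unacked_since_ms = {}     # dir id -> time the failure was detected

    def on_directory_failure(self, directory_id, now_ms):
        if directory_id not in self.offline_dirs:
            self.offline_dirs.append(directory_id)
            self.unacked_since_ms[directory_id] = now_ms

    def heartbeat_offline_log_dirs(self):
        # Every heartbeat carries all failed directories seen so far.
        return list(self.offline_dirs)

    def on_heartbeat_success(self, sent_dirs):
        # The failure has been communicated; stop the timeout clock.
        for directory_id in sent_dirs:
            self.unacked_since_ms.pop(directory_id, None)

    def should_shut_down(self, now_ms, leads_replicas_in):
        # Shut down only if a failure is still uncommunicated after the
        # timeout AND the broker leads replicas in that directory.
        return any(
            now_ms - since >= self.timeout_ms and leads_replicas_in(d)
            for d, since in self.unacked_since_ms.items()
        )
```

A caller would invoke should_shut_down periodically, passing a function that reports whether the broker is leader for any replica in a given directory.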
...
When configured with multiple log.dirs, as the broker catches up with metadata and sees the partitions which it should be hosting, it will check the associated log directory UUID for each partition (partitionRecord.Directories).
- If the partition is not assigned to a log directory (refers to Uuid.UNASSIGNED_DIR)
  - If the partition already exists, the broker uses the new RPC, AssignReplicasToDirs, to notify the controller to change the metadata assignment to the actual log directory.
  - If the partition does not exist, the broker selects a log directory and uses the new RPC, AssignReplicasToDirs, to notify the controller to create the metadata assignment to the actual log directory.
- If the partition is assigned to an online log directory
  - If the partition does not exist, it is created in the indicated log directory.
  - If the partition already exists in the indicated log directory and no future replica exists, then no action is taken.
  - If the partition already exists in the indicated log directory, and there is a future replica in another log directory, then the broker starts the process to replicate the current replica to the future replica.
  - If the partition already exists in another online log directory and is a future replica in the log directory indicated by the metadata, the broker will replace the current replica with the future replica after making sure that the future replica is fully caught up with the current replica.
  - If the partition already exists in another online log directory, the broker uses the new RPC, AssignReplicasToDirs, to notify the controller to change the metadata assignment to the actual log directory. The partition might have been moved to a different log directory whilst the broker was offline.
- If the partition is assigned to an unknown log directory or refers to Uuid.LOST_DIR
  - If there are offline log directories, no action is taken: the assignment refers to a log directory which may be offline, and we don't want to fill the remaining online log directories with replicas that existed in the offline ones.
  - If there are no offline directories, the broker selects a log directory and uses the new RPC, AssignReplicasToDirs, to notify the controller to create the metadata assignment to the actual log directory.
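The cases listed above form a decision tree, which can be sketched as a single function. All names here (Action, reconcile, the string sentinels) are hypothetical and exist only to illustrate the branching; directory IDs are plain strings and the sentinels are placeholders for the reserved UUIDs.

```python
from enum import Enum

UNASSIGNED_DIR = "UNASSIGNED_DIR"  # placeholder for Uuid.UNASSIGNED_DIR
LOST_DIR = "LOST_DIR"              # placeholder for Uuid.LOST_DIR


class Action(Enum):
    NONE = "no action"
    CREATE_IN_ASSIGNED_DIR = "create partition in the indicated directory"
    ASSIGN_ACTUAL_DIR = "send AssignReplicasToDirs with the actual directory"
    SELECT_DIR_AND_ASSIGN = "select a directory, then send AssignReplicasToDirs"
    RESUME_FUTURE_REPLICA_COPY = "replicate current replica to future replica"
    PROMOTE_FUTURE_REPLICA = "replace current replica once future is caught up"


def reconcile(assigned_dir, online_dirs, has_offline_dirs,
              current_dir=None, future_dir=None):
    """Pick the broker's action for one partition.

    current_dir: online directory where the replica exists, or None.
    future_dir: directory holding a future replica, or None.
    """
    if assigned_dir == UNASSIGNED_DIR:
        if current_dir is not None:
            return Action.ASSIGN_ACTUAL_DIR
        return Action.SELECT_DIR_AND_ASSIGN

    if assigned_dir in online_dirs:
        if current_dir is None:
            return Action.CREATE_IN_ASSIGNED_DIR
        if current_dir == assigned_dir:
            if future_dir is not None:
                return Action.RESUME_FUTURE_REPLICA_COPY
            return Action.NONE
        # The replica lives in another online directory.
        if future_dir == assigned_dir:
            return Action.PROMOTE_FUTURE_REPLICA
        return Action.ASSIGN_ACTUAL_DIR

    # Unknown directory or LOST_DIR, following the text literally.
    if has_offline_dirs:
        return Action.NONE
    return Action.SELECT_DIR_AND_ASSIGN
```

For example, reconcile("d2", {"d1", "d2"}, False, current_dir="d1") yields Action.ASSIGN_ACTUAL_DIR, the case where the partition was moved while the broker was offline.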
...
The existing AlterReplicaLogDirs RPC is sent directly to the broker in question, which starts moving the replicas using ReplicaAlterLogDirsThread threads; this remains unchanged. But when the future replica first catches up with the main replica, instead of immediately promoting the future replica, the broker will:
- Asynchronously communicate the log directory change to the controller using the new RPC, AssignReplicasToDirs.
- Keep the ReplicaAlterLogDirsThread going. The future replica is still the future replica, and it continues to copy from the main replica, which is still in the original log directory, as new records are appended.
...
The diagram below illustrates the sequence of steps involved in moving a replica between log directories.
In the diagram above, notice that if dir1 fails after the AssignReplicasToDirs RPC is sent, but before the future replica is promoted, then the controller will not know to update leadership and ISR for the partition. If the destination directory has failed, it won't be possible to promote the future replica, and the Broker needs to revert the assignment (cancelled locally if still queued). If the source directory has failed, then the future replica might not catch up, and the Controller might not update leadership and ISR for the partition. In this exceptional case, the broker issues an AssignReplicasToDirs RPC to the Controller to assign the replica to Uuid.LOST_DIR, which lets the Controller know that it needs to update leadership and ISR for this partition too.
...
- As per KIP-866, a separate Controller quorum is set up first, and only then are the existing brokers reconfigured and upgraded.
- When configured for the migration and while still in ZK mode, brokers will:
  - update meta.properties to generate and include directory.id;
  - send BrokerRegistrationRequest including the log directory UUIDs;
  - shut down if any directory fails;
  - send assignments via the AssignReplicasToDirs RPC;
  - notify the controller of log directory failures via BrokerHeartbeatRequest.
- During the migration, the controller:
  - persists log directories indicated in broker registration requests in the cluster metadata;
  - relies on heartbeat requests to detect log directory failure instead of monitoring the ZK znode for notifications;
  - still uses full LeaderAndIsr requests to process log directory failures for any brokers still running in ZK mode;
  - persists directory assignments received via the AssignReplicasToDirs RPC.
- The brokers restarting into KRaft mode will want to stay fenced until their log directory assignments for all hosted partitions are persisted in the cluster metadata.
- The active controller will also ensure that any given broker stays fenced until it learns of all partition to log directory assignments in that specific broker via the new AssignReplicasToDirs RPC.
- During the migration, existing replicas are assumed and assigned to log directory Uuid.MIGRATING_DIR until the actual log directory is learnt by the active controller from a broker running in KRaft mode.
...