Comment: Align naming with existing RPC

...

RegisterBrokerRecord and BrokerRegistrationChangeRecord will both have two new fields:

{ "name": "OnlineLogDirs", "type": "[]uuid", "versions": "2+",
"about": "Log directories configured in this broker which are available." },
{ "name": "OfflineLogDirs", "type": "[]uuid", "versions": "2+",
"about": "Log directories configured in this broker which are not available." }

...

BrokerRegistrationRequest will include the following two new fields:

{ "name": "OnlineLogDirs", "type": "[]uuid", "versions": "1+",
"about": "Log directories configured in this broker which are available." },
{ "name": "OfflineLogDirs", "type": "[]uuid", "versions": "1+",
"about": "Log directories configured in this broker which are not available." }

...

{ "name": "LogDirsOfflined", "type": "[]uuid", "versions": "1+",
"about": "Log directories that failed and went offline." }

A new RPC named AssignReplicasToDirs will be introduced with the following request and response:

{
"apiKey": <TBD>,
"type": "request",
"listeners": ["controller"],
"name": "AssignReplicasToDirsRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
"about": "The ID of the requesting broker" },
{ "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
"about": "The epoch of the requesting broker" },
{ "name": "Directories", "type": "[]DirectoryData", "versions": "0+", "fields": [
{ "name": "Id", "type": "uuid", "versions": "0+", "about": "The ID of the directory" },
{ "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
"about": "The name of the assigned topic" },
{ "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index" }
]}
]}
]}
]
}
{
"apiKey": <TBD>,
"type": "response",
"name": "AssignReplicasToDirsResponse",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The top level response error code" },
{ "name": "Directories", "type": "[]DirectoryData", "versions": "0+", "fields": [
{ "name": "Id", "type": "uuid", "versions": "0+", "about": "The ID of the directory" },
{ "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
"about": "The name of the assigned topic" },
{ "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index" },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The partition level error code" }
]}
]}
]}
]
}
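
The nesting of the request (Directories, then Topics, then Partitions) can be illustrated with a small sketch that groups flat replica placements into that shape. This is only an illustration in Python, not the broker's implementation: the function name build_assign_replicas_request is hypothetical, and Python's uuid.UUID stands in for Kafka's Uuid type.

```python
from collections import defaultdict
from uuid import UUID


def build_assign_replicas_request(broker_id, broker_epoch, placements):
    """Group flat (dir_id, topic, partition) tuples into the nested
    Directories -> Topics -> Partitions layout of the request schema.

    Hypothetical helper: only the field names come from the schema above.
    """
    grouped = defaultdict(lambda: defaultdict(list))
    for dir_id, topic, partition in placements:
        grouped[dir_id][topic].append(partition)

    return {
        "BrokerId": broker_id,
        "BrokerEpoch": broker_epoch,
        "Directories": [
            {
                "Id": str(dir_id),
                "Topics": [
                    {
                        "Name": topic,
                        "Partitions": [
                            {"PartitionIndex": p} for p in sorted(partitions)
                        ],
                    }
                    for topic, partitions in sorted(topics.items())
                ],
            }
            # Sort by string form so the output order is deterministic.
            for dir_id, topics in sorted(grouped.items(), key=lambda kv: str(kv[0]))
        ],
    }
```

A broker reporting several replicas in the same directory would therefore send one DirectoryData entry containing all of them, rather than one entry per replica.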

Proposed changes

Metrics

MBean name | Description
kafka.server:type=KafkaServer,name=NumMismatchingReplicaToLogDirAssignments | The number of replicas hosted by the broker that are either missing a log directory assignment in the cluster metadata or are currently found in a different log directory.

Storage format command

The format subcommand will be updated to ensure each log directory has an assigned UUID, and it will persist two new properties in the meta.properties file:

...

  • If the partition is not assigned to a log directory (refers to Uuid.ZERO)
    • If the partition already exists, the broker uses the new RPC, AssignReplicasToDirs, to notify the controller to change the metadata assignment to the actual log directory.
    • If the partition is new, the broker selects a log directory and uses the new RPC, AssignReplicasToDirs, to notify the controller to create the metadata assignment to the actual log directory.
  • If the partition is assigned to an online log directory
    • If the partition is new, or if there are no offline log directories, it is created in the indicated log directory.
    • If the partition already exists in the indicated log directory no action is taken.
    • If the partition already exists in another log directory and is a future replica in the log directory indicated by the metadata, the broker will replace the current replica with the future replica.
    • If the partition already exists in another log directory, the broker uses the new RPC, AssignReplicasToDirs, to notify the controller to change the metadata assignment to the actual log directory. The partition might have been moved to a different log directory whilst the broker was offline.
    • If the partition is not new and does not exist, and there are any offline log directories, the broker uses AssignReplicasToDirs to change the metadata assignment to an offline log directory. This is in line with current behavior: if any log directories are offline, non-new replicas are not created. The assignment to an offline log directory signals to the controller that the replica is actually offline, and this prevents a broker without data (due to a failed disk) from continuing as the leader in case of any synchronisation failure of the replica to log directory assignment between the broker and the metadata.
  • If the partition is assigned to an offline log directory, no action is taken — the controller is already aware of this.
  • If the partition is assigned to an unknown log directory, no action is taken — the controller is already aware of this and will reassign the replica to one of the online log directories in a future metadata update. 
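
The decision rules in the list above can be summarised as a single function. The following is a minimal sketch in Python under stated assumptions, not the broker's actual code: the Action enum, the reconcile function and its parameters are hypothetical names, and uuid.UUID(int=0) stands in for Uuid.ZERO. The case of an unassigned partition that neither exists nor is new is not described in the list, so the sketch marks it as unspecified.

```python
from enum import Enum
from uuid import UUID

ZERO = UUID(int=0)  # stand-in for Uuid.ZERO (no log directory assigned)


class Action(Enum):
    NO_ACTION = "no action"
    CREATE_IN_ASSIGNED = "create replica in the indicated directory"
    PROMOTE_FUTURE = "replace current replica with the future replica"
    ASSIGN_TO_ACTUAL = "AssignReplicasToDirs: point metadata at the actual directory"
    SELECT_AND_ASSIGN = "pick a directory, then AssignReplicasToDirs"
    ASSIGN_TO_OFFLINE = "AssignReplicasToDirs: point metadata at an offline directory"
    UNSPECIFIED = "not covered by the rules above"


def reconcile(assigned, actual, is_new, has_future_in_assigned, online, offline):
    """Return the action for one partition.

    assigned: directory UUID in the cluster metadata
    actual:   directory UUID where the replica currently lives, or None
    online / offline: sets of this broker's directory UUIDs
    """
    if assigned == ZERO:
        if actual is not None:
            return Action.ASSIGN_TO_ACTUAL
        if is_new:
            return Action.SELECT_AND_ASSIGN
        return Action.UNSPECIFIED  # assumption: the list does not cover this case
    if assigned in online:
        if actual == assigned:
            return Action.NO_ACTION
        if actual is not None:
            # Replica lives elsewhere: either finish a pending move or fix metadata.
            if has_future_in_assigned:
                return Action.PROMOTE_FUTURE
            return Action.ASSIGN_TO_ACTUAL
        if is_new or not offline:
            return Action.CREATE_IN_ASSIGNED
        return Action.ASSIGN_TO_OFFLINE
    # Offline or unknown directory: the controller is already aware.
    return Action.NO_ACTION
```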

...

When replicas are moved between directories, using the existing AlterReplicaLogDirs RPC, the receiving broker will start moving the replicas using AlterReplicaLogDirs threads as usual. When a future replica first catches up, the broker will asynchronously communicate the log directory change to the controller using the new RPC, AssignReplicasToDirs, but keep the AlterReplicaLogDirs thread going. Once the broker receives confirmation of the metadata change, it briefly blocks appends to the old replica, makes sure the future log fully catches up, and makes the switch. By delaying the metadata change until the future replica has caught up, we minimize the chance of a log directory failure happening with an incorrect replica to log directory assignment in the metadata.

...

When one or more log directories become offline, the broker will communicate this change using the new field LogDirsOfflined in the BrokerHeartbeat request, indicating the UUIDs of the newly offline log directories. The UUIDs for the newly failed log directories are included in the BrokerHeartbeat request until the broker receives a successful response.
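
One way to picture the "include until acknowledged" behaviour is a small pending set that is only cleared by a successful response. This is a hedged Python sketch: the class OfflineDirReporter and its methods are hypothetical, and only the LogDirsOfflined field name comes from the proposal.

```python
from uuid import UUID


class OfflineDirReporter:
    """Tracks newly failed log directories until a heartbeat carrying
    them has been acknowledged (hypothetical helper)."""

    def __init__(self):
        self._pending = set()

    def mark_offline(self, dir_id):
        self._pending.add(dir_id)

    def build_heartbeat(self):
        # Every unacknowledged directory is sent again on each heartbeat.
        return {"LogDirsOfflined": sorted(self._pending, key=str)}

    def on_response(self, sent, success):
        # Only drop what was actually sent; a directory that failed while
        # the request was in flight stays pending for the next heartbeat.
        if success:
            self._pending -= set(sent)
```

Removing only the directories that were in the acknowledged request avoids losing a failure that happens between sending a heartbeat and receiving its response.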

...

For any new partitions, the active controller will use Uuid.ZERO as the initial value for log directory UUID for each replica. Each broker hosting replicas then assigns a log directory UUID and communicates it back to the active controller using the new RPC AssignReplicasToDirs so that cluster metadata can be updated with the log directory assignment.

...

The controller accepts the AssignReplicasToDirs RPC and persists the assignment into metadata records. If the indicated log directory UUID is not a registered log directory, then the call fails with error 57 (LOG_DIR_NOT_FOUND). If the indicated log directory UUID is listed as offline, then the replica is considered offline and the leader and ISR are updated accordingly, same as when the BrokerHeartbeat indicates a new offline log directory.
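
The controller-side checks described above can be sketched as follows. This is a simplified Python illustration, not the controller's implementation: handle_assignment and its parameters are hypothetical, metadata is modelled as a plain dict, and the leader and ISR update is reduced to a returned flag.

```python
from uuid import UUID

NONE = 0                # no error
LOG_DIR_NOT_FOUND = 57  # error code named in the text above


def handle_assignment(registered_online, registered_offline, assignments,
                      partition, dir_id):
    """Apply one replica-to-directory assignment for a broker.

    Returns (error_code, replica_offline). `assignments` maps a partition
    key to a directory UUID and is updated in place on success.
    """
    if dir_id not in registered_online and dir_id not in registered_offline:
        # Directory was never registered by this broker: reject.
        return LOG_DIR_NOT_FOUND, False

    assignments[partition] = dir_id
    # An assignment to an offline directory means the replica is offline,
    # so the caller would update leader and ISR as for a heartbeat report.
    return NONE, dir_id in registered_offline
```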

...

  • If there are any missing log directories, this means those have been removed from the broker’s configuration, so the controller will reassign all replicas currently assigned to the missing log directories to Uuid.ZERO to delegate the choice of log directory to the broker, which will then report the choice via the AssignReplicasToDirs RPC.
  • If multiple log directories are registered, the broker will remain fenced until the controller learns of all the partition to log directory placements in that broker, i.e. until no replicas remain assigned to Uuid.ZERO. The broker will indicate these using the AssignReplicasToDirs RPC.

    • The broker remains fenced by not asking to be unfenced in its heartbeat requests until the number of mismatching replica to log directory assignments is zero. This number is represented by the new metric NumMismatchingReplicaToLogDirAssignments.
  • If multiple log directories are registered and some of them are new (not present in the previous registration), then these log directories are assumed to be empty. If they are not, the broker will use the AssignReplicasToDirs RPC to correct the assignment and will choose not to become UNFENCED before the metadata is correct.
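
As an illustration of how the fencing condition relates to the NumMismatchingReplicaToLogDirAssignments metric, the count could be computed along these lines. This is a sketch under assumptions: count_mismatching and wants_fence are hypothetical names, both inputs are plain dicts keyed by partition, and uuid.UUID(int=0) stands in for Uuid.ZERO.

```python
from uuid import UUID

ZERO = UUID(int=0)  # stand-in for Uuid.ZERO (no log directory assigned)


def count_mismatching(metadata, actual):
    """Count replicas whose metadata assignment is missing (ZERO) or
    differs from the directory the replica is actually found in.

    metadata: partition -> directory UUID from the cluster metadata
    actual:   partition -> directory UUID where the broker holds the replica
    """
    count = 0
    for partition, actual_dir in actual.items():
        assigned = metadata.get(partition, ZERO)
        if assigned == ZERO or assigned != actual_dir:
            count += 1
    return count


def wants_fence(metadata, actual):
    # The broker keeps itself fenced while any mismatch remains.
    return count_mismatching(metadata, actual) > 0
```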

...

Upon being reconfigured with multiple log directories, brokers will update and generate meta.properties as necessary to reflect the new log directories. Brokers will then register the log directories with the controller via BrokerRegistration and use AssignReplicasToDirs to create the partition to log directory assignments in the cluster metadata before becoming UNFENCED.

...

  • As per KIP-866, a separate Controller quorum is set up first, and only then are the existing brokers reconfigured and upgraded.
  • During the migration, the active controller is still monitoring ZooKeeper for log directory failure notifications and still using full LeaderAndIsr requests to process log directory failures for any brokers still running in ZK mode.
  • The brokers restarting into KRaft mode for the first time update meta.properties to generate and include directory.id and directory.ids.
  • During the migration, the active controller accepts broker registration requests from brokers restarting into KRaft mode, indicating multiple online log directories, but keeps brokers fenced until it learns of all partition to log directory assignments in that broker via the new AssignReplicasToDirs RPC.
  • During the migration, replicas are assumed and assigned to log directory Uuid.ZERO until the actual log directory is learnt by the active controller from a broker running in KRaft mode.

...

  • Partition reassignment across directories and across brokers involves different API calls: AlterPartitionReassignments and AlterReplicaLogDirs. Whilst reassigning partitions across brokers into a specific log directory is already possible, it involves an intricate sequence of prior calls to AlterReplicaLogDirs, with errors expected as a successful result. Once this work is done, we can consolidate these two API calls by extending AlterPartitionReassignments to allow target log directories to be specified, and deprecate AlterReplicaLogDirs. This can be done as part of a future KIP.
  • The only way to know which log directory UUID corresponds to which log directory path is by reading the meta.properties files in each broker. A future KIP should expand the DescribeLogDirs RPC response to include log directory UUIDs along with the system path for each log directory.
  • Partition initialization can be optimized by having the controller preselect a log directory for new partitions. This would avoid having to wait for the broker to send an AssignReplicasToDirs request to indicate the chosen log directory before it is safe for the broker to assume leadership of the partition. The controller could also take available storage in each log directory into account if the broker indicates the available storage space for each log directory as part of broker registration. This may be proposed in a future KIP.

...