Status

Current state: Under Discussion

Discussion thread: https://lists.apache.org/thread/8dqvfhzcyy87zyy12837pxx9lgsdhvft 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Support for multiple log directories per broker, also known as JBOD (Just a Bunch Of Disks), was introduced in KIP-112. Since then JBOD has been an important feature in Kafka, allowing it to run on large deployments with multiple local disks.


To ensure availability, when a partition leader fails, the controller should elect a new leader from one of the other in-sync replicas. But the controller does not check whether each leader is correctly performing its duties. Instead, it simply assumes that each broker is working correctly as long as it is still an active member of the cluster. In KRaft, cluster membership is based on timely heartbeat requests sent by each broker to the active controller. In ZooKeeper, cluster membership is based on an ephemeral znode under /brokers/ids.

In KRaft mode, when a single log directory fails, the broker will be unable to be either a leader or a follower for any partitions in that log directory. The controller, however, will have no signal that it needs to update leadership and ISR for the replicas in that log directory, as the broker will continue to send heartbeat requests.

In ZooKeeper mode when a log directory fails, the broker sends a notification to the controller which then sends a full LeaderAndIsr request to the broker, listing all the partitions for all log directories for that broker. The controller relies on per-partition error results from the broker to update leadership and ISR for the replicas in the failed log directory. Without this notification, the partitions with leadership on that log directory will not get a new leader assigned and would remain unavailable.

Support for JBOD in KRaft was proposed and accepted back in KIP-589, with a new RPC from the broker to the controller indicating the affected topic partitions in a failed log directory. However, the implementation was never merged, and concerns were raised about potentially large requests from the broker to the controller.

KIP-833 was accepted, with plans to mark KRaft as production ready and deprecate ZooKeeper mode, but JBOD is still a missing feature in KRaft. This KIP aims to provide support for JBOD in KRaft, while avoiding any RPC having to list all the partitions in a log directory.

Public interfaces

Command line tools

The kafka-storage.sh  tool will be updated in the following ways:

  • The format sub-command will produce meta.properties in a new format, described in the following section, which includes two new properties: directory.id and directory.ids. This sub-command already supports formatting more than one log directory: it expects the list of configured log.dirs and "formats" only the ones that need it.
  • A new sub-command update-directories will be introduced to update the directory.id and directory.ids properties in meta.properties  in all log directories configured in the log.dirs  broker configuration. This command should be used when the configured log directories are already "formatted", e.g. when a log directory is removed from configuration.

If some or all of the log directories are new, then the format command should be used. Otherwise update-directories can be used to update the two properties: directory.id and directory.ids. This ensures that each log directory configured in log.dirs has a unique UUID assigned under the property directory.id in its meta.properties, and that the full set of UUIDs for all configured log.dirs is persisted in directory.ids in all meta.properties files.

meta.properties

The meta.properties version field will be bumped from 1 to 2. Two new properties, directory.id and directory.ids, will be added to the meta.properties file in each log directory, including the metadata.log.dir. The first property, directory.id, indicates the UUID for the log directory where the file is located. The second property, directory.ids, lists the UUIDs of all the configured log directories. If the meta.properties file doesn't exist for the metadata.log.dir, the Kafka node will fail to start. If the meta.properties file exists but doesn't contain these two properties, they will be generated and the meta.properties files will be updated. The kafka-storage.sh tool will be extended to generate and update the two properties as described in the previous section.

Metadata records

RegisterBrokerRecord and BrokerRegistrationChangeRecord will both have two new fields:

{ "name": "OnlineLogDirectories", "type":  "[]uuid", "versions":  "2+",
"about": "Log directories configured in this broker which are available." },
{ "name": "OfflineLogDirectories", "type": "[]uuid", "versions": "2+",
"about": "Log directories configured in this broker which are not available." }

PartitionRecord and PartitionChangeRecord will both have a new Assignment field which will replace the current Replicas field:

{ "name": "Replicas", "type":  "[]int32", "versions":  "0+", "entityType": "brokerId",
"about": "The replicas of this partition, sorted by preferred order." },
(...)
{ "name": "Assignment", "type": "[]ReplicaAssignment", "versions": "1+",
"about": "The replicas of this partition, sorted by preferred order.", "fields": [
{ "name": "Broker", "type": "int32", "versions": "1+", "entityType": "brokerId",
"about": "The broker ID hosting the replica." },
{ "name": "Directory", "type": "uuid", "versions": "1+",
"about": "The log directory hosting the replica" }
]}

RPC requests

BrokerRegistrationRequest will include the following two new fields:

{ "name": "OnlineLogDirectories", "type":  "[]uuid", "versions":  "1+",
"about": "Log directories configured in this broker which are available." },
{ "name": "OfflineLogDirectories", "type": "[]uuid", "versions": "1+",
"about": "Log directories configured in this broker which are not available." }

BrokerHeartbeatRequest will include the following new field:

{ "name": "LogDirsOfflined", "type":  "[]uuid", "versions":  "1+",
"about": "Log directories that failed and went offline." }

A new RPC named AssignReplicasToDirectories will be introduced with the following request and response:

{
"apiKey": <TBD>,
"type": "request",
"listeners": ["controller"],
"name": "AssignReplicasToDirectoriesRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
"about": "The ID of the requesting broker" },
{ "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
"about": "The epoch of the requesting broker" },
{ "name": "Directories", "type": "[]DirectoryData", "versions": "0+", "fields": [
{ "name": "Id", "type": "uuid", "versions": "0+", "about": "The ID of the directory" },
{ "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
"about": "The name of the assigned topic" },
{ "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index" }
]}
]}
]}
]
}
{
"apiKey": <TBD>,
"type": "response",
"name": "AssignReplicasToDirectoriesResponse",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The top level response error code" },
{ "name": "Directories", "type": "[]DirectoryData", "versions": "0+", "fields": [
{ "name": "Id", "type": "uuid", "versions": "0+", "about": "The ID of the directory" },
{ "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
"about": "The name of the assigned topic" },
{ "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index" },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The partition level error code" }
]}
]}
]}
]
}
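
The nesting of the request (directory, then topic, then partition) can be illustrated with a small Python sketch. The helper name build_assign_request and the use of plain dicts and strings are assumptions made for illustration only; the real request would be generated from the schema above, and directory IDs would be Kafka UUIDs.

```python
from collections import defaultdict


def build_assign_request(broker_id, broker_epoch, assignments):
    """Group (topic, partition, directory_id) triples into the nested request shape.

    `assignments` is an iterable of (topic_name, partition_index, directory_id).
    This is a hypothetical helper, not part of Kafka.
    """
    by_dir = defaultdict(lambda: defaultdict(list))
    for topic, partition, directory_id in assignments:
        by_dir[directory_id][topic].append(partition)

    return {
        "BrokerId": broker_id,
        "BrokerEpoch": broker_epoch,
        "Directories": [
            {
                "Id": directory_id,
                "Topics": [
                    {
                        "Name": topic,
                        "Partitions": [{"PartitionIndex": p} for p in sorted(parts)],
                    }
                    for topic, parts in sorted(topics.items())
                ],
            }
            for directory_id, topics in sorted(by_dir.items())
        ],
    }
```

Because partitions are grouped under their directory, a request only needs to name each directory UUID once, regardless of how many replicas it hosts.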

Proposed changes

Storage format command

The storage format tool must be run when log directories are added to or removed from a broker's configuration. The format subcommand will be updated to ensure each log directory has an assigned UUID and it will persist two new properties in the meta.properties file:

  • A property named directory.id indicating the UUID for the log directory where the meta.properties file is located. The value is base64 encoded, like the cluster UUID.
  • A property named directory.ids indicating the complete list of all UUIDs for each configured log directory. Values are base64 encoded and comma-separated. The order does not matter.

The meta.properties  version field will be bumped from 1 to 2.

A new sub-command update-directories  will be introduced to update the two properties for existing and already "formatted" log directories.

The UUIDs for each log directory are automatically generated by the tool if there isn't one assigned already in an existing meta.properties  file.

Having a persisted UUID at the root of each log directory allows the broker to identify the log directory regardless of the mount path.
Having a persisted list of all UUIDs for all configured log directories allows the broker to determine the UUIDs of unavailable (offline) log directories, as the meta.properties files for the offline log directories are likely to be unavailable.
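
As a rough illustration of what the tool needs to do, the following Python sketch assigns a directory.id where one is missing and then writes the same directory.ids list everywhere. The function names are hypothetical, and the encoding (URL-safe base64 without padding, giving 22 characters) is an assumption chosen to match the shape of the IDs shown in this document.

```python
import base64
import uuid


def new_directory_id():
    """Return a random 128-bit ID as URL-safe base64 without padding (assumed encoding)."""
    raw = uuid.uuid4().bytes  # 16 bytes
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def ensure_directory_ids(props_by_path):
    """Assign directory.id where missing and sync directory.ids in every file.

    `props_by_path` maps a log directory path to the dict of its
    meta.properties entries; it is updated in place and returned.
    """
    # Keep existing IDs, generate one only where none is assigned yet.
    for props in props_by_path.values():
        if "directory.id" not in props:
            props["directory.id"] = new_directory_id()

    # Every file gets the same comma-separated list of all IDs.
    all_ids = ",".join(p["directory.id"] for p in props_by_path.values())
    for props in props_by_path.values():
        props["directory.ids"] = all_ids
        props["version"] = "2"
    return props_by_path
```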

Example

Given the following server.properties:

(... other non interesting properties omitted ...)
process.roles=broker
node.id=8
metadata.log.dir=/var/lib/kafka/metadata
log.dirs=/mnt/d1,/mnt/d2

The command ./bin/kafka-storage.sh format -c /tmp/server.properties --cluster-id 41QSStLtR3qOekbX4ZlbHA  would generate three meta.properties  files that could look like the following:

/var/lib/kafka/metadata/meta.properties :

#
#Thu Aug 18 15:23:07 BST 2022
node.id=8
version=2
cluster.id=41QSStLtR3qOekbX4ZlbHA
directory.id=e6umYSUsQyq7jUUzL9iXMQ
directory.ids=e6umYSUsQyq7jUUzL9iXMQ,b4d9ExdORgaQq38CyHwWTA,P2aL9r4sSqqyt7bC0uierg

/mnt/d1/meta.properties :

#
#Thu Aug 18 15:23:07 BST 2022
node.id=8
version=2
cluster.id=41QSStLtR3qOekbX4ZlbHA
directory.id=b4d9ExdORgaQq38CyHwWTA
directory.ids=e6umYSUsQyq7jUUzL9iXMQ,b4d9ExdORgaQq38CyHwWTA,P2aL9r4sSqqyt7bC0uierg

/mnt/d2/meta.properties :

#
#Thu Aug 18 15:23:07 BST 2022
node.id=8
version=2
cluster.id=41QSStLtR3qOekbX4ZlbHA
directory.id=P2aL9r4sSqqyt7bC0uierg
directory.ids=e6umYSUsQyq7jUUzL9iXMQ,b4d9ExdORgaQq38CyHwWTA,P2aL9r4sSqqyt7bC0uierg

Each directory, including the directory that holds the cluster metadata topic (metadata.log.dir), has its own distinct directory ID. The full set of directory IDs, covering all log dirs in log.dirs as well as metadata.log.dir, is persisted in all three meta.properties files.

In the example above, we can identify the following directory mapping:

  • /var/lib/kafka/metadata  has log directory UUID e6umYSUsQyq7jUUzL9iXMQ 
  • /mnt/d1  has log directory UUID b4d9ExdORgaQq38CyHwWTA 
  • /mnt/d2 has log directory UUID P2aL9r4sSqqyt7bC0uierg 

If some but not all log directories are unavailable, the broker is able to identify which UUIDs refer to offline log directories. 

If there are no new log directories, ./bin/kafka-storage.sh update-directories -c /tmp/server.properties would update the three meta.properties files as necessary to ensure the two properties directory.id and directory.ids are present and in sync.

Brokers

Broker lifecycle management

When the broker starts up and initializes the LogManager, it will load the UUID for each log directory (directory.id ) and the list of all log directory UUIDs (directory.ids), by reading the meta.properties file at the root of each log directory.

The broker will fail at startup if either:

a) there are any two log directories with the same UUID, i.e. if different meta.properties  files indicate the same directory.id ; or

b) the full set of log directory UUIDs in directory.ids  does not match in all loaded meta.properties files; or

c) the number of log directory UUIDs in directory.ids  does not match the number of system paths in the broker configuration log.dirs.

The broker will then diff all the loaded UUIDs in directory.id  with the full set of all UUIDs (in directory.ids) to obtain the set of UUIDs for offline log directories. The sets of both online and offline log directory UUIDs are sent along in the broker registration request to the controller.
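
The startup checks and the diff described above can be sketched in Python as follows. This is only an illustration under stated assumptions: check_log_directories is a hypothetical name, and the expected directory count is passed in by the caller because this document does not settle whether metadata.log.dir counts toward that total.

```python
def check_log_directories(loaded, configured_dir_count):
    """Validate loaded meta.properties and return (online_ids, offline_ids).

    `loaded` maps each readable log directory path to its properties dict.
    `configured_dir_count` is the number of directories the broker expects.
    """
    # (a) no two log directories may share a directory.id
    ids = [props["directory.id"] for props in loaded.values()]
    if len(set(ids)) != len(ids):
        raise ValueError("duplicate directory.id across log directories")

    # (b) directory.ids must be the same set in every loaded file
    id_sets = {frozenset(props["directory.ids"].split(",")) for props in loaded.values()}
    if len(id_sets) != 1:
        raise ValueError("directory.ids differs between meta.properties files")

    # (c) the number of IDs must match the configured directory count
    all_ids = set(next(iter(id_sets)))
    if len(all_ids) != configured_dir_count:
        raise ValueError("directory.ids does not match the configured directory count")

    # Anything listed in directory.ids but not loaded is offline.
    online = set(ids)
    return online, all_ids - online
```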

Replica management

As the broker catches up with metadata, and sees the partitions which it should be hosting, it will check the associated log directory UUID for each partition.

  • If the partition is not assigned to a log directory (refers to Uuid.ZERO)
    • If the partition already exists, the broker uses the new AssignReplicasToDirectories RPC to notify the controller to change the metadata assignment to the actual log directory.
    • If the partition is new, the broker selects a log directory and uses the new AssignReplicasToDirectories RPC to notify the controller to create the metadata assignment to the actual log directory.
  • If the partition is assigned to an online log directory
    • If the partition is new it is created in the indicated log directory.
    • If the partition already exists in the indicated log directory no action is taken.
    • If the partition already exists in another log directory and is a future replica in the log directory indicated by the metadata, the broker will replace the current replica with the future replica.
    • If the partition already exists in another log directory, the broker uses the new AssignReplicasToDirectories RPC to notify the controller to change the metadata assignment to the actual log directory. The partition might have been moved to a different log directory whilst the broker was offline.
  • If the partition is assigned to an offline log directory, no action is taken — the controller is already aware of this.
  • If the partition is assigned to an unknown log directory, no action is taken — the controller is already aware of this and will reassign the replica to one of the online log directories in a future metadata update. 
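
The decision list above can be read as a single function over one partition. The Python sketch below is one possible reading, not the broker's actual code: the Action names, the decide function and the ZERO stand-in for Uuid.ZERO are all hypothetical.

```python
from enum import Enum, auto

ZERO = "Uuid.ZERO"  # stand-in for the unassigned directory UUID


class Action(Enum):
    NOTIFY_ACTUAL_DIR = auto()        # send AssignReplicasToDirectories with the real directory
    SELECT_DIR_AND_NOTIFY = auto()    # pick a directory, then send AssignReplicasToDirectories
    CREATE_IN_ASSIGNED_DIR = auto()   # create the replica where metadata says
    PROMOTE_FUTURE_REPLICA = auto()   # replace the current replica with the future replica
    NONE = auto()                     # nothing to do


def decide(assigned_dir, online, current_dir=None, future_dir=None):
    """Return the Action for one partition seen in cluster metadata.

    assigned_dir: directory UUID from metadata (ZERO if unassigned)
    online: set of this broker's online directory UUIDs
    current_dir: directory where the replica already exists, or None if new
    future_dir: directory holding a future replica, or None
    """
    if assigned_dir == ZERO:
        if current_dir is not None:
            return Action.NOTIFY_ACTUAL_DIR
        return Action.SELECT_DIR_AND_NOTIFY
    if assigned_dir in online:
        if current_dir is None:
            return Action.CREATE_IN_ASSIGNED_DIR
        if current_dir == assigned_dir:
            return Action.NONE
        if future_dir == assigned_dir:
            return Action.PROMOTE_FUTURE_REPLICA
        return Action.NOTIFY_ACTUAL_DIR
    # Offline or unknown directory: the controller is already aware.
    return Action.NONE
```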

When replicas are moved between directories using the existing AlterReplicaLogDirs RPC, the broker waits until the future replica has caught up with the current replica and then sends the AssignReplicasToDirectories RPC to the controller, changing the association to the future log directory. When the cluster metadata update is seen by the broker, the broker replaces the current replica with the future replica.

If the broker is configured with multiple log directories it remains FENCED until it can verify that all partitions are assigned to the correct log directories in the cluster metadata.

Metadata caching

Replicas are considered offline if the replica references a log directory which is not in the list of online log directories for the broker ID hosting the replica.

Handling log directory failures

When one or more log directories become offline, the broker will communicate this change using the new LogDirsOfflined field in the BrokerHeartbeat request, indicating the UUIDs of the newly offline log directories. The UUIDs for the newly failed log directories are included in the BrokerHeartbeat request until the broker receives a successful response.
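
The "keep sending until acknowledged" behaviour can be pictured with a small Python sketch. OfflineDirReporter and its methods are hypothetical names used only to illustrate the bookkeeping; they do not correspond to existing broker classes.

```python
class OfflineDirReporter:
    """Tracks failed directories that still need to be reported to the controller."""

    def __init__(self):
        self._pending = set()

    def on_directory_failure(self, directory_id):
        """Record a newly failed log directory."""
        self._pending.add(directory_id)

    def next_heartbeat_field(self):
        """Value to place in LogDirsOfflined in the next heartbeat request."""
        return sorted(self._pending)

    def on_heartbeat_success(self, reported):
        """Stop reporting the directories carried by a successful heartbeat."""
        self._pending.difference_update(reported)
```

Only the directories carried by the acknowledged request are cleared, so a failure that happens while a heartbeat is in flight is still reported in a later one.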

Controller

Replica placement

For any new partitions, the active controller will use Uuid.ZERO as the initial value for log directory UUID for each replica. Each broker hosting replicas then assigns a log directory UUID and communicates it back to the active controller using the new RPC AssignReplicasToDirectories so that cluster metadata can be updated with the log directory assignment.

Handling log directory failures

When a controller receives a BrokerHeartbeat request from a broker that indicates any UUIDs under the new LogDirsOfflined field, it will:

  • Persist a BrokerRegistrationChange record, with the new list of online and offline log directories
  • Update the leader and ISR for all the replicas assigned to the failed log directories, persisting PartitionChangeRecords, in a similar way to how leadership and ISR are updated when a broker becomes fenced, unregistered or shuts down.
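
A simplified Python sketch of the second step is shown below. It is not the controller's actual election logic: the function name, the dict layout and the rule used here (drop the broker from the ISR, pick the first remaining in-sync replica in assignment order, and keep the last known ISR with no leader if nothing remains) are assumptions for illustration.

```python
NO_LEADER = -1


def handle_offlined_dirs(broker_id, offlined, partitions):
    """Compute leader/ISR changes for replicas in the failed directories.

    `partitions` maps (topic, index) to a dict with keys:
      "assignment": list of (broker_id, directory_id) in preferred order
      "isr": list of broker IDs
      "leader": broker ID or NO_LEADER
    Returns {partition: {"isr": [...], "leader": int}} for changed partitions only.
    """
    changes = {}
    for tp, state in partitions.items():
        affected = any(
            b == broker_id and d in offlined for b, d in state["assignment"]
        )
        if not affected:
            continue

        isr = [b for b in state["isr"] if b != broker_id]
        leader = state["leader"]
        if leader == broker_id:
            # First remaining in-sync replica, in preferred (assignment) order.
            candidates = [b for b, _ in state["assignment"] if b in isr]
            leader = candidates[0] if candidates else NO_LEADER
        if not isr:
            # Assumed rule: keep the last known ISR rather than emptying it.
            isr = list(state["isr"])

        if isr != state["isr"] or leader != state["leader"]:
            changes[tp] = {"isr": isr, "leader": leader}
    return changes
```

Since the assignment already records the directory of every replica, the controller can find the affected partitions itself, and the broker never has to enumerate them.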

Broker registration

Upon a broker registration request the controller will persist the broker registration as cluster metadata including the online and offline log directories for that broker. The controller may receive a new list of online and offline log directories — different from what was previously persisted in the cluster metadata for the requesting broker.

  • If there are any missing log directories this means those have been removed from the broker’s configuration, so the controller will reassign all replicas currently assigned to the missing log directories to the remaining online log directories in the same broker.
  • If log directories are registered and the previous registration has no log directories, the broker will remain fenced until the controller learns of all the partition to log directory placements in that broker. The broker will indicate these using the AssignReplicasToDirectories RPC.
  • If multiple log directories are registered and some of them are new (not present in the previous registration), but the previous registration includes at least one log directory, then the new log directories are assumed to be empty. If they are not, the broker will use AssignReplicasToDirectories to correct the assignment.
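
The three cases above amount to comparing the previously registered set of directories with the newly registered one. A minimal Python sketch, with a hypothetical function name and return shape, could look like this:

```python
def compare_registrations(previous_dirs, registered_dirs):
    """Compare two registrations of the same broker.

    Both arguments are sets of directory UUIDs (online and offline combined).
    Returns (missing, new, first_with_dirs):
      missing: directories removed from the broker configuration, whose
               replicas need reassigning to remaining online directories
      new: directories not seen before, assumed empty unless the broker
           corrects this with AssignReplicasToDirectories
      first_with_dirs: True when this is the first registration that lists
                       directories, so the broker stays fenced until all
                       placements are learnt
    """
    missing = previous_dirs - registered_dirs
    new = registered_dirs - previous_dirs
    first_with_dirs = bool(registered_dirs) and not previous_dirs
    return missing, new, first_with_dirs
```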

In a future release, broker registration without online log directory UUIDs will be disallowed.

Brokers whose registration indicates that multiple log directories are configured remain FENCED until all log directory assignments for that broker are learnt by the active controller and persisted into metadata.

Compatibility, Deprecation, and Migration Plan

Migrating a cluster in KRaft without JBOD

The controllers are upgraded first, and broker registration is allowed without log directories. Once the controllers are upgraded, the brokers can be upgraded. Once upgraded, the brokers will register log directories with the controller and use AssignReplicasToDirectories to create the partition to log directory assignments in the cluster metadata. Then the brokers can individually be reconfigured with additional log directories. As this will be the first registration indicating log directories, registering brokers will remain fenced until the log directory assignments are committed to the cluster metadata.

Registrations without log directories are still allowed if the previous registration also does not reference any log directory UUID. This allows the controllers to be upgraded into this feature first. In a subsequent version following the release of this feature, we can completely disallow broker registration without log directories.

Migrating a cluster in ZK mode running with JBOD

Migration into KRaft mode is addressed in KIP-866. That migration is extended in the following way:

  • During the migration, the active controller is still monitoring ZooKeeper for log directory failure notifications and still using full LeaderAndIsr requests to process log directory failures for any brokers still running in ZK mode
  • During the migration, the active controller accepts broker registration requests from brokers running in KRaft mode, indicating multiple online log directories, but keeps brokers fenced until it learns of all partition to log directory assignments in that broker via the new AssignReplicasToDirectories RPC.
  • During the migration, replicas are assumed and assigned to log directory Uuid.ZERO until the actual log directory is learnt by the active controller from a broker running in KRaft mode.

Replica management

Existing replicas without a log directory are either:

  • Assumed to live in a broker that isn’t yet running on JBOD, because the cluster was previously running in KRaft mode without this feature, and so live in a single log directory, even if the UUID for that directory isn’t yet known by the controller. It is not possible to trigger a log directory failure from a broker that has a single log directory, as the broker would simply shut down if there are no remaining online log directories. Or
  • Assigned to a log directory as of yet unknown, in a broker that remains FENCED. As the broker remains FENCED it cannot assume leadership for any partition, and so a log directory failure would be handled by the current partition leader.

The two assumptions above eliminate the risk of having a broker which is not shutting down, but is unable to continue its leadership responsibilities due to the partition being persisted in a log directory that is broken or otherwise unavailable and the active controller not being aware of such an issue. 

Storage formatting

The changes to storage formatting simply ensure the existence of the two new properties in the meta.properties files at the log directory roots. The storage format command will need to run when new log directories are configured, but also when log directories are removed.

Test plan

The system test for log directory failures will be extended to KRaft mode.

Future work

  • Partition reassignment across directories and across brokers involves different API calls: AlterPartitionReassignments and AlterReplicaLogDirs. Whilst reassigning partitions across brokers into a specific log directory is already possible, it involves an intricate sequence of prior calls to AlterReplicaLogDirs, with errors expected as a successful result. Once this work is done we can consolidate these two API calls by extending AlterPartitionReassignments to allow target log directories to be specified, and deprecate AlterReplicaLogDirs. This can be done as part of a future KIP.
  • The only way to know which log directory UUID corresponds to which log directory path is by reading the meta.properties  files in each broker. A future KIP should expand the DescribeLogDirs RPC response to include log directory UUIDs along with the system path for each log directory.
  • Partition initialization can be optimized by having the controller preselect a log directory for new partitions. This would avoid having to wait for the broker to send an AssignReplicasToDirectories request to indicate the chosen log directory before it is safe for the broker to assume leadership of the partition. The controller could also take available storage in each log directory into account if the broker indicates the available storage space for each log directory as part of broker registration. This may be proposed in a future KIP.

Rejected alternatives

  • Keeping the scope of the log directory to the broker — while this would mean a much simpler change, as was proposed in KIP-589, if only the broker itself knows which partitions were assigned to a log directory, when a log directory fails the broker will need to send a potentially very large request enumerating all the partitions in the failed disk, so that the controller can update leadership and ISRs accordingly.
  • Having the controller determine the log directory, instead of leaving that up to the broker: this would avoid requiring an RPC from the broker upon selecting a log directory for new replicas, and reduce the time until it is safe for the broker to take leadership of the replica. However, the broker is in a better position to choose a log directory than the controller, as it has easier access to e.g. disk usage in each log directory. The controller could also have this information if the broker were to include it in the broker registration. But to keep the design simple, this optimization is best left for future work.
  • Changing how log directory failure is handled in ZooKeeper mode: ZooKeeper mode is going away; KIP-833 proposed its deprecation in a near-future release.
  • Using the system path to identify each log directory, or storing the identifier somewhere else — When running Kafka with multiple log directories, each log directory is typically assigned to a different system disk or volume. The same storage device can be made accessible under a different mount, and Kafka should be able to identify the contents as the same disk. Because the log directory configuration can change on the broker, the only reliable way to identify the log directory in each broker is to add metadata to the file system under the log directory itself.
  • Fail on broker startup if the cluster metadata indicates replicas should be in a different log directory than the directory where the broker actually finds them — Despite not being an advertised feature, currently replicas can be moved between log directories while the broker is offline. Once the broker comes back up it accepts the new location of the replica. To continue supporting this feature, the broker will need to compare the information in the cluster metadata with the actual replica location during startup and take action on any mismatch.
  • Not keeping a list of all configured log directory identifiers in a metadata file in the file system: If a broker finds some log directories to be offline during startup, it will need another way to identify those log directories, as their metadata is on the unavailable disks themselves. For this reason, the list of all log directory IDs must also be persisted somewhere that the broker can be expected to access. Regardless of how many log directories are configured, one of the log directories will be configured to host the cluster metadata topic. This can either be one of the data log directories or a completely separate one, typically on the system disk. Since the broker will not start up if this log directory is not available, it is the perfect location to persist the list of all log directory identifiers.