Status
Current state: Under Discussion
Discussion thread: https://lists.apache.org/thread/8dqvfhzcyy87zyy12837pxx9lgsdhvft
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Support for multiple log directories per broker, aka JBOD (Just a Bunch Of Disks), came in KIP-112, and JBOD has since been an important feature in Kafka, allowing it to run on large deployments with multiple local disks.
To ensure availability, when a partition leader fails, the controller should elect a new leader from one of the other in-sync replicas. But the controller does not check whether each leader is correctly performing its duties; instead, the controller simply assumes that a broker is working correctly if it is still an active member of the cluster. In KRaft, cluster membership is based on timely heartbeat requests sent by each broker to the active controller. In ZooKeeper, cluster membership is based on an ephemeral zNode under /brokers/ids.
In KRaft mode, when a single log directory fails, the broker will be unable to act as either leader or follower for any partitions in that log directory, but the controller will have no signal that it needs to update leadership and ISR for the replicas in that log directory, as the broker will continue to send heartbeat requests.
In ZooKeeper mode, when a log directory fails, the broker sends a notification to the controller, which then sends a full LeaderAndIsr request to the broker, listing all the partitions for all log directories on that broker. The controller relies on per-partition error results from the broker to update leadership and ISR for the replicas in the failed log directory. Without this notification, the partitions with leadership on that log directory would not get a new leader assigned and would remain unavailable.
Support for JBOD in KRaft was proposed and accepted back in KIP-589 — with a new RPC from the broker to the controller indicating the affected topic partitions in a failed log directory — but the implementation was never merged, and concerns were raised about possibly very large requests from the broker to the controller.
KIP-833 was accepted, with plans to mark KRaft as production ready and deprecate ZooKeeper mode, but JBOD is still a missing feature in KRaft. This KIP aims to provide support for JBOD in KRaft, while avoiding any RPC that has to list all the partitions in a log directory.
Public Interfaces
Command line tools
Storage will have to be formatted — using the storage format tool — not only when a log directory is added, but also when one is removed from configuration.
meta.properties
The meta.properties version field will be bumped from 1 to 2. Two new properties, directory.id and directory.ids, will be added to the meta.properties file in each log directory, including the metadata.log.dir. The first property, directory.id, indicates the UUID for the log directory where the file is located; the second property, directory.ids, lists the UUIDs for all the configured log directories. If the meta.properties file doesn't exist for the metadata.log.dir, the Kafka node will fail to start. If the meta.properties file exists but doesn't contain these two properties, the new values will be generated and the meta.properties files will be updated. The kafka-storage CLI tool will be extended to generate and write the two properties when the format command is used.
Metadata records
RegisterBrokerRecord and BrokerRegistrationChangeRecord will both have two new fields:
{ "name": "OnlineLogDirectories", "type": "[]uuid", "versions": "2+",
"about": "Log directories configured in this broker which are available." },
{ "name": "OfflineLogDirectories", "type": "[]uuid", "versions": "2+",
"about": "Log directories configured in this broker which are not available." }
PartitionRecord and PartitionChangeRecord will both have a new Assignment field, which will replace the current Replicas field:
{ "name": "Replicas", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
"about": "The replicas of this partition, sorted by preferred order." },
(...)
{ "name": "Assignment", "type": "[]ReplicaAssignment", "versions": "1+",
"about": "The replicas of this partition, sorted by preferred order.", "fields": [
{ "name": "Broker", "type": "int32", "versions": "1+", "entityType": "brokerId",
"about": "The broker ID hosting the replica." },
{ "name": "Directory", "type": "uuid", "versions": "1+",
"about": "The log directory hosting the replica" }
]}
RPC requests
BrokerRegistrationRequest will include the following two new fields:
{ "name": "OnlineLogDirectories", "type": "[]uuid", "versions": "2+",
"about": "Log directories configured in this broker which are available." },
{ "name": "OfflineLogDirectories", "type": "[]uuid", "versions": "2+",
"about": "Log directories configured in this broker which are not available." }
A new RPC named ASSIGN_REPLICAS_TO_DIRECTORIES will be introduced with the following request and response:
{
"apiKey": <TBD>,
"type": "request",
"listeners": ["controller],
"name": "AssignReplicasToDirectoriesRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
"about": "The ID of the requesting broker" },
{ "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
"about": "The epoch of the requesting broker" },
{ "name": "Directories", "type": "[]DirectoryData", "versions": "0+", "fields": [
{ "name": "Id", "type": "uuid", "versions": "0+", "about": "The ID of the directory" },
{ "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
"about": "The name of the assigned topic" },
{ "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index" }
]}
]}
]}
]
}
{
"apiKey": <TBD>,
"type": "response",
"name": "AssignReplicasToDirectoriesResponse",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The top level response error code" },
{ "name": "Directories", "type": "[]DirectoryData", "versions": "0+", "fields": [
{ "name": "Id", "type": "uuid", "versions": "0+", "about": "The ID of the directory" },
{ "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
"about": "The name of the assigned topic" },
{ "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index" },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The partition level error code" }
]}
]}
]}
]
}
Another new RPC named LOG_DIRECTORIES_OFFLINE will be introduced with the following request and response:
{
"apiKey": <TBD>,
"type": "request",
"listeners": ["controller"],
"name": "LogDirectoriesOfflineRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
"about": "The ID of the requesting broker" },
{ "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
"about": "The epoch of the requesting broker" },
{ "name": "Directories", "type": "[]DirectoryData", "versions": "0+", "fields": [
{ "name": "Id", "type": "uuid", "versions": "0+", "about": "The ID of the directory" }
]}
]
}
{
"apiKey": <TBD>,
"type": "response",
"name": "LogDirectoriesOfflineResponse",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The top level response error code" },
{ "name": "Directories", "type": "[]DirectoryData", "versions": "0+", "fields": [
{ "name": "Id", "type": "uuid", "versions": "0+", "about": "The ID of the directory" },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The partition level error code" }
]}
]
}
Proposed Changes
Storage format command
The storage format tool must be run when log directories are added to or removed from a broker’s configuration. This command will ensure each log directory has an assigned UUID, and it will persist two new properties in the meta.properties file:
- A property named directory.id indicating the UUID for the log directory where the meta.properties file is located. The value is base64 encoded, like the cluster UUID.
- A property named directory.ids indicating the complete list of UUIDs for all configured log directories. Values are base64 encoded and comma-separated. The order does not matter.
The meta.properties version field will be bumped from 1 to 2.
The UUID for each log directory is automatically generated by the tool if one isn't already assigned in an existing meta.properties file.
Having a persisted UUID at the root of each log directory allows the broker to identify the log directory regardless of the mount path.
Having a persisted list of all UUIDs for all configured log directories allows the broker to determine the UUIDs of unavailable (offline) log directories, as the meta.properties files for the offline log directories are likely to be unavailable.
Example
Given the following server.properties:
(... other uninteresting properties omitted ...)
process.roles=broker
node.id=8
metadata.log.dir=/var/lib/kafka/metadata
log.dirs=/mnt/d1,/mnt/d2
The command ./bin/kafka-storage.sh format -c /tmp/server.properties --cluster-id 41QSStLtR3qOekbX4ZlbHA would generate a meta.properties file that could look like this:
#
#Thu Aug 18 15:23:07 BST 2022
node.id=8
version=2
cluster.id=41QSStLtR3qOekbX4ZlbHA
directory.id=e6umYSUsQyq7jUUzL9iXMQ
directory.ids=e6umYSUsQyq7jUUzL9iXMQ,b4d9ExdORgaQq38CyHwWTA,P2aL9r4sSqqyt7bC0uierg
Each directory, including the one that holds the cluster metadata topic (metadata.log.dir), has its own distinct directory ID.
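As an illustration of the encoding, the following is a minimal, self-contained Java sketch, assuming directory IDs use the same URL-safe, unpadded base64 encoding as the cluster ID shown above; the class and method names are hypothetical, as the KIP only specifies that the storage format tool assigns the UUIDs:

import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.UUID;

public class DirectoryIdSketch {
    // Generate a random UUID and encode its 16 bytes as URL-safe base64
    // without padding, yielding 22-character IDs like "e6umYSUsQyq7jUUzL9iXMQ".
    static String randomDirectoryId() {
        UUID uuid = UUID.randomUUID();
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.putLong(uuid.getMostSignificantBits());
        buf.putLong(uuid.getLeastSignificantBits());
        return Base64.getUrlEncoder().withoutPadding().encodeToString(buf.array());
    }

    public static void main(String[] args) {
        System.out.println(randomDirectoryId());
    }
}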
Brokers
Broker lifecycle management
When the broker starts up and initializes the LogManager, it will load the UUID for each log directory (directory.id) and the list of all log directory UUIDs (directory.ids) by reading the meta.properties file at the root of each log directory. If different meta.properties files indicate different values for directory.ids, the broker will fail at startup. The broker diffs the loaded UUIDs against the list of all UUIDs to obtain the UUIDs of the offline log directories, as sketched below. The lists of both online and offline log directory UUIDs are sent along in the broker registration request to the controller.
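A minimal Java sketch of that computation, with hypothetical names: the set difference between directory.ids and the directory.id values that could actually be read yields the offline directories.

import java.util.HashSet;
import java.util.Set;

public class OfflineDirsSketch {
    // Offline directories are those listed in directory.ids whose own
    // meta.properties (directory.id) could not be read.
    static Set<String> offlineDirectories(Set<String> allConfiguredIds, // from directory.ids
                                          Set<String> loadedIds) {      // directory.id of each readable dir
        Set<String> offline = new HashSet<>(allConfiguredIds);
        offline.removeAll(loadedIds);
        return offline;
    }

    public static void main(String[] args) {
        Set<String> all = Set.of("e6umYSUsQyq7jUUzL9iXMQ", "b4d9ExdORgaQq38CyHwWTA");
        Set<String> loaded = Set.of("e6umYSUsQyq7jUUzL9iXMQ");
        System.out.println(offlineDirectories(all, loaded)); // prints [b4d9ExdORgaQq38CyHwWTA]
    }
}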
Replica management
As the broker catches up with metadata and sees which partitions it should be hosting, it will check the associated log directory UUID for each partition; these checks are sketched in code after this list.
- If the partition is associated with the log directory UUID Uuid.ZERO:
  - If the broker has only one log directory configured, it will place the replica there.
  - If the broker has more than one log directory configured, it errors and fails to place the replica.
- If the partition doesn't yet exist, it is created in the designated log directory.
- If the partition already exists, but the hosting log directory does not match the cluster metadata:
  - If there is a future replica in the log directory indicated by the metadata, the broker will replace the current replica with the future replica.
  - Otherwise, the broker sends the new ASSIGN_REPLICAS_TO_DIRECTORIES RPC to the controller to change the metadata association to the actual log directory. The broker will not create the log for the partition until the log directory indicated in the cluster metadata matches the actual log directory.
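The following self-contained Java sketch outlines these checks. All names are hypothetical and the in-memory maps stand in for real log state; the actual logic lives in the broker's replica management code.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PlacementSketch {
    static final String UUID_ZERO = "AAAAAAAAAAAAAAAAAAAAAA"; // stand-in for Uuid.ZERO

    final List<String> logDirs;                              // configured log directory IDs
    final Map<String, String> currentDir = new HashMap<>();  // partition -> hosting directory
    final Map<String, String> futureDir = new HashMap<>();   // partition -> future replica directory

    PlacementSketch(List<String> logDirs) { this.logDirs = logDirs; }

    void onPartitionMetadata(String partition, String assignedDir) {
        if (UUID_ZERO.equals(assignedDir)) {
            if (logDirs.size() == 1) {
                currentDir.put(partition, logDirs.get(0)); // single dir: place it there
            } else {
                throw new IllegalStateException("No directory assigned for " + partition
                        + " and multiple log directories are configured");
            }
        } else if (!currentDir.containsKey(partition)) {
            currentDir.put(partition, assignedDir);        // create in the designated directory
        } else if (!currentDir.get(partition).equals(assignedDir)) {
            if (assignedDir.equals(futureDir.get(partition))) {
                currentDir.put(partition, futureDir.remove(partition)); // promote the future replica
            } else {
                // In the real broker this sends ASSIGN_REPLICAS_TO_DIRECTORIES;
                // log creation waits until metadata matches the actual directory.
                System.out.println("Would reassign " + partition + " to "
                        + currentDir.get(partition) + " in the cluster metadata");
            }
        }
    }
}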
When replicas are moved between directories using the existing ALTER_REPLICA_LOG_DIRS RPC, once the future replica has caught up with the current replica, the broker will send the ASSIGN_REPLICAS_TO_DIRECTORIES RPC to the controller, changing the association to the future log directory. When the broker sees the resulting cluster metadata update, it replaces the current replica with the future replica.
Metadata caching
Replicas are considered offline if the replica references a log directory which is not in the list of online log directories for the broker ID hosting the replica.
Handling log directory failures
When one or more log directories become offline, the broker will communicate this change to the controller using the new LOG_DIRECTORIES_OFFLINE RPC, indicating the UUIDs of the newly offline log directories.
Controller
Replica placement
For any new partitions, the controller will select not only the broker ID for each replica, but also a log directory on those brokers, from their lists of online log directories. For any brokers that do not have associated log directories in the metadata, the special value Uuid.ZERO is used to indicate no log directory.
This special zero value exists only to enable the transition of a cluster onto this feature. Once the cluster has been upgraded to support JBOD in KRaft, in a future release, Uuid.ZERO should no longer be allowed, and all registered brokers should have a list of at least one online log directory UUID.
If the controller selected a log directory for a new replica and that log directory is now offline, the broker will choose a new one and send the new ASSIGN_REPLICAS_TO_DIRECTORIES RPC to correct the log directory assignment for the replica in the cluster metadata, delaying creation of the replica until it sees the updated cluster metadata from the controller.
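A minimal Java sketch of the controller-side selection, with hypothetical names; the KIP does not prescribe a placement strategy among a broker's online directories, so a uniform random choice is used here purely for illustration.

import java.util.List;
import java.util.Random;

public class DirectorySelectionSketch {
    static final String UUID_ZERO = "AAAAAAAAAAAAAAAAAAAAAA"; // stand-in for Uuid.ZERO
    static final Random RANDOM = new Random();

    // Pick a directory from the broker's registered online directories, or
    // fall back to the reserved zero UUID for brokers that registered none.
    static String selectDirectory(List<String> onlineDirs) {
        if (onlineDirs.isEmpty()) {
            return UUID_ZERO;
        }
        return onlineDirs.get(RANDOM.nextInt(onlineDirs.size()));
    }
}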
Handling log directory failures
When the controller receives a LOG_DIRECTORIES_OFFLINE request from a broker, it will (see the sketch after this list):
- Persist a BrokerRegistrationChange record with the new lists of online and offline log directories.
- Update the leader and ISR for all the replicas assigned to the failed log directories, using the existing replica state machine.
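A rough Java sketch of these two steps against a hypothetical controller interface; none of these names come from the KIP, and the record persistence and leader election stand in for the existing KRaft mechanisms.

import java.util.List;

public class DirFailureHandlerSketch {
    interface ControllerState {
        void appendBrokerRegistrationChange(int brokerId, List<String> onlineDirs, List<String> offlineDirs);
        List<String> replicasIn(int brokerId, List<String> dirs);
        void updateLeaderAndIsr(List<String> replicas);
    }

    static void handleLogDirectoriesOffline(ControllerState state, int brokerId,
                                            List<String> onlineDirs, List<String> offlineDirs) {
        // 1. Persist the registration change with the new directory lists.
        state.appendBrokerRegistrationChange(brokerId, onlineDirs, offlineDirs);
        // 2. Re-elect leaders and update ISR for replicas on the failed directories.
        state.updateLeaderAndIsr(state.replicasIn(brokerId, offlineDirs));
    }
}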
Broker registration
Upon a broker registration request, the controller will persist the broker registration in the cluster metadata, including the online and offline log directories for that broker. The controller may receive a new list of online and offline log directories, different from what was previously persisted in the cluster metadata for the requesting broker; the handling is sketched in code after the list below.
- If any log directories are missing, it means they have been removed from the broker’s configuration, so the controller will reassign all replicas currently assigned to the missing log directories to the remaining online log directories on the same broker.
- If there are new offline log directories, the controller will handle them as log directory failures, updating leadership and ISR for all partitions with replicas on those offline log directories, using the existing replica state machine.
- If the previous registration lists no online log directories, the registration request is rejected unless it specifies a single online log directory. The controller will then assign all replicas on that broker to the specified log directory.
In a future release, broker registration without online log directories will be disallowed.
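Sketched below against the same kind of hypothetical controller surface as above; the reassignment, failure-handling, and persistence calls stand in for the mechanisms described in this section.

import java.util.ArrayList;
import java.util.List;

public class RegistrationHandlerSketch {
    interface ControllerState {
        List<String> previouslyOnlineDirs(int brokerId);
        List<String> previouslyOfflineDirs(int brokerId);
        void reassignReplicasFrom(int brokerId, List<String> missingDirs);
        void handleDirectoryFailures(int brokerId, List<String> failedDirs);
        void assignAllReplicasTo(int brokerId, String dir);
        void persistRegistration(int brokerId, List<String> online, List<String> offline);
    }

    static void onBrokerRegistration(ControllerState state, int brokerId,
                                     List<String> online, List<String> offline) {
        List<String> prevOnline = state.previouslyOnlineDirs(brokerId);
        List<String> prevOffline = state.previouslyOfflineDirs(brokerId);

        if (prevOnline.isEmpty() && prevOffline.isEmpty()) {
            // First registration with directories: a single online directory
            // is required, and all existing replicas are assigned to it.
            if (online.size() != 1) {
                throw new IllegalArgumentException("Must register a single online log directory");
            }
            state.assignAllReplicasTo(brokerId, online.get(0));
        } else {
            // Directories present before but absent now were removed from the
            // broker's configuration: move their replicas to remaining dirs.
            List<String> missing = new ArrayList<>(prevOnline);
            missing.addAll(prevOffline);
            missing.removeAll(online);
            missing.removeAll(offline);
            state.reassignReplicasFrom(brokerId, missing);

            // Newly offline directories are handled as directory failures.
            List<String> newlyOffline = new ArrayList<>(offline);
            newlyOffline.removeAll(prevOffline);
            state.handleDirectoryFailures(brokerId, newlyOffline);
        }
        state.persistRegistration(brokerId, online, offline);
    }
}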
Compatibility, Deprecation, and Migration Plan
Migrating a cluster in KRaft without JBOD
The controllers are upgraded first, and broker registration is allowed without log directories. Once the controllers are upgraded, the brokers can be upgraded, while still using a single log directory. Once upgraded, the brokers will register the single log directory with the controller, which will assign all replicas to that log directory. Then the brokers can individually be reconfigured with additional log directories.
When first registering with log directories, a broker can only register a single log directory. This allows the controller to assign all existing replicas in that broker to the single log directory. In a subsequent version following the release of this feature, we can completely disallow broker registration without log directories.
Migrating a cluster in ZK mode running with JBOD
Migration into KRaft mode is, in general, a yet unsolved problem. In the context of JBOD support, the ASSIGN_REPLICAS_TO_DIRECTORIES RPC can optionally be used during the migration to create the initial replica-to-log-directory association.
Replica management
Existing replicas without a log directory are assumed to live on a broker that isn’t yet running JBOD, and so to live in its single log directory, even if the UUID for that directory isn’t yet known by the controller. It is not possible to trigger a log directory failure from a broker that has a single log directory, as such a broker simply shuts down when no online log directories remain.
Storage formatting
The changes to storage formatting simply ensure the presence of the two new properties in the meta.properties file at each log directory root. The storage format command will need to run not only when new log directories are configured, but also when log directories are removed.
Test Plan
The system test for log directory failures will be extended to KRaft mode.
Rejected Alternatives
- Keeping the scope of the log directory to the broker — while this would be a much simpler change, as was proposed in KIP-589, if only the broker itself knows which partitions were assigned to a log directory, then when a log directory fails the broker needs to send a potentially very large request enumerating all the partitions on the failed disk, so that the controller can update leadership and ISR accordingly.
- Having the controller not determine the log directory, leaving that up to the broker — this would always require an RPC from the broker upon selecting a log directory for new replicas, and replica creation would always have to wait until the controller confirmed the cluster metadata update. By having the controller select a log directory, we eliminate this extra latency in most cases.
- Changing how log directory failure is handled in ZooKeeper mode — ZooKeeper mode is going away; KIP-833 proposed its deprecation in a near-future release.
- Using the system path to identify each log directory, or storing the identifier somewhere else — When running Kafka with multiple log directories, each log directory is typically assigned to a different system disk or volume. The same storage device can be made accessible under a different mount, and Kafka should be able to identify the contents as the same disk. Because the log directory configuration can change on the broker, the only reliable way to identify the log directory in each broker is to add metadata to the file system under the log directory itself.
- Fail on broker startup if the cluster metadata indicates replicas should be in a different log directory than the one where the broker actually finds them — despite not being an advertised feature, replicas can currently be moved between log directories while the broker is offline. Once the broker comes back up, it accepts the new location of the replica. To continue supporting this behavior, the broker will need to compare the information in the cluster metadata with the actual replica location during startup and take action on any mismatch.
- Not keeping a list of all configured log directory identifiers in a metadata file in the file system — if a broker finds some log directories to be offline during startup, it will need another way to identify those log directories, as their metadata lives on the failed disks themselves. For this reason, the list of all log directory IDs must also be persisted somewhere the broker can be expected to access. Regardless of how many log directories are configured, one of them will be configured to host the cluster metadata topic. This can either be one of the data log directories or a completely separate one, typically on the system disk. Since the broker will not start up if this log directory is unavailable, it is the perfect location to persist the list of all log directory identifiers.