...
The idea is for the controller to keep track of whether a replica has been successfully created, by getting this information from the LeaderAndIsrResponse and persisting it in ZooKeeper. The controller can then explicitly tell a broker whether or not to create a replica by specifying a newly-added boolean field "isNewReplica" in the LeaderAndIsrRequest. This field is true only when a replica is transitioning from the NewReplica state to the Online state. Each broker tells the controller whether a replica is created and online by specifying the error_code per partition in the LeaderAndIsrResponse, which in turn allows the controller to derive the offline replicas per partition and elect leaders appropriately.
...
- The controller creates znode /broker/topics/[topic]/partitions/[partitionId]/controller_managed_state
with json-formatted data {"version" : 1, "created" : []} for this partition.
- The controller sends LeaderAndIsrRequest with isNewReplica=True to the leader and followers.
- The leader and followers create replicas locally and send LeaderAndIsrResponse to the controller with error=None.
- After receiving the LeaderAndIsrResponse from the leader and followers of this partition, the controller considers a replica successfully created if error=None in the LeaderAndIsrResponse, and adds that broker to the "created" list of this partition. Otherwise, the replica is considered offline.
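The controller-side bookkeeping described above can be sketched as follows. This is a minimal Python illustration; the class and method names (PartitionState, on_leader_and_isr_response) and the numeric error code are hypothetical, not actual Kafka code:

```python
# Illustrative sketch: the controller updates the "created" list for one
# partition based on the per-partition error_code in LeaderAndIsrResponse.

NONE = 0  # error_code meaning the replica was created/online successfully

class PartitionState:
    def __init__(self, replicas):
        self.replicas = list(replicas)   # assigned broker ids
        self.created = []                # brokers whose replica was created
        self.offline = set()             # brokers whose replica is offline

    def on_leader_and_isr_response(self, broker_id, error_code):
        """Update controller-managed state from one broker's response."""
        if error_code == NONE:
            if broker_id not in self.created:
                self.created.append(broker_id)
            self.offline.discard(broker_id)
        else:
            # e.g. a storage error: the replica is considered offline
            self.offline.add(broker_id)

state = PartitionState(replicas=[1, 2, 3])
state.on_leader_and_isr_response(1, NONE)
state.on_leader_and_isr_response(2, NONE)
state.on_leader_and_isr_response(3, 56)  # 56 = hypothetical storage error code
```

After these three responses, brokers 1 and 2 are in the created list and broker 3 is treated as offline, so the controller would elect a leader among 1 and 2.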
...
- The broker collects the list of replicas found on its good log directories. If there is no good log directory, the broker will exit.
- The controller sends LeaderAndIsrRequest for all partitions that should exist on this broker. If a replica is not in the created list of a partition, it is transitioning from the NewReplica state to the Online state, and isNewReplica=True in the LeaderAndIsrRequest. Otherwise, isNewReplica=False.
- The broker specifies error=KafkaStorageException for those partitions that are in the LeaderAndIsrRequest with isNewReplica=False but are not found on any good log directory. The broker creates a replica on a good log directory if the replica is not found on any good log directory and isNewReplica=True.
- The controller considers a replica on that broker to be offline if its error!=None in the LeaderAndIsrResponse.
- The controller updates the information of offline replicas in memory and triggers leader election as appropriate.
- The controller removes offline replicas from ISR in the ZK and sends LeaderAndIsrRequest with updated ISR to be used by partition leaders.
- The controller propagates the information of offline replicas to brokers by sending UpdateMetadataRequest.
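The broker's per-partition decision in the steps above can be sketched as a small Python function; the function name and the numeric error code are illustrative, not Kafka's actual implementation:

```python
# Sketch of broker-side handling of one partition in a LeaderAndIsrRequest
# after startup with a bad log directory. Names and codes are illustrative.

NONE = 0
KAFKA_STORAGE_ERROR = 56  # hypothetical numeric error code

def handle_partition(found_on_good_dir, is_new_replica):
    """Return (action, error_code) for one partition in the request."""
    if found_on_good_dir:
        return ("use_existing", NONE)
    if is_new_replica:
        # Replica is transitioning NewReplica -> Online: safe to create it.
        return ("create", NONE)
    # Replica should already exist but is not on any good log directory:
    # report a storage error so the controller marks it offline.
    return ("none", KAFKA_STORAGE_ERROR)

assert handle_partition(True, False) == ("use_existing", NONE)
assert handle_partition(False, True) == ("create", NONE)
assert handle_partition(False, False) == ("none", KAFKA_STORAGE_ERROR)
```

The key point is the middle branch: without the isNewReplica flag, the broker could not distinguish "replica was never created" from "replica exists on a failed disk", and might silently re-create data that still lives on the bad disk.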
...
- The controller watches the path /log_dir_event_notification
for new znodes.
- User can either replace a bad disk with good disk, or remove the bad log directory from broker config.
- User restarts broker with only good log directories. Broker can read all log directories specified in its config.
- The controller sends LeaderAndIsrRequest with isNewReplica=False to this broker, because according to ZooKeeper all replicas have been created on this broker.
- The broker will create any replica not found on a good log directory, because it can access all log directories specified in the config.
...
- Controller propagates the information of offline replicas to brokers by sending UpdateMetadataRequest. MetadataResponse will include offline replicas per partition.
- The kafka-topics script will display offline replicas when describing a topic partition. The set of offline replicas is the union of offline replicas on live brokers and replicas on dead brokers. The kafka-topics script obtains offline replicas by sending MetadataRequest to any broker.
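The union described above can be illustrated with a short Python sketch; all names here are hypothetical:

```python
# Sketch: offline replicas of a partition as exposed via MetadataResponse,
# i.e. replicas on dead brokers plus replicas marked offline (bad log
# directory) on live brokers.

def offline_replicas(assigned, live_brokers, offline_on_live):
    dead = [b for b in assigned if b not in live_brokers]
    bad_dirs = [b for b in assigned if b in offline_on_live]
    return sorted(set(dead) | set(bad_dirs))

# Partition assigned to brokers 1, 2, 3; broker 3 is dead; broker 1 has its
# replica on a failed log directory.
print(offline_replicas([1, 2, 3], live_brokers={1, 2}, offline_on_live={1}))
# → [1, 3]
```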
...
- The controller derives offline replicas from the LeaderAndIsrResponse and performs leader election among the live replicas.
- The controller removes a replica from the created list of this partition in ZooKeeper if the StopReplicaResponse indicates that the replica was successfully deleted.
- The controller follows the same procedure specified in the steps of "Topic gets created" when it creates replicas on destination brokers, i.e. it specifies the proper value for "isNewReplica" in the LeaderAndIsrRequest and manages the created list of this partition in ZooKeeper.
Public interface
Zookeeper
1) Add znode at /broker/topics/[topic]/partitions/[partitionId]/controller_managed_state
with the following data format:
{
  "version" : int,
  "created" : [int]
}
These znodes will only be updated by the controller, in the following scenarios:
- When a replica is successfully created on a broker with LeaderAndIsrRequest(isNewReplica=True), the controller adds this replica (i.e. broker id) to the created list of this partition.
- When a replica is successfully deleted on the broker with StopReplicaRequest(delete=True), controller deletes this replica from the created list of this partition.
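The two update scenarios can be sketched as follows. ZooKeeper access is stubbed with a plain dict; in the real controller the read-modify-write would be a conditional update on the znode version:

```python
import json

# Sketch of the controller maintaining the controller_managed_state znode.
# The zk dict stands in for ZooKeeper; path and helper names are illustrative.

path = "/broker/topics/t/partitions/0/controller_managed_state"
zk = {path: json.dumps({"version": 1, "created": [1, 2]})}

def add_created(path, broker_id):
    """Replica created via LeaderAndIsrRequest(isNewReplica=True)."""
    state = json.loads(zk[path])
    if broker_id not in state["created"]:
        state["created"].append(broker_id)
    zk[path] = json.dumps(state)

def remove_created(path, broker_id):
    """Replica deleted via StopReplicaRequest(delete=True)."""
    state = json.loads(zk[path])
    state["created"] = [b for b in state["created"] if b != broker_id]
    zk[path] = json.dumps(state)

add_created(path, 3)
remove_created(path, 1)
```

After these two calls the created list is [2, 3], matching the two scenarios above.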
2) Store data with the following json format in znode /log_dir_event_notification/log_dir_event_*
{
  "version" : int,
  "broker" : int,
  "event" : int   <-- This can be LogDirFailure
}
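A broker-side sketch of producing this payload; the numeric value chosen for the LogDirFailure event is an assumption:

```python
import json

# Sketch: payload a broker writes under /log_dir_event_notification to tell
# the controller that one of its log directories failed. The event code
# value (1 for LogDirFailure) is illustrative.

LOG_DIR_FAILURE = 1

def log_dir_event(broker_id):
    return json.dumps({"version": 1, "broker": broker_id,
                       "event": LOG_DIR_FAILURE})

payload = log_dir_event(5)
```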
Protocol
Add an isNewReplica field to LeaderAndIsrRequestPartitionState, which is used by LeaderAndIsrRequest:

LeaderAndIsrRequest => controller_id controller_epoch partition_states live_leaders
  controller_id => int32
  controller_epoch => int32
  partition_states => [LeaderAndIsrRequestPartitionState]
  live_leaders => [LeaderAndIsrRequestLiveLeader]

LeaderAndIsrRequestPartitionState => topic partition controller_epoch leader leader_epoch isr zk_version replicas isNewReplica
  topic => str
  partition => int32
  controller_epoch => int32
  leader => int32
  leader_epoch => int32
  isr => [int32]
  zk_version => int32
  replicas => [int32]
  isNewReplica => boolean <-- NEW
Add an offline_replicas field to UpdateMetadataRequestPartitionState, which is used by UpdateMetadataRequest:

UpdateMetadataRequest => controller_id controller_epoch partition_states live_brokers
  controller_id => int32
  controller_epoch => int32
  partition_states => [UpdateMetadataRequestPartitionState]
  live_brokers => [UpdateMetadataRequestBroker]

UpdateMetadataRequestPartitionState => topic partition controller_epoch leader leader_epoch isr zk_version replicas offline_replicas
  topic => string
  partition => int32
  controller_epoch => int32
  leader => int32
  leader_epoch => int32
  isr => [int32]
  zk_version => int32
  replicas => [int32]
  offline_replicas => [int32] <-- NEW. This includes offline replicas due to both broker failure and disk failure.
Add an offline_replicas field to PartitionMetadata, which is used by MetadataResponse:

MetadataResponse => brokers cluster_id controller_id topic_metadata
  brokers => [MetadataBroker]
  cluster_id => nullable_str
  controller_id => int32
  topic_metadata => [TopicMetadata]

TopicMetadata => topic_error_code topic is_internal partition_metadata
  topic_error_code => int16
  topic => str
  is_internal => boolean
  partition_metadata => [PartitionMetadata]

PartitionMetadata => partition_error_code partition_id leader replicas isr offline_replicas
  partition_error_code => int16
  partition_id => int32
  leader => int32
  replicas => [int32]
  isr => [int32]
  offline_replicas => [int32] <-- NEW. This includes offline replicas due to both broker failure and disk failure.
Scripts
1) When describing a topic, kafka-topics.sh will show the offline replicas for each partition.
Metrics
Here are the metrics we need to add as part of this proposal
1) OfflineReplicasCount: The number of offline replicas on a live broker. This is equivalent to the number of TopicPartition logs on the bad log directories of the broker.
2) OfflineLogDirectoriesCount: The number of offline log directories on a live broker.
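A rough Python sketch of OfflineLogDirectoriesCount; approximating "offline" by filesystem accessibility is an assumption, since the broker actually marks a directory offline when it hits an IO error on it:

```python
import os
import tempfile

# Sketch: count configured log directories that are not accessible.
# This approximation uses os.access; the real broker tracks directories
# it has marked offline after IO errors.

def offline_log_dir_count(log_dirs):
    return sum(1 for d in log_dirs
               if not (os.path.isdir(d) and os.access(d, os.R_OK | os.W_OK)))

good = tempfile.mkdtemp()
print(offline_log_dir_count([good, "/nonexistent/log/dir"]))  # → 1
```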
Changes in Operational Procedures
In this section we describe the expected changes in operational procedures in order to switch Kafka to run with JBOD instead of RAID. Administrators of Kafka cluster need to be aware of these changes before switching from RAID-10 to JBOD.
1) Need to adjust replication factor and min.insync.replicas
After we switch from RAID-10 to JBOD, the number of disks that can fail will be smaller if the replication factor is not changed. Administrators need to adjust replication factor and min.insync.replicas to balance the cost, availability and performance of the Kafka cluster. With proper configuration of these two settings, we can have reduced disk cost or increased tolerance of broker and disk failures. Here are a few examples:
- If we switch from RAID-10 to JBOD and keep the replication factor at 2, the disk usage of the Kafka cluster would be reduced by 50% without reducing the availability against broker failure. But tolerance of disk failure will decrease.
- If we switch from RAID-10 to JBOD and increase replication factor from 2 to 3, the disk usage of Kafka cluster would be reduced by 25%, the number of brokers that can fail without impacting availability can increase from 1 to 2. But tolerance of disk failure will still decrease.
- If we switch from RAID-10 to JBOD and increase replication factor from 2 to 4, the disk usage of Kafka would stay the same, the number of brokers that can fail without impacting availability can increase from 1 to 3, and number of disks that can fail without impacting availability would stay the same.
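The arithmetic behind these examples can be checked directly. RAID-10 mirrors every byte, so raw storage per logical byte is twice the replication factor, while JBOD needs only the replication factor itself:

```python
# Raw bytes of disk needed per logical byte of data, for RAID-10 vs JBOD.
def raw_factor(replication_factor, raid10):
    return replication_factor * (2 if raid10 else 1)

baseline = raw_factor(2, raid10=True)  # RAID-10 with RF=2 needs 4x raw storage
for rf in (2, 3, 4):
    jbod = raw_factor(rf, raid10=False)
    saving = 1 - jbod / baseline
    print(f"RF={rf}: JBOD uses {jbod}x raw storage, {saving:.0%} less than RAID-10 RF=2")
```

This reproduces the numbers above: 50% savings at RF=2, 25% at RF=3, and equal disk usage at RF=4.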
Compatibility, Deprecation, and Migration Plan
This KIP is a pure addition. So there is no backward compatibility concern.
The KIP changes the inter-broker protocol. Therefore the migration requires two rolling bounces. In the first rolling bounce we deploy the new code, but brokers still communicate using the existing protocol. In the second rolling bounce we change the config so that brokers start to communicate with each other using the new protocol.
Test Plan
The new features will be tested through unit, integration, and system tests. In the following we explain the system tests only.
Note that we validate the following when we say "validate client/cluster state" in the system tests.
- Brokers are all running and show expected error message
- topic description shows expected results for all topics
- A pair of producer and consumer can successfully produce/consume from a topic without message loss or duplication.
1) Log directory failure on leader during bootstrap
- Start 1 zookeeper and 3 brokers
- Create a topic of 1 partition with 3 replicas
- Start a pair of producer and consumer to produce/consume from the topic
- Kill the leader of the partition
- Change permission of the first log directory of the leader to be 000
- Start the leader again
- Validate client/cluster state
2) Log directory failure on leader during runtime
- Start 1 zookeeper and 3 brokers
- Create a topic of 1 partition with 3 replicas
- Start a pair of producer and consumer to produce/consume from the topic
- Change permission of the leader's log directory to be 000
- Validate client/cluster state
// Now validate that the previous leader can still serve replicas on the good log directories
- Create another topic of 1 partition with 3 replicas
- Kill the other two brokers
- Start a pair of producer and consumer to produce/consume from the new topic
- Validate client/cluster state
3) Log directory failure on follower during runtime
- Start 1 zookeeper and 3 brokers
- Create a topic of 1 partition with 3 replicas
- Start a pair of producer and consumer to produce/consume from the topic
- Change permission of the follower's log directory to be 000
- Validate client/cluster state
// Now validate that the follower can still serve replicas on the good log directories
- Create another topic of 1 partition with 3 replicas
- Kill the other two brokers
- Start a pair of producer and consumer to produce/consume from the new topic
- Validate client/cluster state
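The fault-injection step used by these tests (making a log directory unreadable) can be reproduced with a few lines of Python; in the real system tests this would run on the broker's machine against the broker's configured log directory:

```python
import os
import tempfile

# Sketch of the fault-injection step in the tests above: a log directory is
# made inaccessible by clearing its permission bits (chmod 000).

log_dir = tempfile.mkdtemp()
os.chmod(log_dir, 0o000)                  # simulate a failed disk
mode = os.stat(log_dir).st_mode & 0o777   # 0 after chmod 000
os.chmod(log_dir, 0o700)                  # restore so cleanup can proceed
os.rmdir(log_dir)
```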
Rejected Alternatives
- Let broker keep track of the replicas that it has created.
The con of this approach is that each broker, instead of the controller, keeps track of the replica placement information. This solution splits the task of determining offline replicas between the controller and brokers, as opposed to the current Kafka design, in which the controller determines the states of replicas and propagates this information to brokers. We think it is less error-prone to let the controller remain the only entity that maintains metadata (e.g. replica state) of the Kafka cluster.
- Avoid adding the "isNewReplica" field to LeaderAndIsrRequest by adding a new field "created" in the existing znode /broker/topics/[topic]/partitions/[partitionId]/state instead of creating a new znode.
If the created list were stored in the same znode as the isr, the leader would need to read this list of created replicas from ZooKeeper before updating the isr in ZooKeeper. This is different from the current design, where all information except the isr is read from the LeaderAndIsrRequest from the controller, and it creates opportunity for race conditions. Thus we propose to add a new znode to keep the information that can only be written by the controller.
- Identify replica by 4-tuple (topic, partition, broker, log_directory) in zookeeper and various requests
- Deploy one broker per disk, i.e. run multiple brokers per machine, so that a disk failure only brings down one broker.
1) If we were to deploy, say, 50 brokers on a machine with 50 disks, the overhead of managing so many brokers' configs would also increase.
2) Whether users deploy Kafka on a commercial cloud platform or in their own cluster, the size of the largest disk is usually limited. There will be scenarios where users want to increase broker capacity by having multiple disks per broker. This JBOD KIP makes that feasible without hurting availability due to single disk failure.
3) There is a performance concern when deploying 10 brokers instead of 1 broker on one machine. The metadata of the cluster, including FetchRequest, ProduceResponse, MetadataRequest and so on, will all be 10X larger. The packets-per-second will be 10X higher, which may limit performance if pps is the bottleneck. The number of sockets on the machine is 10X higher, and the number of replication threads will be 100X more. The impact will be more significant with an increasing number of disks per machine, which would limit Kafka's scalability in the long term. Our stress test result shows that one-broker-per-disk has 15% lower throughput.
4) It is a less efficient way to manage quota.
5) Rebalancing between disks/brokers on the same machine will be less efficient and less flexible, because a broker has to read data from another broker on the same machine via a socket. It is also harder to do automatic load balancing between disks on the same machine in the future.
Potential Future Improvement
1. Distribute segments of a given replica across multiple log directories on the same broker. It is useful but complicated. It is something that can be done later via a separate KIP.
2. Provide intelligent solution to select log directory to place new replicas and re-assign replicas across log directories to balance the load.
3. Have broker automatically rebalance replicas across its log directories. It is worth exploring separately in a future KIP as there are a few options in the design space.
4. Allow controller/user to specify quota when moving replicas between log directories on the same broker.