Status

Current state: "Under Discussion"

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The most expensive part of a Kafka cluster is probably its storage system. At LinkedIn we use RAID-10 for storage and set Kafka's replication factor to 2. This setup requires 4X space to store data and tolerates at most 1 broker failure; even a single broker failure puts us at risk of data loss, which is not acceptable for, e.g., financial data. On the other hand, it is prohibitively expensive to set the replication factor to 3 with RAID-10 because it would increase our existing hardware and operational cost by 50%.

...

 

The goal of this KIP is to allow a broker to serve replicas on good log directories even if some log directories have failed. This addresses the first problem raised in the motivation section. See KIP - Support replicas movement between log directories for our proposal to address the second problem.

Proposed change

How to handle log directory failure

Problem statement:

Currently LeaderAndIsrRequest is used for two purposes: 1) create a new replica on a broker and 2) switch a replica between the leader and follower role for a partition. If a broker starts with some replicas unavailable because they are on a bad log directory, it will re-create those replicas on a good log directory when it receives a LeaderAndIsrRequest from the controller. This is incorrect. To avoid it, the controller needs to know whether the replica has already been created on the broker and explicitly specify in the LeaderAndIsrRequest whether the broker should create the replica.

...

- A Kafka client can send MetadataRequest to any broker to query the offline replicas of the specified partitions.
- The kafka-topics script will display offline replicas when describing a topic partition. The offline replicas are the union of the offline replicas on live brokers and the replicas on dead brokers. The kafka-topics script obtains offline replicas by sending MetadataRequest to any broker.
 

Public interface

Zookeeper

1) Add a znode at /broker/topics/[topic]/partitions/[partitionId]/controller_managed_state with the following data format:

Code Block
{
  "version" : int,
  "created" : [int]
}
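
For illustration, a hypothetical value of this znode for a partition whose replica has been created on brokers 1, 2 and 3 (all values are made up) would be:

Code Block
{
  "version" : 1,
  "created" : [1, 2, 3]
}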

2) Store data with the following JSON format in the znode /log_dir_event_notification/log_dir_event_*

Code Block
{
  "version" : int,
  "broker" : int,
  "event" : int    <-- This can be LogDirFailure or LogDirReset
}
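
As a hypothetical example, broker 1 reporting a log directory failure could write the following (the integer encoding of the event type is an assumption for illustration, not specified by this KIP text):

Code Block
{
  "version" : 1,
  "broker" : 1,
  "event" : 1    <-- assuming 1 encodes LogDirFailure
}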

Protocol

Add a create field to LeaderAndIsrRequestPartitionState, which will be used by LeaderAndIsrRequest.

Code Block
LeaderAndIsrRequest => controller_id controller_epoch partition_states live_leaders
  controller_id => int32
  controller_epoch => int32
  partition_states => [LeaderAndIsrRequestPartitionState]
  live_leaders => [LeaderAndIsrRequestLiveLeader]
 
LeaderAndIsrRequestPartitionState => topic partition controller_epoch leader leader_epoch isr zk_version replicas create
  topic => str
  partition => int32
  controller_epoch => int32
  leader => int32
  leader_epoch => int32
  isr => [int32]
  zk_version => int32
  replicas => [int32]
  create => boolean <-- NEW

Add an offline_replicas field to UpdateMetadataRequestPartitionState, which will be used by UpdateMetadataRequest.

Code Block
UpdateMetadataRequest => controller_id controller_epoch partition_states live_brokers
  controller_id => int32
  controller_epoch => int32
  partition_states => [UpdateMetadataRequestPartitionState]
  live_brokers => [UpdateMetadataRequestBroker]
 
UpdateMetadataRequestPartitionState => topic partition controller_epoch leader leader_epoch isr zk_version replicas offline_replicas
  topic => string
  partition => int32
  controller_epoch => int32
  leader => int32
  leader_epoch => int32
  isr => [int32]
  zk_version => int32
  replicas => [int32]
  offline_replicas => [int32]  <-- NEW

Add an offline_replicas field to PartitionMetadata, which will be used by MetadataResponse.

Code Block
MetadataResponse => brokers cluster_id controller_id topic_metadata
  brokers => [MetadataBroker]
  cluster_id => nullable_str
  controller_id => int32
  topic_metadata => [TopicMetadata]
 
TopicMetadata => topic_error_code topic is_internal partition_metadata
  topic_error_code => int16
  topic => str
  is_internal => boolean
  partition_metadata => [PartitionMetadata]
 
PartitionMetadata => partition_error_code partition_id leader replicas isr offline_replicas
  partition_error_code => int16
  partition_id => int32
  leader => int32
  replicas => [int32]
  isr => [int32]
  offline_replicas => [int32]  <-- NEW

Scripts

1) When describing a topic, kafka-topics.sh will show the offline replicas for each partition.

2) Add kafka-log-dirs.sh, which allows the user to reset the log directory state of a broker.

Running ./bin/kafka-log-dirs.sh --delete --zookeeper localhost:2181 --broker 1 will remove this broker from the list "created" of every znode /broker/topics/[topic]/partitions/[partitionId]/controller_managed_state.
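
For illustration (topic, partition and broker ids are hypothetical), if a partition's controller_managed_state znode currently contains "created" : [1, 2, 3], the command above would leave it as:

Code Block
{
  "version" : 1,
  "created" : [2, 3]
}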

Changes in Operational Procedures

In this section we describe the expected changes in operational procedures needed to run Kafka with JBOD instead of RAID. Administrators of a Kafka cluster need to be aware of these changes before switching from RAID-10 to JBOD.

1) Need to reset broker's log directory state after disk is fixed.

In the current Kafka implementation, a broker will automatically create a replica on a good log directory if it receives a LeaderAndIsrRequest for a partition. No extra operation is needed if, for example, a broker is restarted after failed disks are replaced with empty disks, or if a broker is restarted on a new machine with empty disks, because the broker will automatically populate the good log directories with data read from the partition leader. After this KIP, once the hardware problem is fixed, the administrator needs to explicitly run kafka-log-dirs.sh to remove the broker from the list "created" of all partitions and then restart the broker. This is needed to ensure that those replicas can be re-created on the good log directories of the broker once the administrator has fixed the problem.

2) Need to adjust replication factor and min.insync.replicas

After we switch from RAID-10 to JBOD, the number of disks that can fail will be smaller if the replication factor is not changed. Administrators need to adjust the replication factor and min.insync.replicas to balance the cost, availability and performance of the Kafka cluster. With proper configuration of these two settings, we can either reduce disk cost or increase tolerance of broker and disk failures. Here are a few examples (the arithmetic behind them is spelled out after the list):

- If we switch from RAID-10 to JBOD and keep the replication factor at 2, the disk usage of the Kafka cluster would be reduced by 50% without reducing availability against broker failure, but tolerance of disk failure would decrease.
- If we switch from RAID-10 to JBOD and increase the replication factor from 2 to 3, the disk usage of the Kafka cluster would be reduced by 25% and the number of brokers that can fail without impacting availability would increase from 1 to 2, but tolerance of disk failure would still decrease.
- If we switch from RAID-10 to JBOD and increase the replication factor from 2 to 4, the disk usage of Kafka would stay the same, the number of brokers that can fail without impacting availability would increase from 1 to 3, and the number of disks that can fail without impacting availability would stay the same.
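
To spell out the arithmetic behind these examples: RAID-10 mirrors every disk, so replication factor 2 on RAID-10 stores each byte 2 x 2 = 4 times (the 4X figure from the motivation section). With JBOD the raw space used is simply the replication factor: 2X with replication factor 2 (a 50% reduction from 4X), 3X with replication factor 3 (a 25% reduction), and 4X with replication factor 4 (the same as today).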

Compatibility, Deprecation, and Migration Plan

This KIP is a pure addition, so there is no backward compatibility concern.

The KIP changes the inter-broker protocol. Therefore the migration requires two rolling bounces. In the first rolling bounce we will deploy the new code, but brokers will still communicate using the existing protocol. In the second rolling bounce we will change the config so that brokers start to communicate with each other using the new protocol.

Test Plan

The new features will be tested through unit, integration, and system tests. In the following we explain the system tests only. 

Note that we validate the following when we say "validate client/cluster state" in the system tests.

- Brokers are all running and show the expected error messages
- Topic description shows the expected results for all topics
- kafka-log-dirs.sh can show the expected offline replicas for all brokers
- A pair of producer and consumer can successfully produce/consume from a topic without message loss or duplication.

1) Log directory failure on leader during bootstrap

- Start 1 zookeeper and 3 brokers
- Create a topic of 1 partition with 3 replicas
- Start a pair of producer and consumer to produce/consume from the topic
- Kill the leader of the partition
- Remove the first log directory of the leader and create a file with the same path
- Start leader again
- Validate client/cluster state

2) Log directory failure on leader during runtime

- Start 1 zookeeper and 3 brokers
- Create a topic of 1 partition with 3 replicas
- Start a pair of producer and consumer to produce/consume from the topic
- Remove the first log directory of the leader and create a file with the same path
- Validate client/cluster state
// Now validate that the previous leader can still serve replicas on the good log directories
- Create another topic of 1 partition with 3 replicas
- Kill the other two brokers
- Start a pair of producer and consumer to produce/consume from the new topic
- Validate client/cluster state

3) Log directory failure on follower during runtime

- Start 1 zookeeper and 3 brokers
- Create a topic of 1 partition with 3 replicas
- Start a pair of producer and consumer to produce/consume from the topic
- Remove the first log directory of a follower
- Validate client/cluster state
// Now validate that the follower can still serve replicas on the good log directories
- Create another topic of 1 partition with 3 replicas
- Kill the other two brokers
- Start a pair of producer and consumer to produce/consume from the new topic
- Validate client/cluster state

Rejected Alternatives

- Let broker keep track of the replicas that it has created.

The con of this approach is that each broker, instead of the controller, keeps track of the replica placement information. This solution splits the task of determining offline replicas between the controller and the brokers, as opposed to the current Kafka design where the controller determines the states of replicas and propagates this information to brokers. We think it is less error-prone to keep the controller as the only entity that maintains metadata (e.g. replica state) of the Kafka cluster.


- Avoid adding "create" field to LeaderAndIsrRequest.
If we don't add the "create" field to LeaderAndIsrRequest, then the broker will need to keep track of the list of replicas it has created and persist this information either on local disk or in zookeeper.

- Add a new field "created" in the existing znode /broker/topics/[topic]/partitions/[partitionId]/state instead of creating a new znode.
If we don't include the list of created replicas in the LeaderAndIsrRequest, the leader would need to read this list of created replicas from zookeeper before updating the isr in zookeeper. This is different from the current design, where all information except the isr is read from the LeaderAndIsrRequest sent by the controller, and it creates an opportunity for race conditions. Thus we propose to add a new znode to keep the information that can only be written by the controller.

 

- Identify replica by 4-tuple (topic, partition, broker, log_directory) in zookeeper and various requests

This would require a big change to both the wire protocol and the znode data format in order to specify the log directory of every replica. It would also require Kafka to keep track of the log directory of each replica and update this information in zookeeper every time a replica is moved between log directories on the same broker for load balancing. We would like to avoid the additional code complexity and performance overhead.

Potential Future Improvement

1. Distribute segments of a given replica across multiple log directories on the same broker. This is useful but complicated, and can be done later via a separate KIP.
2. Provide an intelligent solution to select the log directory for new replicas and re-assign replicas across log directories to balance the load.
3. Have the broker automatically rebalance replicas across its log directories. This is worth exploring separately in a future KIP as there are a few options in the design space.
4. Allow the controller/user to specify a quota when moving replicas between log directories on the same broker.