Status
Current state: Under Discussion
Discussion thread: here
JIRA: here
Released: <Kafka Version>
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The most expensive part of a Kafka cluster is probably its storage system. At LinkedIn we use RAID-10 for storage and set Kafka's replication factor = 2. This setup requires 4X space to store data and tolerates up to 1 broker failure. We are at risk of data loss with just 1 broker failure, which is not acceptable for, e.g., financial data. On the other hand, it is prohibitively expensive to set replication factor = 3 with RAID-10 because it would increase our existing hardware cost and operational cost by 50%.
The solution is to use JBOD and set replication factor = 3 or higher. It is based on the idea that Kafka already has replication across brokers, so it is unnecessary to use RAID-10 for replication as well. Say we set replication factor = 4 with JBOD. This setup requires 4X space to store data and tolerates up to 3 broker failures without losing any data. In comparison to our existing setup, this gives us 3X broker failure tolerance without increasing storage hardware cost.
We have evaluated the possibility of using other RAID setups at LinkedIn, but none of them addresses our problem as well as JBOD does. RAID-0 stops working entirely with just one disk failure. RAID-5 or RAID-6 has sizable performance loss as compared to RAID-0, and probably JBOD as well, due to their use of block-level striping with distributed parity.
Unfortunately, JBOD is not recommended for Kafka because some important features are missing. For example, Kafka lacks good tooling support as well as load balancing across disks when multiple disks are used. Here is a list of problems that need to be addressed for JBOD to be useful:
1) The broker will shut down if any disk fails. This means a single disk failure can bring down the entire broker. Instead, the broker should continue to serve the replicas on the good disks as long as there are good disks available.
2) Kafka doesn't provide the necessary tools for users to manage JBOD. For example, Kafka doesn't provide a script to re-assign replicas between disks of the same broker. These tools are needed before we can use JBOD with Kafka.
3) JBOD doesn't by itself balance load across disks the way RAID-10 does. This is a new problem we need to solve in order for a JBOD setup to work well. We need a better solution than the round-robin policy we currently use to select the disk on which to place a new replica, and we should figure out how to re-assign replicas across disks of the same broker when we notice load imbalance across a broker's disks.
For ease of discussion, we have separated the design of JBOD support into two different KIPs. This KIP addresses the second problem. See KIP - Handle disk failure for JBOD to read our proposal of how to address the first problem.
Since Kafka's configuration and implementation do not expose the notion of "disk", we will use "log directory" and "disk" interchangeably in the rest of this KIP.
Goals
The goal of this KIP is to allow the administrator to re-assign replicas to specific log directories of brokers, query offline replicas of topics, query offline replicas of brokers, and replace bad disks with good disks. This addresses the second problem raised in the motivation section. See KIP - Handle disk failure for JBOD to read our proposal of how to address the first problem.
Proposed change
1) How to move replica between log directories on the same broker
Problem statement:
Kafka does not allow a user to move a replica to another log directory on the same broker at runtime. This was not needed previously because we use RAID-10 and the load is already balanced across disks, but it is needed in order to use JBOD with Kafka.
Currently a replica has only two possible states, follower or leader, and it is identified by the 3-tuple (topic, partition, broker). This works for the current implementation because there can be at most one such replica on a broker. However, there will be two such replicas on a broker while a replica is being moved from one log directory to another on the same broker. Either a replica should be identified by its log directory as well, or the broker needs to persist information in the log directory to tell the destination replica apart from the source replica being moved.
In addition, the user needs to be able to query the list of partitions and their sizes per log directory on any machine, so that they can determine how to move replicas to balance the load. While this information may be retrieved via external tools if the user can ssh to the machine that is running the Kafka broker, development of such tools may not be easy on all the operating systems that Kafka may run on. Further, a user may not even have the authorization to access the machine. Therefore Kafka needs to provide a new request/response to expose this information to users.
Solution:
The idea is that the user can send a ChangeReplicaDirRequest which tells the broker to move topicPartition.log to a specified log directory. The broker creates the new log of the replica with a .move suffix, which allows it to distinguish the new log of the replica from the existing log in case of failover during the replica movement. The broker creates the new log on the destination log directory, pushes data from the existing log to the new log, and replaces the existing log with the new log for this replica once the new log has caught up.
1. The thread model and broker logic for moving replica data between log directories
When a broker starts, for each topicPartition the broker will check whether topicPartition.log and topicPartition.move exist on disk. If topicPartition.move exists but topicPartition.log doesn't, the broker will rename topicPartition.move to topicPartition.log and append data to this log. If both topicPartition.log and topicPartition.move exist, the broker will begin to push data from topicPartition.log to topicPartition.move using a ReplicaFetcherThread. The topicPartition.log can be replaced by topicPartition.move once topicPartition.move has caught up. In the following we describe how the leader and the follower move replicas inside the broker.
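The startup recovery logic above can be sketched as follows (illustrative Python, not the actual broker code; the function name and the use of bare filesystem paths are assumptions made for the sketch):

```python
import os

def recover_partition_logs(log_path, move_path):
    """Sketch of the broker's startup recovery for an in-progress move.

    log_path / move_path stand in for topicPartition.log and
    topicPartition.move on disk."""
    log_exists = os.path.exists(log_path)
    move_exists = os.path.exists(move_path)
    if move_exists and not log_exists:
        # The swap was interrupted after the old log was deleted:
        # the .move log is complete, so promote it to .log.
        os.rename(move_path, log_path)
        return "promoted"
    if move_exists and log_exists:
        # A movement was in progress: resume pushing data
        # from .log to .move via a ReplicaFetcherThread.
        return "resume_copy"
    return "normal"
```

The key property is that the swap is crash-safe: whichever of the two states the broker finds itself in after a failure, it can unambiguously decide whether the move had completed.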
The broker does the following in order to move a leader replica:
- The Java class Replica will keep track of two instances of the Java class Log, one representing topicPartition.log and the other representing topicPartition.move.
- The broker creates a new ReplicaFetcherThread to move replicas between its own disks.
- The ReplicaFetcherThread will repeatedly read a ByteBufferMessageSet of size replica.fetch.max.bytes from the partitions it needs to move on the source disks and append the data to the destination disks. Note that we will enforce the replication quota mechanism introduced in KIP-73 to exclude those partitions that exceed the user-specified replication quota. See below for more detail regarding quota enforcement. If there is no data to move due to quota enforcement, the ReplicaFetcherThread will block until it has work to do.
- The ReplicaFetcherThread will replace topicPartition.log with topicPartition.move once topicPartition.move has caught up with topicPartition.log. It will delete topicPartition.log on the source disk, rename topicPartition.move to topicPartition.log on the destination disk, and update the Replica instance of this topicPartition to only include a Log instance that points to the topicPartition.log on the destination disk. Incoming data will be appended to the new topicPartition.log going forward. Note that a proper lock is needed to prevent the KafkaRequestHandler from appending data to the topicPartition.log on the source disk before this replacement is completed by the ReplicaFetcherThread.
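The catch-up-and-swap step for the leader case, and the locking it needs against concurrent appends, can be sketched as follows (illustrative Python using offsets as a stand-in for log content; the class and method names are made up for the sketch and are not Kafka's actual classes):

```python
import threading

class ReplicaMoveSketch:
    """Sketch of swapping topicPartition.move in for topicPartition.log."""

    def __init__(self):
        self.lock = threading.Lock()   # guards appends vs. the swap
        self.log_end_offset = 0        # end offset of topicPartition.log
        self.move_end_offset = 0       # end offset of topicPartition.move

    def append(self, num_messages):
        # KafkaRequestHandler appends to the current .log under the lock,
        # so the swap cannot happen in the middle of an append.
        with self.lock:
            self.log_end_offset += num_messages

    def try_swap(self):
        # The ReplicaFetcherThread replaces .log with .move only when the
        # .move log has fully caught up, holding the same lock.
        with self.lock:
            if self.move_end_offset >= self.log_end_offset:
                # Here the real broker would delete .log, rename .move
                # to .log, and update the Replica instance.
                return True
            return False
```

The lock is what makes "caught up" a stable condition: without it, a produce request could append to the old log between the catch-up check and the rename, and that data would be lost.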
The broker does the following in order to move a follower replica:
- The Java class Replica will keep track of two instances of the Java class Log, one representing topicPartition.log and the other representing topicPartition.move.
- The ReplicaFetcherThread which is appending data to topicPartition.log will also push data from topicPartition.log to topicPartition.move. The maximum wait time of the FetchRequest will be set to 0 ms if the ReplicaFetcherThread needs to move any partition from topicPartition.log to topicPartition.move.
- After the ReplicaFetcherThread receives a FetchResponse, in addition to appending the data from the FetchResponse to local disks, it also reads multiple ByteBufferMessageSets of size replica.fetch.max.bytes from the partitions it needs to move on the source disks and appends the data to the destination disks. The total size of the ByteBufferMessageSets that can be moved in this step is limited by replica.fetch.response.max.bytes. Note that we will enforce the replication quota mechanism introduced in KIP-73 to exclude those partitions that exceed the user-specified replication quota. See below for more detail regarding quota enforcement.
- The ReplicaFetcherThread will replace topicPartition.log with topicPartition.move once topicPartition.move has caught up with topicPartition.log. It will delete topicPartition.log on the source disk, rename topicPartition.move to topicPartition.log on the destination disk, and update the Replica instance of this topicPartition to only include a Log instance that points to the topicPartition.log on the destination disk. Data from the leader will be appended to the new topicPartition.log going forward.
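The piggy-backed movement step in the follower case, with its two size limits, might be sketched as follows (illustrative; the parameter names merely mirror the configs replica.fetch.max.bytes and replica.fetch.response.max.bytes, and the function is not actual broker code):

```python
def plan_moves_after_fetch(pending_moves, fetch_max_bytes, response_max_bytes):
    """After processing a FetchResponse, plan how much data to copy for
    in-progress intra-broker moves: chunks of up to fetch_max_bytes each,
    capped in total by response_max_bytes.

    pending_moves is a list of (partition, remaining_bytes) pairs."""
    budget = response_max_bytes
    plan = []
    for partition, remaining in pending_moves:
        while remaining > 0 and budget > 0:
            chunk = min(fetch_max_bytes, remaining, budget)
            plan.append((partition, chunk))
            remaining -= chunk
            budget -= chunk
    return plan
```

Bounding the per-round copy by replica.fetch.response.max.bytes keeps the intra-broker movement from starving the follower's normal replication work on the same thread.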
In order to optimize the efficiency of replica movement between disks, the ReplicaFetcherThread will always choose partitions in alphabetical order when selecting partitions to move. This means that it will allocate all resources (e.g. quota) to move one partition and complete its movement before moving another partition. This helps reduce the amount of double writes during the period that a replica is being moved.
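The selection policy above amounts to something like the following (an illustrative sketch, not the actual selection code):

```python
def next_partition_to_move(pending):
    """Pick the alphabetically-first pending partition, so that all
    movement resources (e.g. quota) go to finishing one partition's
    movement before another is started."""
    return min(pending) if pending else None
```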
2. The broker logic for handling ChangeReplicaDirRequest
The flow graph below illustrates how the broker handles a ChangeReplicaDirRequest.
Note that the broker will put the ChangeReplicaDirRequest in a DelayedOperationPurgatory. The ChangeReplicaDirRequest can be completed when results for all partitions specified in the request are available. The result of a partition is determined using the following logic:
- If the source or destination disk fails, the result of this partition will be KafkaStorageException.
- If the destination replica has caught up with the source replica and has replaced it, the result of this partition has no error.
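The purgatory completion check can be sketched as follows (hypothetical Python; the function name and the numeric error codes are made up for illustration and do not correspond to Kafka's actual error code values):

```python
NO_ERROR = 0
STORAGE_ERROR = 1  # hypothetical code standing in for KafkaStorageException

def try_complete_change_replica_dir(request_partitions, results):
    """The request stays in the DelayedOperationPurgatory until every
    partition it names has a result (success or error); only then is
    the ChangeReplicaDirResponse built and returned."""
    if all(p in results for p in request_partitions):
        return {p: results[p] for p in request_partitions}
    return None  # not yet complete; keep waiting in the purgatory
```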
3. Throttle replica movement rate
We use the same mechanism introduced in KIP-73 to throttle the rate of replica movement between disks on the same broker. The user will need to configure leader.replication.throttled.replicas, follower.replication.throttled.replicas, leader.replication.throttled.rate and follower.replication.throttled.rate in the same way as specified in KIP-73, i.e. through kafka-reassign-partitions.sh or kafka-configs.sh. For every message that is moved from a source disk to a destination disk, the size of the message will be subtracted from both the leader replication quota and the follower replication quota if its partition is included in the throttled replicas list. No data will be moved for a partition in *.replication.throttled.replicas if either the leader replication quota or the follower replication quota is exceeded.
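The per-message quota accounting described above might look like the following (a sketch under an assumed bookkeeping structure, not Kafka's actual quota manager):

```python
def throttled_move_allowance(partition, bytes_to_move, state):
    """Decide how many bytes of a partition may be moved now.

    `state` is assumed to hold the set of throttled replicas and the
    remaining leader/follower quota in the current window. Every moved
    byte is charged against BOTH quotas, per the KIP-73 mechanism."""
    if partition not in state["throttled_replicas"]:
        return bytes_to_move  # unthrottled partitions move freely
    allowed = min(state["leader_quota_left"], state["follower_quota_left"])
    moved = min(bytes_to_move, max(allowed, 0))
    state["leader_quota_left"] -= moved
    state["follower_quota_left"] -= moved
    return moved
```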
4. Query broker for partition assignment and partition size per log directory
When a broker receives a DescribeDirsRequest with an empty list of log directories, it will respond with a DescribeDirsResponse which shows the size of each partition and the list of partitions per log directory, for all log directories. If the user has specified a list of log directories in the DescribeDirsRequest, the broker will provide the above information only for the log directories specified by the user. A non-zero error code will be specified in the DescribeDirsResponse for each log directory that is either offline or not found by the broker.
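A sketch of the broker-side handling (hypothetical Python; the field names loosely follow the DescribeDirsResponse schema below, and the non-zero error code value is made up):

```python
def describe_dirs(requested_dirs, partition_sizes):
    """Build a DescribeDirsResponse-like structure.

    partition_sizes maps log_dir path -> {(topic, partition): size}.
    An empty requested_dirs list means 'describe all log directories'."""
    dirs = requested_dirs or list(partition_sizes)
    response = []
    for d in dirs:
        if d in partition_sizes:
            response.append({"path": d, "error_code": 0,
                             "topics": partition_sizes[d]})
        else:
            # Offline or unknown directory: report a non-zero error code.
            response.append({"path": d, "error_code": 1, "topics": {}})
    return response
```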
2) How to reassign replica between log directories across brokers
Problem statement:
kafka-reassign-partitions.sh should provide the option for the user to specify the destination log directory of the replica on any broker, and the user should be able to verify that the replica has been moved to the specified log directory after the reassignment completes. This is needed in order for the user to balance load across log directories of brokers in the cluster.
Solution:
The idea is that the user should be able to specify a log directory when using kafka-reassign-partitions.sh to reassign partitions. The controller should be able to read this optional log directory info when reading the assignment from zookeeper, send a ChangeReplicaDirRequest, and wait for the ChangeReplicaDirResponse to confirm the movement to the specified log directory before declaring that the partition has been moved. We describe the procedure in more detail below:
- The user can specify a list of log directories, one log directory per replica, for each topic partition in the reassignment json file that is provided to kafka-reassign-partitions.sh. The log directory specified by the user must be either "any" or an absolute path that begins with '/'. See the Scripts section for the format of this json file.
- kafka-reassign-partitions.sh will write the log directories read from the reassignment json file to the znode /admin/reassign_partitions. If the user doesn't specify a log directory, "any" will be used as the default log directory name. See the Zookeeper section for the format of the data in the znode.
- The controller will still update the state machine, send LeaderAndIsrRequests and so on to perform partition reassignment. However, it will additionally send a ChangeReplicaDirRequest for every replica whose specified log directory != "any". The ChangeReplicaDirRequest will move the replica to the specific log directory if it is not already placed there on the broker.
- In addition to the existing requirements for partition reassignment completion, the controller will also wait for the ChangeReplicaDirResponse (corresponding to the ChangeReplicaDirRequest it has sent) before it considers a movement to be completed and removes a partition from /admin/reassign_partitions. This allows the user to confirm that the reassignment to specific disks of brokers is completed once the partition is removed from the znode data of /admin/reassign_partitions.
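The controller-side completion check described above might be sketched as follows (illustrative Python; the function and structure names are assumptions, not the controller's actual code):

```python
def reassignment_complete(replica_dirs, dir_move_confirmed):
    """A partition may leave /admin/reassign_partitions only after every
    replica assigned a concrete log directory (!= "any") has been
    confirmed moved via a ChangeReplicaDirResponse.

    replica_dirs: list of (broker_id, log_dir) for the partition.
    dir_move_confirmed: {(broker_id, log_dir): True} for confirmed moves."""
    for broker_id, log_dir in replica_dirs:
        if log_dir != "any" and not dir_move_confirmed.get((broker_id, log_dir), False):
            return False
    return True
```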
Public interface
Zookeeper
Change the format of the data stored in the znode /admin/reassign_partitions to allow a log directory to be specified for each replica.
{
  "version" : int,
  "partitions" : [
    {
      "topic" : str,
      "partition" : int,
      "replicas" : [int],
      "log_dirs" : [str]    <-- NEW. If a log directory is not explicitly specified by the user, "any" will be used as the log directory name and the broker will select the log directory using its own policy. Currently the log directory is selected in a round-robin manner.
    },
    ...
  ]
}
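For illustration, an entry that pins the replica on broker 1 to /data/disk1 while letting broker 2 pick its own log directory might look like this (the topic name and paths are made up):

```json
{
  "version" : 1,
  "partitions" : [
    {
      "topic" : "metrics",
      "partition" : 0,
      "replicas" : [1, 2],
      "log_dirs" : ["/data/disk1", "any"]
    }
  ]
}
```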
Protocol
Create ChangeReplicaDirRequest
ChangeReplicaDirRequest => [ReplicaState]

ReplicaState =>
  topic => str
  partition => int32
  dir => str
ChangeReplicaDirResponse => error_code partitions
  error_code => int16
  partitions => [ChangeReplicaDirResponsePartition]

ChangeReplicaDirResponsePartition => topic partition error_code
  topic => str
  partition => int32
  error_code => int16
Create DescribeDirsRequest
DescribeDirsRequest => log_dirs
  log_dirs => [str]
Create DescribeDirsResponse
DescribeDirsResponse => log_dirs
  log_dirs => [DescribeDirsResponseDirMetadata]

DescribeDirsResponseDirMetadata => error_code path topics
  error_code => int16
  path => str
  topics => [DescribeDirsResponseTopic]

DescribeDirsResponseTopic => topic partitions
  topic => str
  partitions => [DescribeDirsResponsePartition]

DescribeDirsResponsePartition => partition size
  partition => int32
  size => int64
Scripts
1) Add kafka-log-dirs.sh which allows the user to get the list of replicas per log directory on a broker. The script output would have the following json format.
{
  "version" : 1,
  "log_dirs" : [
    {
      "live" : boolean,
      "path" : str,
      "partitions" : [
        {
          "topic" : str,
          "partition" : int32,
          "size" : int64
        },
        ...
      ]
    },
    ...
  ]
}
2) Change kafka-reassign-partitions.sh to allow a log directory to be specified for each replica in the reassignment json file, which would have the following format.
{
  "version" : int,
  "partitions" : [
    {
      "topic" : str,
      "partition" : int,
      "replicas" : [int],
      "log_dirs" : [str]    <-- NEW. A log directory can be either "any", or a valid absolute path that begins with '/'
    },
    ...
  ]
}
Changes in Operational Procedures
In this section we describe the expected changes in operational procedures in order to run Kafka with JBOD. Administrators of Kafka cluster need to be aware of these changes before switching from RAID-10 to JBOD.
- Need to load balance across log directories
When running Kafka with RAID-10, we only need to take care of load imbalance across brokers; the administrator can balance load across brokers using the script kafka-reassign-partitions.sh. After switching from RAID-10 to JBOD, we will start to see load imbalance across log directories. In order to address this problem, the administrator needs to get the partition assignment and partition sizes per log directory using kafka-log-dirs.sh, determine the reassignment of replicas per log directory (as opposed to per broker), and provide the partition -> log_directory mapping as input to kafka-reassign-partitions.sh to execute the new assignment.
Administrators should also be prepared for rebalancing across log directories to be much more frequent than rebalancing across brokers, since the capacity of an individual disk is likely much smaller than the capacity of the existing RAID-10 setup.
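As an illustration, an operator's own tooling could sum partition sizes per log directory from the kafka-log-dirs.sh output (the json format shown in the Scripts section); this is a sketch of such tooling, not a shipped tool:

```python
import json

def dir_usage(log_dirs_json):
    """Map each log directory path to the total size of its partitions,
    given the json output of kafka-log-dirs.sh."""
    usage = {}
    for d in json.loads(log_dirs_json)["log_dirs"]:
        usage[d["path"]] = sum(p["size"] for p in d["partitions"])
    return usage
```

The resulting per-directory totals are what the administrator would compare to decide which replicas to move, before writing the partition -> log_directory mapping into a reassignment json file.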
Compatibility, Deprecation, and Migration Plan
The changes in this KIP are purely additive, so there is no backward compatibility concern for existing clients.
The KIP does change the inter-broker protocol, however, so the migration requires two rolling bounces. In the first rolling bounce we will deploy the new code, but brokers will still communicate using the existing protocol. In the second rolling bounce we will change the config so that brokers start to communicate with each other using the new protocol.
Test Plan
The new features will be tested through unit and integration tests.
Rejected Alternatives
Potential Future Improvement
1. Allow controller/user to specify quota when moving replicas between log directories on the same broker.