...

The goal of this KIP is to allow administrators to re-assign replicas to specific log directories of brokers, query offline replicas of topics, query offline replicas of brokers, and replace bad disks with good ones, so that they can balance load across disks. This addresses the second problem raised in the motivation section. See KIP - Handle disk failure for JBOD for our proposal of how to address the first problem.

...

Kafka does not allow a user to move a replica to another log directory on the same broker at runtime. This was not needed previously because we used RAID-10 and the load was already balanced across disks. But it will be needed to use JBOD with Kafka.

Currently a replica has only two possible states, follower or leader, and it is identified by the 3-tuple (topic, partition, broker). This works for the current implementation because there can be at most one such replica on a broker. However, we can now have two such replicas on a broker when we move a replica from one log directory to another log directory on the same broker. Either a replica should be identified by its log directory as well, or the broker needs to persist information under the log directory to tell the destination replica apart from the source replica that is being moved.

...

The idea is that the user can send a ChangeReplicaDirRequest which tells the broker to move the topicPartition directory (which contains all log segments of the topicPartition replica) from the source log directory to a destination log directory. The broker can create a new directory with the .move suffix on the destination log directory to hold all log segments of the replica. This allows the broker to tell log segments of the replica on the destination log directory apart from log segments of the replica on the source log directory during broker startup. The broker can create new log segments for the replica on the destination log directory, push data from the source log to the destination log, and replace the source log with the destination log for this replica once the new log has caught up.

...

Here we describe how a broker moves a Log from the source log directory to the destination log directory and swaps the Log. This corresponds to the "Initiate replica data movement" box in the flow graph above. Note that the broker responds to ChangeReplicaDirRequest with MoveInProgress after step 1) described below.

1) The Replica instance is updated to track two instances of Log, one referencing the directory topicPartition on the source log directory and the other referencing the directory topicPartition.move on the destination log directory.
2) If a thread is available in ReplicaMoveThreadPool, one thread is allocated to move this replica. Otherwise, the movement of this replica is delayed until a thread becomes available.
3) The ReplicaMoveThread keeps copying data from the Log in the source log directory to the Log (i.e. topicPartition.move) in the destination log directory using zero-copy. The ReplicaMoveThread may need to sleep to ensure that the total byte rate used by all ReplicaMoveThread instances does not exceed the configured value of intra.broker.throttled.rate.
4) If the Log in the destination log directory has caught up with the Log in the source log directory, the ReplicaMoveThread grabs the lock on the Replica instance.
5) The ReplicaMoveThread continues to move data as specified in step 3) until the Log in the destination log directory has caught up with the Log in the source log directory. We need to do this again because a RequestHandlerThread or ReplicaFetcherThread may have appended data to the log in the source log directory concurrently after the check in step 4).
6) The ReplicaMoveThread renames directory topicPartition to topicPartition.delete on the source log directory. topicPartition.delete will be subject to asynchronous delete.
7) The ReplicaMoveThread renames directory topicPartition.move to topicPartition on the destination log directory.
8) The ReplicaMoveThread updates the corresponding Replica instance to track only the Log in the destination log directory.
9) The ReplicaMoveThread releases the lock on the Replica instance.
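The catch-up-and-swap logic in steps 3)-5) can be sketched as follows. This is a minimal model, not Kafka's actual internals: Log is reduced to a byte counter, zero-copy transfer is reduced to advancing an offset, and all class and method names are illustrative.

```java
// Minimal model of a Log: just a thread-safe log end offset in bytes.
class Log {
    private long logEndOffset = 0;
    synchronized long endOffset() { return logEndOffset; }
    synchronized void append(long bytes) { logEndOffset += bytes; }
}

class ReplicaMoveSketch {
    /**
     * Copies from source to destination until caught up, then finishes the
     * catch-up under the Replica lock so concurrent appends cannot be lost.
     */
    static long moveReplica(Log source, Log destination, Object replicaLock) {
        // Step 3): copy without holding the lock until we appear caught up.
        while (destination.endOffset() < source.endOffset()) {
            destination.append(source.endOffset() - destination.endOffset());
        }
        synchronized (replicaLock) {  // step 4): grab the lock on the Replica
            // Step 5): the source may have grown while we copied; catch up
            // again, now safe from concurrent appends.
            while (destination.endOffset() < source.endOffset()) {
                destination.append(source.endOffset() - destination.endOffset());
            }
            // Steps 6)-8): renaming directories and repointing the Replica
            // to the destination Log would happen here, still under the lock.
            return destination.endOffset();
        }                             // step 9): lock released
    }
}
```

The double catch-up mirrors the document's reasoning: the first loop does the bulk of the copying without blocking appends, and only the (small) remaining tail is copied while holding the lock.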
 

...

- If both the directory topicPartition and the directory topicPartition.move exist on good log directories, the broker will start a ReplicaMoveThread to copy data from topicPartition to topicPartition.move. The effect is the same as if the broker had received a ChangeReplicaDirRequest to move the replica from topicPartition to topicPartition.move.
- If topicPartition.move exists but topicPartition doesn't exist on any good log directory, and if there is no bad log directory, then the broker renames topicPartition.move to topicPartition.
- If topicPartition.move exists but topicPartition doesn't exist on any good log directory, and if there is a bad log directory, then the broker considers topicPartition as offline and will not touch topicPartition.move.
- If topicPartition.delete exists, the broker schedules topicPartition.delete for asynchronous delete.
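The startup rules above form a small decision table, sketched below. The enum and method names are illustrative; the real broker inspects actual directories rather than booleans.

```java
// Possible actions the broker takes for a partition at startup.
enum StartupAction { START_MOVE_THREAD, RENAME_MOVE_TO_LIVE, LEAVE_OFFLINE, NONE }

class StartupRecoverySketch {
    /**
     * Decides what to do with a partition based on whether topicPartition
     * and topicPartition.move exist on good log directories, and whether
     * any log directory on the broker is bad.
     */
    static StartupAction resolve(boolean liveExists, boolean moveExists, boolean anyBadLogDir) {
        if (liveExists && moveExists) {
            return StartupAction.START_MOVE_THREAD;  // resume the interrupted move
        }
        if (moveExists && !liveExists) {
            // With no bad log directory we know the source copy is truly gone,
            // so the .move copy can be promoted. Otherwise the source may sit
            // on the bad disk, and the partition must be treated as offline.
            return anyBadLogDir ? StartupAction.LEAVE_OFFLINE
                                : StartupAction.RENAME_MOVE_TO_LIVE;
        }
        return StartupAction.NONE;
    }
}
```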

2) How to reassign replicas between log directories across brokers

Problem statement:

kafka-reassign-partitions.sh should provide an option for the user to specify the destination log directory of each replica on any broker in the reassignment json file. And the user should be able to verify that the replica has been moved to the specified log directory after the reassignment is completed. This is needed by users to balance load across log directories of brokers in the cluster.

...

The idea is that the user should be able to specify a log directory when using kafka-reassign-partitions.sh to reassign partitions. If the user has specified a log directory on the destination broker, the script should send ChangeReplicaDirRequest directly to the broker so that the broker can either start replica movement or mark the replica to be created on the destination log directory. Finally, the script should send DescribeDirsRequest to the broker to verify that the replica has been created/moved to the specified log directory when the user requests to verify the assignment.

...

- The user specifies a list of log directories, one log directory per replica, for each topic partition in the reassignment json file that is provided to kafka-reassign-partitions.sh. The log directory specified by the user must be either "any" or an absolute path that begins with '/'. See Scripts section for the format of this json file.
- In addition to creating the znode at /admin/reassign_partitions with the replica assignment, the script will also send ChangeReplicaDirRequest to the brokers hosting those replicas whose log directory path in the assignment is not "any". The script needs to retry, up to a configured amount of time, if the ChangeReplicaDirResponse shows ReplicaNotAvailableException. This is needed to wait for the controller to send LeaderAndIsrRequest to create the replica if the broker isn't already a leader or follower of the partition.
- The broker handles ChangeReplicaDirRequest as specified in the section "How to move replica between log directories on the same broker".
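To make the flow above concrete, a reassignment json file with per-replica log directories might look like the fragment below. The exact field names (e.g. "log_dirs") and paths are illustrative here; the authoritative format is defined in the Scripts section.

```json
{
  "version": 1,
  "partitions": [
    {
      "topic": "foo",
      "partition": 0,
      "replicas": [1, 2],
      "log_dirs": ["/data/kafka-logs-1", "any"]
    }
  ]
}
```

In this sketch the replica on broker 1 is pinned to a specific log directory (so the script would send ChangeReplicaDirRequest to broker 1), while "any" leaves the directory choice on broker 2 to the broker itself.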

Here are the steps to verify partition assignment:

- kafka-reassign-partitions.sh will verify partition assignment across brokers as it does now.
- For those replicas with destination log directory != "any", kafka-reassign-partitions.sh groups the replicas according to their brokers and sends DescribeDirsRequest to those brokers. The DescribeDirsRequest should provide the log directories and partitions specified in the expected assignment.
- The broker replies with DescribeDirsResponse which shows the current log directory for each partition specified in the DescribeDirsRequest.
- kafka-reassign-partitions.sh determines whether each replica has been moved to the specified log directory based on the DescribeDirsResponse.

3) How to retrieve information to determine the new replica assignment across log directories

...

In order to optimize replica assignment across log directories, the user needs to figure out the list of partitions per log directory and the size of each partition. As of now Kafka doesn't expose this information via any RPC, and the user would need to either query the JMX metrics of the broker or use external tools to log onto each machine to get this information. While it is possible to retrieve this information via JMX, users would have to manage the JMX port and related credentials. It is better if Kafka can expose this information via RPC.

...

We introduce DescribeDirsRequest and DescribeDirsResponse. When a broker receives a DescribeDirsRequest with an empty list of log directories, it will respond with a DescribeDirsResponse which shows the size of each partition and the list of partitions per log directory, for all log directories. If the user has specified a list of log directories in the DescribeDirsRequest, the broker will provide the above information for only the log directories specified by the user. If the user has specified an empty list of topics in the DescribeDirsRequest, all topics will be queried and included in the response. Otherwise, only those topics specified in the DescribeDirsRequest will be queried. A non-zero error code will be specified in the DescribeDirsResponse for each log directory that is either offline or not found by the broker.
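The filtering semantics above (empty list means "all") can be sketched as a pure function over an in-memory model of broker state. This is not the actual Kafka protocol or server code; the map-based state, the "topic-partition" key convention, and all names are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

class DescribeDirsSketch {
    /**
     * Models the broker's answer to DescribeDirsRequest.
     * state: logDir -> ("topic-partition" -> size in bytes).
     * An empty requestedDirs / requestedTopics set means "all dirs" / "all topics".
     */
    static Map<String, Map<String, Long>> describe(
            Map<String, Map<String, Long>> state,
            Set<String> requestedDirs,
            Set<String> requestedTopics) {
        Map<String, Map<String, Long>> response = new HashMap<>();
        for (Map.Entry<String, Map<String, Long>> dir : state.entrySet()) {
            // Skip log directories the user didn't ask for (if any were listed).
            if (!requestedDirs.isEmpty() && !requestedDirs.contains(dir.getKey())) continue;
            Map<String, Long> partitions = new HashMap<>();
            for (Map.Entry<String, Long> tp : dir.getValue().entrySet()) {
                // Topic name is everything before the final "-<partition>" suffix.
                String topic = tp.getKey().substring(0, tp.getKey().lastIndexOf('-'));
                if (requestedTopics.isEmpty() || requestedTopics.contains(topic)) {
                    partitions.put(tp.getKey(), tp.getValue());
                }
            }
            response.put(dir.getKey(), partitions);
        }
        return response;
    }
}
```

A real implementation would additionally attach a per-directory error code for offline or unknown directories, as described above; the sketch only covers the happy path.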

...

When running Kafka with RAID-10, we only need to take care of load imbalance across brokers. The administrator can balance load across brokers using the script kafka-reassign-partitions.sh. After switching from RAID-10 to JBOD, we will start to see load imbalance across log directories. In order to address this problem, the administrator needs to get the partition assignment and partition sizes per log directory using kafka-log-dirs.sh, determine the reassignment of replicas per log directory (as opposed to per broker), and provide the partition -> (broker, log_directory) mapping as input to kafka-reassign-partitions.sh to execute the new assignment.

...