...

The flow graph below illustrates how the broker handles ChangeReplicaDirRequest.

[Attached file: JBOD-flowgraph.pdf]

Note that the broker will put the ChangeReplicaDirRequest in a DelayedOperationPurgatory. The request can be completed when results for all partitions specified in it are available. The result for a partition is determined using the following logic:

  • If the source or destination disk fails, the result for this partition will be KafkaStorageException.
  • If the destination replica has caught up with the source replica and has replaced it, the result for this partition has no error.
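
The completion rule above can be sketched as follows. This is only an illustration of the logic, not broker code: the names ReplicaMove, partition_result and try_complete, and the "NONE" result string, are hypothetical and do not come from the KIP.

```python
from dataclasses import dataclass
from typing import Dict, Optional

NONE = "NONE"  # hypothetical marker for "no error"
KAFKA_STORAGE_EXCEPTION = "KafkaStorageException"


@dataclass
class ReplicaMove:
    """Hypothetical state of one partition's move between log directories."""
    source_disk_ok: bool = True
    dest_disk_ok: bool = True
    caught_up_and_swapped: bool = False


def partition_result(move: ReplicaMove) -> Optional[str]:
    """Return the result for one partition, or None while it is still pending."""
    if not (move.source_disk_ok and move.dest_disk_ok):
        return KAFKA_STORAGE_EXCEPTION
    if move.caught_up_and_swapped:
        return NONE
    return None


def try_complete(moves: Dict[str, ReplicaMove]) -> Optional[Dict[str, str]]:
    """Return per-partition results once every partition has one, else None."""
    results = {tp: partition_result(m) for tp, m in moves.items()}
    if any(r is None for r in results.values()):
        return None  # the delayed operation stays in the purgatory
    return results
```

In this sketch the request stays pending as long as any partition is neither failed nor fully moved, which mirrors the "results for all partitions are available" condition.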

3. Throttle replica movement rate

We use the same mechanism introduced in KIP-73 to throttle the rate of replica movement between disks on the same broker. The user will need to configure leader.replication.throttled.replicas, follower.replication.throttled.replicas, leader.replication.throttled.rate and follower.replication.throttled.rate in the same way as specified in KIP-73, i.e. through kafka-reassign-partitions.sh or kafka-configs.sh. For every message that is moved from the source disk to the destination disk, the size of the message will be subtracted from both the leader replication quota and the follower replication quota if its partition is included in the throttled replicas list. No data will be moved for a partition listed in *.replication.throttled.replicas if either the leader replication quota or the follower replication quota is exceeded.
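
A minimal sketch of this accounting is shown below. It assumes a simplified quota modeled as a byte budget for a single window, whereas the KIP-73 mechanism measures a rate over time. The Quota class and move_messages function are hypothetical names used only for illustration.

```python
from typing import Iterable, Set


class Quota:
    """Hypothetical byte budget for one throttle window (a simplification)."""

    def __init__(self, bytes_per_window: int) -> None:
        self.remaining = bytes_per_window

    def exhausted(self) -> bool:
        return self.remaining <= 0

    def record(self, size: int) -> None:
        self.remaining -= size


def move_messages(partition: str,
                  message_sizes: Iterable[int],
                  throttled: Set[str],
                  leader_quota: Quota,
                  follower_quota: Quota) -> int:
    """Move messages between disks; return how many were moved."""
    moved = 0
    for size in message_sizes:
        if partition in throttled:
            # Stop moving data as soon as either quota is used up.
            if leader_quota.exhausted() or follower_quota.exhausted():
                break
            # The message size is charged against both quotas.
            leader_quota.record(size)
            follower_quota.record(size)
        moved += 1
    return moved
```

Partitions that are not in the throttled list bypass both quotas entirely in this sketch, matching the condition stated above.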

4. Query broker for partition assignment and partition size per log directory

When a broker receives a DescribeDirsRequest with an empty list of log directories, it will respond with a DescribeDirsResponse that shows, for every log directory, the list of partitions it holds and the size of each partition. If the user has specified a list of log directories in the DescribeDirsRequest, the broker will provide this information only for the specified log directories. A non-zero error code will be specified in the DescribeDirsResponse for each log directory that is either offline or not found by the broker.

Users can run a command such as ./bin/kafka-log-dirs.sh --describe --zookeeper localhost:2181 --broker 1 to get the above information per log directory.
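
The request handling described in this section might look roughly like the sketch below. The error code values, the dictionary-shaped response and the describe_dirs function are assumptions made for illustration; the actual wire format is defined in the Public interface section.

```python
from typing import Dict, List, Set

# Hypothetical error codes; only "zero means no error" is taken from the text.
NONE, LOG_DIR_NOT_FOUND, LOG_DIR_OFFLINE = 0, 1, 2


def describe_dirs(log_dirs: Dict[str, Dict[str, int]],
                  offline: Set[str],
                  requested: List[str]) -> Dict[str, dict]:
    """Build a per-directory response.

    log_dirs maps each log directory to {partition: size in bytes}.
    An empty `requested` list means "describe all log directories".
    """
    targets = requested if requested else list(log_dirs)
    response: Dict[str, dict] = {}
    for d in targets:
        if d not in log_dirs:
            response[d] = {"error_code": LOG_DIR_NOT_FOUND, "partitions": {}}
        elif d in offline:
            response[d] = {"error_code": LOG_DIR_OFFLINE, "partitions": {}}
        else:
            response[d] = {"error_code": NONE, "partitions": dict(log_dirs[d])}
    return response
```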

2) How to reassign replica to a specific log directory on any broker

Problem statement:

kafka-reassign-partitions.sh should provide an option for the user to specify the destination log directory of a replica on any broker. This is needed so that the user can balance load across the log directories of brokers in the cluster.

Solution:

- The user can specify a list of log directories, one log directory per replica, for each topic partition in the reassignment json file that is provided to kafka-reassign-partitions.sh. The log directory specified by the user must be either "any" or an absolute path that begins with '/'. See the Scripts section for the format of this json file.

- kafka-reassign-partitions.sh will write the log directories read from the reassignment json file to the znode /admin/reassign_partitions. If the user doesn't specify a log directory, "any" will be used as the default log directory name. See the Zookeeper section for the format of the data in the znode.

- The controller will still update the state machine, send LeaderAndIsrRequest and so on to perform partition reassignment. However, it will additionally send a ChangeReplicaDirRequest for every replica whose specified log directory is not "any". The ChangeReplicaDirRequest will move the replica to the specified log directory if it is not already placed there on the broker.

- In addition to the existing requirements for partition reassignment completion, the controller will also wait for the ChangeReplicaResponse (corresponding to the ChangeReplicaDirRequest it has sent) before it considers a movement to be completed and removes the partition from /admin/reassign_partitions. This allows the user to confirm that the reassignment to specific disks of brokers has completed once the partition is removed from the znode data of /admin/reassign_partitions.
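
The validation, defaulting and selection rules in the solution above can be sketched as follows. The function names and the in-memory list representation are hypothetical; they do not reflect the json or znode formats, which are defined in the Scripts and Zookeeper sections.

```python
from typing import List, Optional, Tuple

ANY = "any"


def normalize_log_dirs(replicas: List[int],
                       log_dirs: Optional[List[str]] = None) -> List[str]:
    """Validate the per-replica log directories, defaulting to "any"."""
    if log_dirs is None:
        return [ANY] * len(replicas)
    if len(log_dirs) != len(replicas):
        raise ValueError("expected one log directory per replica")
    for d in log_dirs:
        # Each entry must be "any" or an absolute path beginning with '/'.
        if d != ANY and not d.startswith("/"):
            raise ValueError(f"invalid log directory: {d!r}")
    return list(log_dirs)


def replicas_needing_dir_change(replicas: List[int],
                                log_dirs: List[str]) -> List[Tuple[int, str]]:
    """Pick the (broker, log directory) pairs that need a ChangeReplicaDirRequest."""
    return [(broker, d) for broker, d in zip(replicas, log_dirs) if d != ANY]
```

In this sketch a replica assigned "any" produces no ChangeReplicaDirRequest, so only explicitly placed replicas add to what the controller waits for.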

Public interface

Zookeeper

...