...
1. Initiate replica movement using ChangeReplicaDirRequest
The user runs kafka-reassignment-partitions.sh to send ChangeReplicaDirRequest to the broker, which initiates replica movement between its log directories. The flow graph below illustrates how the broker handles ChangeReplicaDirRequest.
...
- The user specifies a list of log directories, one log directory per replica, for each topic partition in the reassignment JSON file that is provided to kafka-reassignment-partitions.sh. Each log directory must be either "any" or an absolute path that begins with '/'. See the Scripts section for the format of this JSON file.
- In addition to creating a znode at /admin/reassign_partitions with the replica assignment, the script also sends ChangeReplicaDirRequest to the brokers of those replicas whose log directory path in the assignment is not "any". The script retries for up to a configured amount of time if ChangeReplicaDirResponse shows ReplicaNotAvailableException. The retry is needed because the controller must first send LeaderAndIsrRequest to the broker to create the replica if the broker isn't already a leader or follower of the partition.
- The broker handles ChangeReplicaDirRequest as specified in the section "How to move replica between log directories on the same broker".
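The retry behaviour described above can be sketched roughly as follows. This is an illustration only, not the script's actual implementation: `change_replica_dirs`, the `send_request` callback, the string error names, and the `backoff_s` parameter are all hypothetical names introduced here. The sketch assumes that only ReplicaNotAvailable is retried and that any other error is reported immediately.

```python
import time

# Hypothetical stand-ins for the per-partition error reported in ChangeReplicaDirResponse.
NONE = "NONE"
REPLICA_NOT_AVAILABLE = "REPLICA_NOT_AVAILABLE"


def change_replica_dirs(pending, send_request, timeout_s=10.0, backoff_s=0.5,
                        clock=time.monotonic, sleep=time.sleep):
    """Send the request repeatedly until no replica is pending or the timeout elapses.

    pending:      dict mapping (topic, partition) -> destination log directory
    send_request: hypothetical callback taking the pending dict and returning
                  a dict mapping (topic, partition) -> error name
    Returns a dict of (topic, partition) -> error for replicas that were not accepted.
    """
    deadline = clock() + timeout_s
    pending = dict(pending)
    failed = {}
    while pending:
        errors = send_request(pending)
        retry = {}
        for tp, log_dir in pending.items():
            error = errors.get(tp, NONE)
            if error == REPLICA_NOT_AVAILABLE:
                # The controller may not have created the replica yet; try again later.
                retry[tp] = log_dir
            elif error != NONE:
                failed[tp] = error
        pending = retry
        if not pending or clock() >= deadline:
            break
        sleep(backoff_s)
    # Anything still pending when the timeout expires is reported as unavailable.
    for tp in pending:
        failed[tp] = REPLICA_NOT_AVAILABLE
    return failed
```

An empty result means every requested replica was accepted by its broker. The clock and sleep functions are parameters only so that the loop can be exercised without real waiting.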
Here are the steps to verify partition assignment:
- kafka-reassignment-partitions.sh verifies the partition assignment across brokers as it does now.
- For those replicas with destination log directory != "any", kafka-reassignment-partitions.sh groups the replicas by broker and sends DescribeDirsRequest to those brokers. The DescribeDirsRequest should provide the log directories and partitions specified in the expected assignment.
- The broker replies with DescribeDirsResponse, which shows the current log directory for each partition specified in the DescribeDirsRequest.
- kafka-reassignment-partitions.sh determines whether the replica has been moved to the specified log directory based on the DescribeDirsResponse.
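The verification steps above can be sketched as two small functions. The function names and the in-memory shapes of the assignment and of the response are hypothetical, chosen only for illustration. The sketch also assumes that a move counts as complete only when a non-temporary copy of the replica is in the expected log directory, which is one reading of the is_temporary field.

```python
from collections import defaultdict


def group_by_broker(assignment):
    """Group replicas with an explicit destination log directory by broker.

    assignment: list of (topic, partition, broker_id, log_dir) tuples.
    Replicas whose log_dir is "any" need no DescribeDirsRequest and are skipped.
    """
    by_broker = defaultdict(list)
    for topic, partition, broker_id, log_dir in assignment:
        if log_dir != "any":
            by_broker[broker_id].append((topic, partition, log_dir))
    return dict(by_broker)


def verify_broker(expected, describe_response):
    """Compare one broker's expected placement with its reported placement.

    expected:          list of (topic, partition, log_dir)
    describe_response: dict mapping log directory path -> list of dicts with
                       keys "topic", "partition" and "is_temporary"
    Returns a dict of (topic, partition) -> True if the move is complete.
    """
    current = {}
    for path, partitions in describe_response.items():
        for p in partitions:
            # Assumption: a temporary (*.move) copy means the move is still in progress.
            if not p["is_temporary"]:
                current[(p["topic"], p["partition"])] = path
    return {(t, p): current.get((t, p)) == d for t, p, d in expected}
```

For each broker returned by `group_by_broker`, the caller would send one request and pass the decoded response to `verify_broker`.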
...
Code Block |
---|
ChangeReplicaDirResponse => topics
  topics => [ChangeReplicaDirResponseTopic]
ChangeReplicaDirResponseTopic => topic partitions
  topic => str
  partitions => [ChangeReplicaDirResponsePartition]
ChangeReplicaDirResponsePartition => partition error_code
  partition => int32
  error_code => int16 |
Create DescribeDirsRequest
Code Block |
---|
// log_dirs and topics are used to filter the results to include only the specified log_dir/topic. The result is the intersection of both filters.
DescribeDirsRequest => log_dirs topics
log_dirs => [str] // If this is null, then all log directories will be queried
topics => [str] // If this is null, all topics will be queried |
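To make the filter semantics concrete, here is a minimal sketch of how the two filters could combine. The function `describe_dirs` and the in-memory `state` layout are hypothetical and stand in for the broker's real bookkeeping; `None` plays the role of a null filter.

```python
def describe_dirs(state, log_dirs=None, topics=None):
    """Return the entries of state that match both filters.

    state:    dict mapping log directory path -> dict of topic -> list of partition ids
    log_dirs: list of paths to include, or None to include every log directory
    topics:   list of topics to include, or None to include every topic
    """
    result = {}
    for path, by_topic in state.items():
        if log_dirs is not None and path not in log_dirs:
            continue
        # A topic is kept only if it also passes the topic filter (intersection).
        result[path] = {
            topic: list(partitions)
            for topic, partitions in by_topic.items()
            if topics is None or topic in topics
        }
    return result
```

With both arguments left as `None` the call returns everything. In this sketch a log directory that passes the directory filter but holds none of the requested topics is still listed, with an empty topic map; whether the broker reports such directories is not stated in the text above.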
Create DescribeDirsResponse
Code Block |
---|
DescribeDirsResponse => log_dirs
  log_dirs => [DescribeDirsResponseDirMetadata]
DescribeDirsResponseDirMetadata => error_code path topics
  error_code => int16
  path => str
  topics => [DescribeDirsResponseTopic]
DescribeDirsResponseTopic => topic partitions
  topic => str
  partitions => [DescribeDirsResponsePartition]
DescribeDirsResponsePartition => partition size log_end_offset is_temporary
  partition => int32
  size => int64
  log_end_offset => int64
  is_temporary => boolean // True if replica is *.move |
Broker Config
1) Add config intra.broker.throttled.rate. This config specifies the maximum rate, in bytes per second, at which replicas can be moved between log directories. This config defaults to MAX_LONG.
2) Add config num.replica.move.threads. This config specifies the number of threads in ReplicaMoveThreadPool. The threads in this pool are responsible for moving replicas between log directories. This config defaults to the number of log directories.
...
Code Block |
---|
{
  "version" : 1,
  "log_dirs" : [
    {
      "is_live" : boolean,
      "path" : str,
      "partitions": [
        {
          "topic" : str,
          "partition" : int32,
          "size" : int64,
          "is_temporary" : boolean
        },
        ...
      ]
    },
    ...
  ]
} |
Code Block |
---|
{
  "version" : int,
  "partitions" : [
    {
      "topic" : str,
      "partition" : int,
      "replicas" : [int],
      "log_dirs" : [str]    <-- NEW. A log directory can be either "any", or a valid absolute path that begins with '/'
    },
    ...
  ]
} |
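As an illustration of the rules on the new log_dirs field, the following sketch checks a reassignment JSON document before it is submitted. `validate_reassignment` is a hypothetical helper, not part of the script. It assumes that log_dirs may be omitted for a partition and that, when present, it must have one entry per replica.

```python
import json


def validate_reassignment(text):
    """Return a list of human-readable problems found in a reassignment JSON string."""
    doc = json.loads(text)
    problems = []
    for entry in doc["partitions"]:
        name = f'{entry["topic"]}-{entry["partition"]}'
        replicas = entry["replicas"]
        log_dirs = entry.get("log_dirs")
        if log_dirs is None:
            # Assumption: an entry without log_dirs is accepted as-is.
            continue
        if len(log_dirs) != len(replicas):
            problems.append(
                f"{name}: {len(log_dirs)} log_dirs for {len(replicas)} replicas")
        for log_dir in log_dirs:
            # Each entry must be "any" or an absolute path that begins with '/'.
            if log_dir != "any" and not log_dir.startswith("/"):
                problems.append(f"{name}: invalid log directory {log_dir!r}")
    return problems
```

An empty list means the document satisfies the rules checked here; the check does not confirm that the directories exist on the target brokers.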
Note: the quota specified by the argument `--throttle` applies only to inter-broker replica reassignment. It does not affect the quota for replica movement between log directories.
3) Add optional argument --timeout to kafka-reassignment-partitions.sh. This is needed because kafka-reassignment-partitions.sh may have to re-send ChangeReplicaDirRequest to the broker if the replica hasn't been created there yet. The timeout is 10 seconds by default.
...