...
- The user specifies a list of log directories, one log directory per replica, for each topic partition in the reassignment JSON file that is provided to kafka-reassign-partitions.sh. Each log directory must be either "any" or an absolute path that begins with '/'. If "any" is specified, the broker is free to choose any log directory for the replica; the current broker implementation selects the log directory using a round-robin algorithm by default. See the Scripts section for the format of this JSON file.
- The script sends a ChangeReplicaDirRequest to each broker that needs to move replicas to a user-specified log directory. This step can be skipped if the user has specified "any" as the log directory for all replicas. The script exits with an error if a broker that should receive the ChangeReplicaDirRequest is offline, or if the ChangeReplicaDirResponse contains any error other than ReplicaNotAvailableException.
- The script creates the reassignment znode in ZooKeeper.
- The script resends the ChangeReplicaDirRequest to those brokers that previously responded with ReplicaNotAvailableException in the ChangeReplicaDirResponse. It keeps retrying until a user-specified timeout expires; the timeout is 10 seconds by default. The script exits with an error if a broker that should receive the ChangeReplicaDirRequest is offline, or if the ChangeReplicaDirResponse contains any error other than ReplicaNotAvailableException.
- The script returns the result to the user.
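The retry phase of the steps above can be sketched as follows. This is a minimal Python sketch, not the script's implementation: `send` is a hypothetical callable standing in for one ChangeReplicaDirRequest/ChangeReplicaDirResponse round trip, assumed to raise if the broker is offline and to return an error code for every requested partition. The backoff interval is also an assumption; error code 9 is Kafka's REPLICA_NOT_AVAILABLE.

```python
import time
from typing import Callable, Dict, Tuple

# Kafka protocol error codes
NONE = 0
REPLICA_NOT_AVAILABLE = 9

TopicPartition = Tuple[str, int]
# Hypothetical round trip: sends one ChangeReplicaDirRequest (partition -> log dir)
# and returns the error code per partition. Assumed to raise if the broker is offline.
SendFn = Callable[[Dict[TopicPartition, str]], Dict[TopicPartition, int]]


def change_replica_dirs(send: SendFn,
                        pending: Dict[TopicPartition, str],
                        timeout_s: float = 10.0,   # default timeout stated in the steps above
                        backoff_s: float = 0.1,    # assumed pause between retries
                        clock: Callable[[], float] = time.monotonic,
                        sleep: Callable[[float], None] = time.sleep) -> None:
    deadline = clock() + timeout_s
    while True:
        errors = send(pending)
        # Any error other than REPLICA_NOT_AVAILABLE aborts the script.
        fatal = {tp: e for tp, e in errors.items()
                 if e not in (NONE, REPLICA_NOT_AVAILABLE)}
        if fatal:
            raise RuntimeError(f"ChangeReplicaDirRequest failed: {fatal}")
        # Retry only the replicas that were not yet available on the broker.
        pending = {tp: d for tp, d in pending.items()
                   if errors[tp] == REPLICA_NOT_AVAILABLE}
        if not pending:
            return
        if clock() >= deadline:
            raise TimeoutError(f"replicas still unavailable: {sorted(pending)}")
        sleep(backoff_s)
```

Injecting `clock` and `sleep` keeps the timeout logic testable without real waiting.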
...
```
ChangeReplicaDirResponse => topics
  topics => [ChangeReplicaDirResponseTopic]

ChangeReplicaDirResponseTopic => topic partitions
  topic => str
  partitions => [ChangeReplicaDirResponsePartition]

ChangeReplicaDirResponsePartition => partition error_code
  partition => int32
  error_code => int16
```
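To make the schema above concrete, here is a sketch that encodes and decodes a ChangeReplicaDirResponse body with Python's `struct` module. It assumes the classic (non-flexible) Kafka wire encoding: big-endian integers, `str` as an int16 length followed by UTF-8 bytes, and arrays as an int32 element count followed by the elements. The response header is omitted, and the class and function names are hypothetical.

```python
import struct
from dataclasses import dataclass
from typing import List


@dataclass
class PartitionResult:          # ChangeReplicaDirResponsePartition
    partition: int              # int32
    error_code: int             # int16


@dataclass
class TopicResult:              # ChangeReplicaDirResponseTopic
    topic: str
    partitions: List[PartitionResult]


def encode_response(topics: List[TopicResult]) -> bytes:
    out = bytearray(struct.pack(">i", len(topics)))       # topics array length
    for t in topics:
        name = t.topic.encode("utf-8")
        out += struct.pack(">h", len(name)) + name          # str: int16 length + bytes
        out += struct.pack(">i", len(t.partitions))         # partitions array length
        for p in t.partitions:
            out += struct.pack(">ih", p.partition, p.error_code)
    return bytes(out)


def decode_response(buf: bytes) -> List[TopicResult]:
    pos = 0

    def read(fmt: str):
        nonlocal pos
        values = struct.unpack_from(fmt, buf, pos)
        pos += struct.calcsize(fmt)
        return values

    (n_topics,) = read(">i")
    topics = []
    for _ in range(n_topics):
        (name_len,) = read(">h")
        name = buf[pos:pos + name_len].decode("utf-8")
        pos += name_len
        (n_parts,) = read(">i")
        parts = [PartitionResult(*read(">ih")) for _ in range(n_parts)]
        topics.append(TopicResult(name, parts))
    return topics
```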
Create DescribeDirsRequest
```
// log_dirs and topics are used to filter the results to include only the specified log_dir/topic.
// The result is the intersection of both filters.
DescribeDirsRequest => log_dirs topics
  log_dirs => [str]  // If this is null, then all log directories will be queried
  topics => [str]    // If this is null, all topics will be queried
```
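The filter semantics of DescribeDirsRequest can be illustrated against an in-memory view of a broker's replicas. In this sketch, `replicas_by_dir` is a hypothetical mapping from log directory to topic to partition ids, and `None` plays the role of a null filter. Whether a directory with no matching topics still appears in the result is an assumption; here it is kept with an empty topic map.

```python
from typing import Dict, List, Optional

DirView = Dict[str, Dict[str, List[int]]]  # log dir -> topic -> partition ids


def describe_dirs(replicas_by_dir: DirView,
                  log_dirs: Optional[List[str]],
                  topics: Optional[List[str]]) -> DirView:
    result: DirView = {}
    for path, by_topic in replicas_by_dir.items():
        # A null log_dirs filter means every log directory is queried.
        if log_dirs is not None and path not in log_dirs:
            continue
        # A null topics filter means every topic is queried; the result is
        # the intersection of both filters.
        result[path] = {t: parts for t, parts in by_topic.items()
                        if topics is None or t in topics}
    return result
```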
Create DescribeDirsResponse
```
DescribeDirsResponse => log_dirs
  log_dirs => [DescribeDirsResponseDirMetadata]

DescribeDirsResponseDirMetadata => error_code path topics
  error_code => int16
  path => str
  topics => [DescribeDirsResponseTopic]

DescribeDirsResponseTopic => topic partitions
  topic => str
  partitions => [DescribeDirsResponsePartition]

DescribeDirsResponsePartition => partition size log_end_offset is_temporary
  partition => int32
  size => int64
  log_end_offset => int64   // Enable user to track movement progress by comparing LEO of the *.log and *.move
  is_temporary => boolean   // True if replica is *.move
```
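The schema suggests tracking movement progress by comparing the LEO of a *.log replica with that of its *.move counterpart. A minimal sketch, assuming the nested DescribeDirsResponse has already been flattened into one hypothetical `ReplicaInfo` record per replica: the remaining lag is the difference between the two LEOs, and the temporary replica has caught up when the lag reaches zero.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class ReplicaInfo:           # one flattened DescribeDirsResponsePartition
    path: str
    topic: str
    partition: int
    size: int
    log_end_offset: int
    is_temporary: bool       # True for the *.move replica


def movement_lag(replicas: List[ReplicaInfo]) -> Dict[Tuple[str, int], int]:
    current: Dict[Tuple[str, int], int] = {}
    moving: Dict[Tuple[str, int], int] = {}
    for r in replicas:
        key = (r.topic, r.partition)
        (moving if r.is_temporary else current)[key] = r.log_end_offset
    # Lag = LEO of *.log minus LEO of *.move, only for partitions being moved.
    return {key: current[key] - leo for key, leo in moving.items() if key in current}
```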
Broker Config
1) Add config intra.broker.throttled.rate. This config specifies the maximum rate, in bytes per second, that can be used to move replicas between log directories. It defaults to MAX_LONG. The intra.broker.throttled.rate is per-broker, and the specified capacity is shared by all replica-movement threads.
2) Add config num.replica.move.threads. This config specifies the number of threads in the ReplicaMoveThreadPool. Each thread in this pool is responsible for moving replicas between log directories. This config defaults to the number of log directories. Because we typically expect a 1-1 mapping between log directories and disks, this default keeps the movement capacity in proportion to the number of disks.
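The two configs can be illustrated together. The sketch below resolves the num.replica.move.threads default from a list of log directories, and shows one way a single per-broker byte budget could be shared by all replica-move threads: each thread reserves bytes from one lock-protected schedule before copying them. `SharedThrottle` and its methods are hypothetical, and this reservation scheme is an assumed technique, not Kafka's actual quota implementation.

```python
import threading
import time
from typing import Callable, List

MAX_LONG = 2**63 - 1  # Java Long.MAX_VALUE, the stated default rate


def default_num_replica_move_threads(log_dirs: List[str]) -> int:
    # num.replica.move.threads defaults to the number of log directories.
    return len(log_dirs)


class SharedThrottle:
    """One instance per broker; every replica-move thread calls acquire(),
    so together they stay under rate_bytes_per_s (hypothetical sketch)."""

    def __init__(self, rate_bytes_per_s: int = MAX_LONG,
                 clock: Callable[[], float] = time.monotonic,
                 sleep: Callable[[float], None] = time.sleep):
        self._rate = rate_bytes_per_s
        self._clock = clock
        self._sleep = sleep
        self._lock = threading.Lock()
        self._next_free = clock()  # time at which the shared budget is free again

    def acquire(self, nbytes: int) -> float:
        """Reserve nbytes of the shared budget, wait until it may be copied,
        and return the delay that was applied."""
        with self._lock:
            now = self._clock()
            start = max(now, self._next_free)
            self._next_free = start + nbytes / self._rate
            delay = start - now
        if delay > 0:
            self._sleep(delay)
        return delay
```

Because every thread reserves from the same `_next_free` schedule, adding threads increases parallelism but not the total byte rate.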
Scripts
1) Add kafka-log-dirs.sh, which allows the user to get the list of replicas per log directory on a broker.
...
```
{
  "version" : 1,
  "log_dirs" : [
    {
      "is_live" : boolean,
      "path" : str,
      "partitions": [
        {
          "topic" : str,
          "partition" : int32,
          "size" : int64,
          "log_end_offset" : int64,
          "is_temporary" : boolean
        },
        ...
      ]
    },
    ...
  ]
}
```
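A consumer of the kafka-log-dirs.sh output might, for example, total the replica sizes per live log directory and list the replicas that are still being moved. This sketch assumes the output is a single JSON document in the format above; the `summarize` function is hypothetical.

```python
import json
from typing import Dict, List, Tuple


def summarize(output: str) -> Tuple[Dict[str, int], List[Tuple[str, str, int]]]:
    doc = json.loads(output)
    sizes: Dict[str, int] = {}
    moving: List[Tuple[str, str, int]] = []
    for d in doc["log_dirs"]:
        if not d["is_live"]:
            continue  # offline log directory: nothing reliable to report
        sizes[d["path"]] = sum(p["size"] for p in d["partitions"])
        # is_temporary marks a *.move replica, i.e. a movement in progress.
        moving += [(d["path"], p["topic"], p["partition"])
                   for p in d["partitions"] if p["is_temporary"]]
    return sizes, moving
```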
```
{
  "version" : int,
  "partitions" : [
    {
      "topic" : str,
      "partition" : int,
      "replicas" : [int],
      "log_dirs" : [str] <-- NEW. A log directory can be either "any", or a valid absolute path that begins with '/'. This is an optional field. It is treated as an array of "any" if this field is not explicitly specified in the json file.
    },
    ...
  ]
}
```
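The rules for the log_dirs field can be checked while the reassignment file is parsed. The sketch below assumes the format above and applies the constraints stated for it: one log directory per replica, each either "any" or a path beginning with '/', and an all-"any" list when the field is absent. The function names are hypothetical, and the real script's validation may differ.

```python
import json
from typing import Dict, List, Tuple


def resolve_log_dirs(entry: dict) -> List[str]:
    replicas = entry["replicas"]
    log_dirs = entry.get("log_dirs")
    if log_dirs is None:
        return ["any"] * len(replicas)  # optional field: treated as all "any"
    if len(log_dirs) != len(replicas):
        raise ValueError("log_dirs must contain one entry per replica")
    for d in log_dirs:
        if d != "any" and not d.startswith("/"):
            raise ValueError(f"invalid log directory {d!r}: must be 'any' or an absolute path")
    return list(log_dirs)


def parse_reassignment(text: str) -> Dict[Tuple[str, int], List[Tuple[int, str]]]:
    plan = json.loads(text)
    # Pair each replica's broker id with its requested log directory.
    return {(p["topic"], p["partition"]): list(zip(p["replicas"], resolve_log_dirs(p)))
            for p in plan["partitions"]}
```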
Note: the quota specified by the `--throttle` argument applies only to inter-broker replica reassignment. It does not affect the quota for replica movement between log directories.
...