...

- The user specifies a list of log directories, one log directory per replica, for each topic partition in the reassignment json file that is provided to kafka-reassignment-partitions.sh. The log directory specified by the user must be either "any" or an absolute path that begins with '/'. If "any" is specified as the log directory, the broker is free to choose any log directory in which to place the replica. The current broker implementation selects the log directory using a round-robin algorithm by default. See the Scripts section for the format of this json file.
- The script sends ChangeReplicaDirRequest to those brokers that need to move replicas to a user-specified log directory. This step can be skipped if the user has specified "any" as the log directory for all replicas. The script exits with an error if the broker that should receive the ChangeReplicaDirRequest is offline, or if the ChangeReplicaDirResponse contains any error other than ReplicaNotAvailableException.
- The script creates the reassignment znode in zookeeper.
- The script retries ChangeReplicaDirRequest to those brokers that previously responded with ReplicaNotAvailableException in the ChangeReplicaDirResponse. The script keeps retrying until a user-specified timeout expires; the timeout is 10 seconds by default. The script exits with an error if the broker that should receive the ChangeReplicaDirRequest is offline, or if the ChangeReplicaDirResponse contains any error other than ReplicaNotAvailableException.
- The script returns the result to the user.
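
The send, create-znode, and retry steps above can be sketched as follows. This is a minimal illustration only, not the script's actual code: `move_replicas`, `send_change_dir`, `create_reassignment_znode`, and the `backoff_s` parameter are hypothetical names, and the sketch assumes that an offline broker surfaces as a `ConnectionError` raised by `send_change_dir`.

```python
import time

# Error name taken from the text; every other name here is hypothetical.
REPLICA_NOT_AVAILABLE = "ReplicaNotAvailableException"


def move_replicas(pending, send_change_dir, create_reassignment_znode,
                  timeout_s=10.0, backoff_s=0.1,
                  clock=time.monotonic, sleep=time.sleep):
    """Sketch of the script flow.

    pending: dict mapping broker id -> request payload.
    send_change_dir(broker, payload): returns an error name, or None on
    success; assumed to raise ConnectionError if the broker is offline.
    """
    def send_round(targets):
        retry = {}
        for broker, payload in targets.items():
            error = send_change_dir(broker, payload)
            if error == REPLICA_NOT_AVAILABLE:
                retry[broker] = payload  # replica not created yet, retry later
            elif error is not None:
                raise RuntimeError(f"broker {broker} returned {error}")
        return retry

    retry = send_round(pending)      # first attempt, before the znode exists
    create_reassignment_znode()      # triggers creation of missing replicas
    deadline = clock() + timeout_s
    while retry:                     # retry only the brokers that were not ready
        if clock() >= deadline:
            raise TimeoutError(f"brokers still pending: {sorted(retry)}")
        retry = send_round(retry)
        if retry:
            sleep(backoff_s)
```

Retrying only after the znode is created reflects the ordering in the list: a broker that does not yet host the replica can only accept the request once the reassignment has created it.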

...

 

Code Block
ChangeReplicaDirResponse => topics
  topics => [ChangeReplicaDirResponseTopic]
 
ChangeReplicaDirResponseTopic => topic partitions
  topic => str
  partitions => [ChangeReplicaDirResponsePartition]
 
ChangeReplicaDirResponsePartition => partition error_code
  partition => int32
  error_code => int16

Create DescribeDirsRequest

Code Block
// log_dirs and topics are used to filter the results to include only the specified log_dir/topic. The result is the intersection of both filters.
DescribeDirsRequest => log_dirs topics
  log_dirs => [str]  // If this is null, then all log directories will be queried
  topics => [str] // If this is null, all topics will be queried

Create DescribeDirsResponse

Code Block
DescribeDirsResponse => log_dirs
  log_dirs => [DescribeDirsResponseDirMetadata]

DescribeDirsResponseDirMetadata => error_code path topics
  error_code => int16
  path => str
  topics => [DescribeDirsResponseTopic]
 
DescribeDirsResponseTopic => topic partitions
  topic => str
  partitions => [DescribeDirsResponsePartition]
  
DescribeDirsResponsePartition => partition size log_end_offset is_temporary
  partition => int32
  size => int64
  log_end_offset => int64
  is_temporary => boolean  // True if replica is *.move. Lets the user track movement progress by comparing the LEO of *.log and *.move
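
The filter semantics of DescribeDirsRequest (null means "all", and the result is the intersection of both filters) can be illustrated with a small sketch. The function name `describe_dirs` and the in-memory `broker_dirs` layout are assumptions made for this example; they are not part of the protocol.

```python
def describe_dirs(broker_dirs, log_dirs=None, topics=None):
    """Apply both request filters to a broker's directory listing.

    broker_dirs: dict mapping log dir path -> dict of topic -> partitions
    (a hypothetical in-memory layout).
    log_dirs / topics: lists used as filters; None means "query everything".
    """
    result = {}
    for path, by_topic in broker_dirs.items():
        if log_dirs is not None and path not in log_dirs:
            continue  # directory filtered out
        # Keep only the requested topics inside each selected directory.
        result[path] = {topic: partitions
                        for topic, partitions in by_topic.items()
                        if topics is None or topic in topics}
    return result
```

In this sketch a selected directory that holds none of the requested topics is still reported, with an empty topic list. Whether the broker does the same is not stated in the text.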

Broker Config

1) Add config intra.broker.throttled.rate. This config specifies the maximum rate, in bytes per second, that can be used to move replicas between log directories. This config defaults to MAX_LONG. The intra.broker.throttled.rate is per-broker, and the specified capacity is shared by all replica-movement threads.

2) Add config num.replica.move.threads. This config specifies the number of threads in ReplicaMoveThreadPool. The threads in this thread pool are responsible for moving replicas between log directories. This config defaults to the number of log directories. Note that we typically expect a 1-1 mapping between log directories and disks. Thus setting the config to the number of log directories by default provides a reasonable way to keep the movement capacity in proportion with the number of disks.
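
To illustrate what "per-broker and shared by all replica-movement threads" means for intra.broker.throttled.rate, here is a minimal sketch of one rate budget shared across threads. It is not the broker's quota implementation; `SharedThrottle` and `reserve` are hypothetical names, and the scheduling scheme (each reservation pushes back the time at which the budget is next free) is just one simple way to share a rate.

```python
import threading
import time


class SharedThrottle:
    """One bytes-per-second budget shared by every mover thread (sketch)."""

    def __init__(self, bytes_per_sec, clock=time.monotonic):
        self._rate = bytes_per_sec
        self._clock = clock
        self._lock = threading.Lock()
        self._next_free = clock()  # when the shared budget is next available

    def reserve(self, nbytes):
        """Return how many seconds the caller should wait before copying nbytes."""
        with self._lock:
            now = self._clock()
            start = max(now, self._next_free)
            # Every reservation, from any thread, consumes the same budget.
            self._next_free = start + nbytes / self._rate
            return start - now
```

Each mover thread would call `reserve(len(chunk))` and sleep for the returned duration before copying, so that adding threads does not raise the broker-wide rate.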

Scripts

1) Add kafka-log-dirs.sh, which allows the user to get the list of replicas per log directory on a broker.

...

./bin/kafka-log-dirs.sh --describe --zookeeper localhost:2181 --broker 1 --log-dirs dir1,dir2,dir3 --topics topic1,topic2 shows the list of partitions and their sizes per log directory for the specified topics and the specified log directories on the broker. If no log directory is specified by the user, then all log directories will be queried. If no topic is specified, then all topics will be queried. If a log directory is offline, then its error code in the DescribeDirsResponse will indicate the error and the log directory will be marked offline in the script output.

Note that the script can be used to check whether there is ongoing replica movement between log directories by looking for partitions with is_temporary = True. The user can also track movement progress by comparing the LEO of the *.log and *.move replicas.
The script output has the following json format.

 

Code Block
{
  "version" : 1,
  "log_dirs" : [
    {
      "is_live" : boolean,
      "path" : str,
      "partitions": [
        {
          "topic" : str,
          "partition" : int32,
          "size" : int64,
          "log_end_offset" : int64,
          "is_temporary" : boolean
        },
        ...
      ]
    },

    ...
  ]
}
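
As an illustration of tracking movement progress from this output, the sketch below pairs each temporary (*.move) replica with its regular replica and reports both log end offsets. It assumes the per-partition field names match DescribeDirsResponsePartition (topic, partition, log_end_offset, is_temporary); the function name `movement_progress` is hypothetical.

```python
import json


def movement_progress(output_json):
    """Return {(topic, partition): (move_leo, log_leo)} for replicas being moved.

    log_leo is None if no non-temporary replica of the partition is listed.
    """
    doc = json.loads(output_json)
    temporary, regular = {}, {}
    for log_dir in doc["log_dirs"]:
        for p in log_dir.get("partitions", []):
            key = (p["topic"], p["partition"])
            target = temporary if p["is_temporary"] else regular
            target[key] = p["log_end_offset"]
    # Only partitions with a temporary replica have a movement in progress.
    return {key: (leo, regular.get(key)) for key, leo in temporary.items()}
```

A movement is close to completion when the first offset in a pair approaches the second.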
 



2) Change kafka-reassignment-partitions.sh to allow user to specify the log directory that the replica should be moved to. This is provided via the reassignment json file with the following new format:
Code Block
{
  "version" : int,
  "partitions" : [
    {
      "topic" : str,
      "partition" : int,
      "replicas" : [int],
      "log_dirs" : [str]    <-- NEW. A log directory can be either "any", or a valid absolute path that begins with '/'. This is an optional field. It is treated as an array of "any" if this field is not explicitly specified in the json file.
    },
    ...
  ]
}
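
The rules for the new log_dirs field (optional, one entry per replica, each entry either "any" or an absolute path beginning with '/') can be checked with a small sketch like the one below. The function name `normalize_log_dirs` is hypothetical, and this is not the script's actual validation code.

```python
def normalize_log_dirs(entry):
    """Validate one element of "partitions" and return its log_dirs list.

    A missing "log_dirs" field is treated as an array of "any",
    one per replica, as described above.
    """
    replicas = entry["replicas"]
    log_dirs = entry.get("log_dirs", ["any"] * len(replicas))
    if len(log_dirs) != len(replicas):
        raise ValueError("expected one log directory per replica")
    for log_dir in log_dirs:
        if log_dir != "any" and not log_dir.startswith("/"):
            raise ValueError(f"not 'any' or an absolute path: {log_dir!r}")
    return log_dirs
```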


Note:
The quota specified by the argument `--throttle` is applied only to inter-broker replica reassignment. It does not affect the quota for replica movement between log directories.

...