...

1. Initiate replica movement using ChangeReplicaDirRequest

The user runs kafka-reassign-partitions.sh, which sends ChangeReplicaDirRequest to a broker to initiate replica movement between that broker's log directories. The flow graph below illustrates how the broker handles ChangeReplicaDirRequest.

...

- For each topic partition in the reassignment JSON file provided to kafka-reassign-partitions.sh, the user specifies a list of log directories, one per replica. Each log directory must be either "any" or an absolute path that begins with '/'. See the Scripts section for the format of this JSON file.
- In addition to creating the znode /admin/reassign_partitions with the replica assignment, the script sends ChangeReplicaDirRequest to the brokers of those replicas whose log directory in the assignment is not "any". If ChangeReplicaDirResponse reports ReplicaNotAvailableException, the script retries for up to a configured amount of time. This gives the controller time to send LeaderAndIsrRequest to the broker, which creates the replica if the broker is not already a leader or follower of the partition.
- The broker handles ChangeReplicaDirRequest as specified in the section "How to move replica between log directories on the same broker".
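The retry behavior described above can be sketched as follows. This is a minimal illustration, not the tool's implementation: `send` is a hypothetical callable that sends one ChangeReplicaDirRequest and returns the partition's error code, and the 0.1 second backoff is an assumed value. The clock and sleep functions are injectable so the loop can be exercised without real delays; 9 is Kafka's protocol error code for ReplicaNotAvailableException.

```python
import time
from typing import Callable

# Kafka protocol error codes used below.
NONE = 0
REPLICA_NOT_AVAILABLE = 9  # ReplicaNotAvailableException


def send_with_retry(
    send: Callable[[], int],
    timeout_s: float = 10.0,
    backoff_s: float = 0.1,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Re-send a ChangeReplicaDirRequest while the broker reports
    ReplicaNotAvailable, giving up once timeout_s would be exceeded.

    `send` is hypothetical: it sends one request and returns the
    partition's error code from ChangeReplicaDirResponse.
    """
    deadline = clock() + timeout_s
    while True:
        error = send()
        if error != REPLICA_NOT_AVAILABLE:
            return error  # success (NONE) or an error that retrying won't fix
        if clock() + backoff_s > deadline:
            return error  # timed out: report the last error to the user
        # Give the controller time to send LeaderAndIsrRequest,
        # which creates the replica on the broker.
        sleep(backoff_s)
```

Any error code other than ReplicaNotAvailable ends the loop immediately, since waiting for the controller would not resolve it.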

Here are the steps to verify partition assignment:

- kafka-reassign-partitions.sh verifies partition assignment across brokers as it does today.
- For those replicas whose destination log directory is not "any", kafka-reassign-partitions.sh groups the replicas by broker and sends DescribeDirsRequest to each of those brokers. The DescribeDirsRequest specifies the log directories and partitions from the expected assignment.
- The broker replies with DescribeDirsResponse, which shows the current log directory of each partition specified in the DescribeDirsRequest.
- kafka-reassign-partitions.sh determines from the DescribeDirsResponse whether each replica has been moved to the specified log directory.
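The comparison in the last step can be sketched like this, assuming a simplified in-memory view of each DescribeDirsResponse (log directory path mapped to (topic, partition, is_temporary) tuples). The function names and data shapes are hypothetical. In this sketch a replica counts as moved only when its non-temporary copy sits in the target directory; a *.move replica in the target directory means the move is still in progress.

```python
from typing import Dict, List, Tuple

TopicPartition = Tuple[str, int]
# Simplified view of one DescribeDirsResponse:
# log dir path -> [(topic, partition, is_temporary), ...]
DirsView = Dict[str, List[Tuple[str, int, bool]]]


def current_dirs(view: DirsView) -> Dict[TopicPartition, str]:
    """Map each partition to the directory holding its non-temporary replica."""
    located: Dict[TopicPartition, str] = {}
    for path, partitions in view.items():
        for topic, partition, is_temporary in partitions:
            if not is_temporary:  # skip *.move replicas still being copied
                located[(topic, partition)] = path
    return located


def pending_moves(
    expected: Dict[int, Dict[TopicPartition, str]],
    responses: Dict[int, DirsView],
) -> List[Tuple[int, str, int]]:
    """Return (broker, topic, partition) for each replica not yet in its target dir.

    `expected` holds only replicas whose target directory is not "any",
    grouped by broker, mirroring how DescribeDirsRequest is sent.
    """
    pending: List[Tuple[int, str, int]] = []
    for broker, targets in sorted(expected.items()):
        actual = current_dirs(responses.get(broker, {}))
        for (topic, partition), target_dir in sorted(targets.items()):
            if actual.get((topic, partition)) != target_dir:
                pending.append((broker, topic, partition))
    return pending
```

An empty result means every replica with an explicit log directory has reached it.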

...

 

Code Block
ChangeReplicaDirResponse => topics
  topics => [ChangeReplicaDirResponseTopic]
 
ChangeReplicaDirResponseTopic => topic partitions
  topic => str
  partitions => [ChangeReplicaDirResponsePartition]
 
ChangeReplicaDirResponsePartition => partition error_code
  partition => int32
  error_code => int16
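For illustration, the response schema above maps naturally onto a few Python dataclasses. This is an in-memory model only (it does not implement Kafka's wire encoding), and the helper `partitions_with_error` is a hypothetical addition that picks out, for example, the partitions the script should retry because the broker returned ReplicaNotAvailableException (error code 9).

```python
from dataclasses import dataclass, field
from typing import List, Tuple

REPLICA_NOT_AVAILABLE = 9  # Kafka error code for ReplicaNotAvailableException


@dataclass
class ChangeReplicaDirResponsePartition:
    partition: int   # int32 on the wire
    error_code: int  # int16 on the wire; 0 means success


@dataclass
class ChangeReplicaDirResponseTopic:
    topic: str
    partitions: List[ChangeReplicaDirResponsePartition] = field(default_factory=list)


@dataclass
class ChangeReplicaDirResponse:
    topics: List[ChangeReplicaDirResponseTopic] = field(default_factory=list)

    def partitions_with_error(self, error_code: int) -> List[Tuple[str, int]]:
        """Return (topic, partition) pairs whose entry carries error_code."""
        return [
            (t.topic, p.partition)
            for t in self.topics
            for p in t.partitions
            if p.error_code == error_code
        ]
```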

Create DescribeDirsRequest

Code Block
// log_dirs and topics are used to filter the results to include only the specified log_dir/topic. The result is the intersection of both filters.
DescribeDirsRequest => log_dirs topics
  log_dirs => [str]  // If this is null, then all log directories will be queried
  topics => [str] // If this is null, all topics will be queried
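The filter semantics stated in the comments can be sketched as a plain function over a broker's state. The state shape (log directory path mapped to the topics it hosts) and the function name are assumptions for illustration; the sketch also assumes that a requested directory with no matching topics is still returned, with an empty topic list.

```python
from typing import Dict, List, Optional


def filter_log_dirs(
    log_dirs: Dict[str, List[str]],
    requested_dirs: Optional[List[str]],
    requested_topics: Optional[List[str]],
) -> Dict[str, List[str]]:
    """Apply DescribeDirsRequest's two filters to a broker's state.

    A None filter matches everything. When both filters are given,
    the result is their intersection: only the requested topics
    within the requested directories.
    """
    result: Dict[str, List[str]] = {}
    for path, topics in log_dirs.items():
        if requested_dirs is not None and path not in requested_dirs:
            continue  # directory filtered out
        result[path] = [
            t for t in topics if requested_topics is None or t in requested_topics
        ]
    return result
```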

Create DescribeDirsResponse

Code Block
DescribeDirsResponse => log_dirs
  log_dirs => [DescribeDirsResponseDirMetadata]

DescribeDirsResponseDirMetadata => error_code path topics
  error_code => int16
  path => str
  topics => [DescribeDirsResponseTopic]
 
DescribeDirsResponseTopic => topic partitions
  topic => str
  partitions => [DescribeDirsResponsePartition]
  
DescribeDirsResponsePartition => partition size log_end_offset is_temporary
  partition => int32
  size => int64
  log_end_offset => int64
  is_temporary => boolean  // True if replica is *.move
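One possible use of the is_temporary and log_end_offset fields is to report how far an in-progress move has to go. The sketch below pairs each *.move replica with the current replica of the same partition and reports the offset gap between them. The `PartitionInfo` type and `move_lag` function are hypothetical, and treating the log_end_offset difference as a progress measure is an assumption, not something the protocol defines.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class PartitionInfo:
    topic: str
    partition: int
    size: int
    log_end_offset: int
    is_temporary: bool  # True if the replica is a *.move replica


def move_lag(log_dirs: Dict[str, List[PartitionInfo]]) -> Dict[Tuple[str, int], Tuple[str, str, int]]:
    """For each partition with a *.move replica, return
    (source_dir, destination_dir, offset_lag), where offset_lag is how far
    the temporary replica's log_end_offset trails the current replica's."""
    current: Dict[Tuple[str, int], Tuple[str, int]] = {}
    temporary: Dict[Tuple[str, int], Tuple[str, int]] = {}
    for path, partitions in log_dirs.items():
        for p in partitions:
            bucket = temporary if p.is_temporary else current
            bucket[(p.topic, p.partition)] = (path, p.log_end_offset)
    result: Dict[Tuple[str, int], Tuple[str, str, int]] = {}
    for tp, (dest, dest_leo) in temporary.items():
        if tp in current:
            src, src_leo = current[tp]
            result[tp] = (src, dest, max(src_leo - dest_leo, 0))
    return result
```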

Broker Config

1) Add config intra.broker.throttled.rate. This config specifies the maximum rate, in bytes per second, that can be used to move replicas between log directories. It defaults to MAX_LONG.
2) Add config num.replica.move.threads. This config specifies the number of threads in ReplicaMoveThreadPool, whose threads are responsible for moving replicas between log directories. It defaults to the number of log directories.
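A minimal sketch of how these two settings might be resolved and applied, assuming properties arrive as strings. `resolve_move_config` and `throttle_delay` are hypothetical helpers, and the average-rate throttle shown here is a simplification; the broker's actual quota mechanism may measure rate differently.

```python
from typing import Dict, List

# Java's Long.MAX_VALUE, the stated default for intra.broker.throttled.rate.
MAX_LONG = 2**63 - 1


def resolve_move_config(props: Dict[str, str], log_dirs: List[str]) -> Dict[str, int]:
    """Apply the documented defaults to the two new broker configs."""
    return {
        "intra.broker.throttled.rate": int(props.get("intra.broker.throttled.rate", MAX_LONG)),
        "num.replica.move.threads": int(props.get("num.replica.move.threads", len(log_dirs))),
    }


def throttle_delay(bytes_moved: int, elapsed_s: float, rate_bytes_per_s: int) -> float:
    """Seconds to pause so that bytes_moved / (elapsed_s + pause) <= rate.

    bytes_moved is the total copied by all replica move threads since
    elapsed_s started counting, because the limit is broker-wide.
    """
    required_s = bytes_moved / rate_bytes_per_s
    return max(required_s - elapsed_s, 0.0)
```

With the MAX_LONG default, the computed pause is effectively always zero, which matches the intent that movement is unthrottled unless the operator sets a rate.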

...

 

Code Block
{
  "version" : 1,
  "log_dirs" : [
    {
      "is_live" : boolean,
      "path" : str,
      "partitions": [
        {
          "topic" : str, 
          "partition" : int32, 
          "size" : int64,
          "is_temporary" : boolean
        },
        ...
      ]
    },

    ...
  ]
}
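As an example of consuming this format, the sketch below parses such a document and reports the total size of each live log directory along with any offline directories. The function name is hypothetical, and the sketch assumes that sizes are in bytes and that temporary (*.move) replicas should count toward disk usage.

```python
import json
from typing import Dict, List, Tuple


def summarize_log_dirs(doc: str) -> Tuple[Dict[str, int], List[str]]:
    """Return (total size per live log directory, offline directory paths)."""
    data = json.loads(doc)
    if data.get("version") != 1:
        raise ValueError(f"unsupported version: {data.get('version')}")
    sizes: Dict[str, int] = {}
    offline: List[str] = []
    for d in data["log_dirs"]:
        if not d["is_live"]:
            offline.append(d["path"])
            continue
        # *.move replicas occupy disk too, so they are included in the total.
        sizes[d["path"]] = sum(p["size"] for p in d["partitions"])
    return sizes, offline
```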

 

2) Change kafka-reassign-partitions.sh to allow the user to specify the log directory to which each replica should be moved. This is provided via the reassignment JSON file in the following new format:
Code Block
{
  "version" : int,
  "partitions" : [
    {
      "topic" : str,
      "partition" : int,
      "replicas" : [int],
      "log_dirs" : [str]    <-- NEW. A log directory can be either "any", or a valid absolute path that begins with '/'
    },
    ...
  ]
}
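The checks described for the log_dirs field can be sketched as a validator that also groups the resulting ChangeReplicaDirRequest targets by broker. The function name is hypothetical, and treating a missing log_dirs field as "any" for every replica (so that older files keep working) is an assumption.

```python
import json
from collections import defaultdict
from typing import Dict, List, Tuple


def dir_moves_by_broker(doc: str) -> Dict[int, List[Tuple[str, int, str]]]:
    """Validate log_dirs in a reassignment JSON file and return, per broker,
    the (topic, partition, log_dir) entries that need a ChangeReplicaDirRequest,
    i.e. those whose log directory is not "any"."""
    data = json.loads(doc)
    moves: Dict[int, List[Tuple[str, int, str]]] = defaultdict(list)
    for entry in data["partitions"]:
        replicas = entry["replicas"]
        # Assumption: a missing "log_dirs" means "any" for every replica.
        log_dirs = entry.get("log_dirs", ["any"] * len(replicas))
        if len(log_dirs) != len(replicas):
            raise ValueError(f"{entry['topic']}-{entry['partition']}: "
                             "log_dirs must have one entry per replica")
        for broker, log_dir in zip(replicas, log_dirs):
            if log_dir == "any":
                continue  # no intra-broker move requested for this replica
            if not log_dir.startswith("/"):
                raise ValueError(f'log directory must be "any" or an absolute path: {log_dir!r}')
            moves[broker].append((entry["topic"], entry["partition"], log_dir))
    return dict(moves)
```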


Note: the quota specified by the argument `--throttle` applies only to inter-broker replica reassignment. It does not affect the quota for replica movement between log directories.

 

3) Add an optional argument --timeout to kafka-reassign-partitions.sh. This is needed because kafka-reassign-partitions.sh may have to re-send ChangeReplicaDirRequest to a broker if the replica has not yet been created there. The timeout defaults to 10 seconds.

...