...

When a broker starts, for each topicPartition the broker will check whether topicPartition.log and topicPartition.move exist on the disk. If topicPartition.move exists but topicPartition.log doesn't, the broker will rename topicPartition.move to topicPartition.log and append data to this log. If both topicPartition.log and topicPartition.move exist, the broker will begin to push data from topicPartition.log to topicPartition.move using ReplicaFetcherThread. topicPartition.log can be replaced by topicPartition.move once topicPartition.move has caught up. In the following we describe in detail how the leader and the follower move replicas inside the broker.
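For illustration, the startup check described above could look like the following sketch. The flat file layout and all method names are simplifying assumptions, not the actual Kafka implementation:

Code Block
// Hypothetical sketch of the startup check; names are illustrative only.
import java.io.File;

public class StartupRecoverySketch {

    static void recoverOnStartup(File logDir, String topicPartition) {
        File log = new File(logDir, topicPartition + ".log");
        File move = new File(logDir, topicPartition + ".move");
        if (move.exists() && !log.exists()) {
            // The swap was interrupted after catch-up: adopt .move as the log
            // and keep appending to it.
            move.renameTo(log);
        } else if (move.exists()) {
            // Both exist: resume pushing data from .log to .move via
            // ReplicaFetcherThread until .move catches up, then swap.
            scheduleCatchUp(topicPartition);
        }
        // Only .log exists (or neither): normal startup path.
    }

    static void scheduleCatchUp(String topicPartition) {
        // Hand the partition off to the ReplicaFetcherThread (not shown).
    }
}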

Broker does the following in order to move a leader replica:

- Java class Replica will keep track of two instances of Java class Log, one representing topicPartition.log and the other representing topicPartition.move.
- ReplicaFetcherThread will push data from topicPartition.log to topicPartition.move. If the Replica is a follower, this will be the same ReplicaFetcherThread that is appending data to topicPartition.log. If the Replica is a leader, ReplicaFetcherManager will create a new ReplicaFetcherThread to move replicas between its own disks. We will need a quota to rate-limit replica movement if the replica is a leader.
- When ReplicaFetcherThread builds a FetchRequest, it sets the maximum wait time to 0 ms if it needs to move any replica between its own disks.
- When ReplicaFetcherThread executes processPartitionData(topicPartition) after receiving a FetchResponse, it will check whether the local replica of this topicPartition has Log instances for both topicPartition.log and topicPartition.move. If so, it will read one ByteBufferMessageSet of size replica.fetch.max.bytes from topicPartition.log and append the message set to topicPartition.move.
- Replace topicPartition.log with topicPartition.move when topicPartition.move has caught up (see the sketch after this list).
  - If the Replica is a follower, and topicPartition.move has caught up with topicPartition.log, the ReplicaFetcherThread which is appending data to this Replica will rename topicPartition.log to topicPartition.delete, rename topicPartition.move to topicPartition.log, and update the Replica instance of this topicPartition to only include a Log instance that points to the new topicPartition.log. Data from the leader will be appended to the new log file going forward.
  - If the Replica is a leader, and topicPartition.move has caught up with topicPartition.log, the KafkaRequestHandler thread which is handling a ProduceRequest to this Replica will rename topicPartition.log to topicPartition.delete, rename topicPartition.move to topicPartition.log, update the Replica instance of this topicPartition to only include a Log instance that points to the new topicPartition.log, and append data from the ProduceRequest to the new log file.
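The swap step above can be summarized with a small sketch. This is a hedged illustration assuming hypothetical offset accessors, not the actual Kafka code:

Code Block
// Illustrative "catch up, then swap" logic; names are assumptions.
import java.io.File;

class LogSwapSketch {

    // .move has caught up once it holds every offset present in .log.
    static boolean hasCaughtUp(long moveEndOffset, long logEndOffset) {
        return moveEndOffset >= logEndOffset;
    }

    // Executed by the thread that owns appends to this replica
    // (ReplicaFetcherThread for a follower, KafkaRequestHandler for a leader),
    // so no new data can land in the old .log between the two renames.
    static void swap(File log, File move, File delete) {
        log.renameTo(delete);   // topicPartition.log  -> topicPartition.delete
        move.renameTo(log);     // topicPartition.move -> topicPartition.log
        // From here on, the Replica tracks a single Log instance pointing at
        // the new topicPartition.log, and future appends go there.
    }
}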



...

- Broker creates a new ReplicaFetcherThread to move replicas between its own disks.
- The ReplicaFetcherThread will repeatedly read a ByteBufferMessageSet of size replica.fetch.max.bytes from the partitions it needs to move on the source disks and append the data to the destination disks. Note that we will enforce the replication quota mechanism introduced in KIP-73 to exclude partitions that exceed the user-specified replication quota. See below for more detail regarding quota enforcement. If there is no data to move due to quota enforcement, the ReplicaFetcherThread will block until it has work to do.
- The ReplicaFetcherThread will replace topicPartition.log with topicPartition.move once topicPartition.move has caught up with topicPartition.log. It will delete topicPartition.log on the source disk, rename topicPartition.move to topicPartition.log on the destination disk, and update the Replica instance of this topicPartition to only include a Log instance that points to the topicPartition.log on the destination disk. Data from the leader will be appended to the new topicPartition.log going forward. Note that a proper lock is needed to prevent KafkaRequestHandler from appending data to topicPartition.log on the source disk before this replacement is completed by the ReplicaFetcherThread (a sketch of this loop and the swap follows this list).
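A minimal sketch of this move loop, including the quota gate and the lock around the swap, is shown below. The Mover interface and all names are assumptions for illustration, not the actual KIP-73 or Kafka classes:

Code Block
// Sketch of the leader-side intra-broker move loop; names are illustrative.
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

class IntraBrokerMoverSketch {

    interface Mover {
        boolean quotaExceeded(String topicPartition);       // KIP-73-style gate (assumed)
        int copyChunk(String topicPartition, int maxBytes); // read from .log, append to .move
        boolean caughtUp(String topicPartition);
        void swapLogs(String topicPartition);               // rename files, update Replica
    }

    static void moveOnePass(Mover mover, List<String> partitions, int fetchMaxBytes,
                            ReentrantLock leaderAppendLock) {
        for (String tp : partitions) {
            if (mover.quotaExceeded(tp))
                continue;                        // excluded this round by replication quota
            mover.copyChunk(tp, fetchMaxBytes);  // at most replica.fetch.max.bytes per read
            if (mover.caughtUp(tp)) {
                // Block KafkaRequestHandler appends to the old .log during the swap.
                leaderAppendLock.lock();
                try {
                    mover.swapLogs(tp);
                } finally {
                    leaderAppendLock.unlock();
                }
            }
        }
    }
}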

Broker does the following in order to move a follower replica:

- Java class Replica will keep track of two instances of Java class Log, one representing topicPartition.log and the other representing topicPartition.move.
- The ReplicaFetcherThread which is appending data to topicPartition.log will also push data from topicPartition.log to topicPartition.move. The maximum wait time of the FetchRequest will be set to 0 ms if the ReplicaFetcherThread needs to move any partition from topicPartition.log to topicPartition.move.
- After the ReplicaFetcherThread receives a FetchResponse, in addition to appending the data from the FetchResponse to local disks, it also reads multiple ByteBufferMessageSets of size replica.fetch.max.bytes from the partitions it needs to move on the source disks and appends the data to the destination disks. The total size of the ByteBufferMessageSets that can be moved in this step is limited by replica.fetch.response.max.bytes (see the sketch after this list). Note that we will enforce the replication quota mechanism introduced in KIP-73 to exclude partitions that exceed the user-specified replication quota. See below for more detail regarding quota enforcement.
- The ReplicaFetcherThread will replace topicPartition.log with topicPartition.move once topicPartition.move has caught up with topicPartition.log. It will delete topicPartition.log on the source disk, rename topicPartition.move to topicPartition.log on the destination disk, and update the Replica instance of this topicPartition to only include a Log instance that points to the topicPartition.log on the destination disk. Data from the leader will be appended to the new topicPartition.log going forward.
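The bounded piggyback step above can be sketched as follows, assuming a hypothetical Source abstraction; the budget corresponds to replica.fetch.response.max.bytes and the per-read cap to replica.fetch.max.bytes:

Code Block
// Sketch of the follower-side piggyback move after a FetchResponse.
import java.util.List;

class FollowerPiggybackSketch {

    interface Source {
        // Reads up to maxBytes from the source log of tp and appends the data
        // to its destination log; returns the number of bytes actually moved.
        int moveChunk(String tp, int maxBytes);
    }

    static void moveAfterFetch(Source src, List<String> movingPartitions,
                               int fetchMaxBytes, int responseMaxBytes) {
        int budget = responseMaxBytes;               // total cap per fetch cycle
        for (String tp : movingPartitions) {
            if (budget <= 0)
                break;                               // budget exhausted for this cycle
            budget -= src.moveChunk(tp, Math.min(fetchMaxBytes, budget));
        }
    }
}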

In order to optimize the efficiency of replica movement between disks, the ReplicaFetcherThread will always choose partitions in alphabetical order when selecting partitions to move. This means it will allocate all resources (e.g. quota) to move one partition and complete its movement before moving another partition. This helps reduce the amount of double writes during the period in which a replica is being moved.
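As a tiny illustration of this selection policy (the class is hypothetical), a sorted set yields partitions in alphabetical order, and the head of the set is moved to completion before the next one is started:

Code Block
// Illustrative ordering policy: alphabetical, one partition at a time.
import java.util.TreeSet;

class MoveOrderSketch {
    private final TreeSet<String> pending = new TreeSet<>(); // alphabetical order

    void enqueue(String topicPartition) { pending.add(topicPartition); }

    // The current movement target; unchanged until its movement completes.
    String current() { return pending.isEmpty() ? null : pending.first(); }

    void complete(String topicPartition) { pending.remove(topicPartition); }
}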

 

2. The broker logic for handling ChangeReplicaDirRequest

The flow graph below illustrates how the broker handles ChangeReplicaDirRequest.

View file: JBOD-flowgraph.pdf


3. Throttle replica movement rate

We use the same mechanism introduced in KIP-73 to throttle the rate of replica movement between disks on the same broker. The user will need to configure leader.replication.throttled.replicas, follower.replication.throttled.replicas, leader.replication.throttled.rate and follower.replication.throttled.rate in the same way as specified in KIP-73, i.e. through kafka-reassign-partitions.sh or kafka-configs.sh. For every message that is moved from a source disk to a destination disk, the size of the message will be subtracted from both the leader replication quota and the follower replication quota if its partition is included in the throttled replicas list. No data will be moved for a partition in *.replication.throttled.replicas if either the leader replication quota or the follower replication quota is exceeded.
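The accounting rule above can be summarized with a short sketch; the class and fields are placeholders for illustration, not the actual KIP-73 quota manager:

Code Block
// Sketch of the double-charging rule for intra-broker moves.
class MoveQuotaSketch {
    long leaderQuotaBytes;    // remaining leader.replication.throttled.rate budget
    long followerQuotaBytes;  // remaining follower.replication.throttled.rate budget

    // Returns true if the message may be moved now, charging both quotas.
    boolean tryAccount(boolean partitionIsThrottled, long messageSizeBytes) {
        if (!partitionIsThrottled)
            return true;  // not in *.replication.throttled.replicas: no charge
        if (leaderQuotaBytes <= 0 || followerQuotaBytes <= 0)
            return false; // either quota exceeded: move no data for this partition
        leaderQuotaBytes -= messageSizeBytes;   // charged against BOTH quotas
        followerQuotaBytes -= messageSizeBytes;
        return true;
    }
}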

 

4. Query broker for partition assignment and partition size per log directory

When a broker receives a DescribeDirsRequest with an empty list of log directories, it will respond with a DescribeDirsResponse which shows the size of each partition and the list of partitions per log directory, for all log directories. If the user has specified a list of log directories in the DescribeDirsRequest, the broker will provide this information only for the log directories specified by the user. A non-zero error code will be specified in the DescribeDirsResponse for each log directory that is either offline or not found by the broker.
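A minimal sketch of this handling is shown below, with placeholder error codes (the real codes are defined by the protocol, not here):

Code Block
// Sketch of DescribeDirsRequest handling; constants are placeholders.
import java.util.List;
import java.util.Set;

class DescribeDirsSketch {
    static final short NONE = 0;
    static final short DIR_OFFLINE = 1;    // placeholder, not the real error code
    static final short DIR_NOT_FOUND = 2;  // placeholder, not the real error code

    // An empty request list means "describe every log directory".
    static List<String> dirsToDescribe(List<String> requested, List<String> allDirs) {
        return requested.isEmpty() ? allDirs : requested;
    }

    static short errorCodeFor(String dir, Set<String> knownDirs, Set<String> liveDirs) {
        if (!knownDirs.contains(dir)) return DIR_NOT_FOUND; // not found by the broker
        if (!liveDirs.contains(dir))  return DIR_OFFLINE;   // offline log directory
        return NONE;
    }
}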


Users can run a command such as ./bin/kafka-log-dirs.sh --describe --zookeeper localhost:2181 --broker 1 to get the above information per log directory.

2) How to reassign replica to a specific log directory on any broker

Problem statement:

kafka-reassign-partitions.sh should provide an option for the user to specify the destination log directory of the replica on any broker. This is needed for users to balance load across the log directories of brokers in the cluster.

Solution:

- The user can specify a list of log directories, one log directory per replica, for each topic partition in the reassignment json file that is provided to kafka-reassign-partitions.sh. The log directory specified by the user must be either "any" or an absolute path that begins with '/'. See the Scripts section for the format of this json file.

- kafka-reassign-partitions.sh will write the log directories read from the reassignment json file to the znode /admin/reassign_partitions. If the user doesn't specify a log directory, "any" will be used as the default log directory name. See the Zookeeper section for the format of the data in the znode.

- The controller will still update the state machine, send LeaderAndIsrRequest and so on to perform partition reassignment. However, it will additionally send ChangeReplicaDirRequest for all replicas whose log directory is specified as something other than "any". The ChangeReplicaDirRequest will move the replica to the specified log directory if it is not already placed there on the broker (see the sketch below).
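The controller-side rule can be illustrated with a short sketch (names are assumptions; replicas are keyed here by an opaque string for brevity):

Code Block
// Sketch: only replicas with an explicit destination get ChangeReplicaDirRequest.
import java.util.HashMap;
import java.util.Map;

class ControllerDirAssignmentSketch {

    // replicaToDir maps a replica to its requested log directory from the
    // reassignment json ("any" or an absolute path).
    static Map<String, String> changeDirTargets(Map<String, String> replicaToDir) {
        Map<String, String> targets = new HashMap<>();
        for (Map.Entry<String, String> e : replicaToDir.entrySet()) {
            if (!"any".equals(e.getValue())) {
                targets.put(e.getKey(), e.getValue()); // send ChangeReplicaDirRequest
            }
        }
        return targets; // replicas with "any" are left to the broker's own policy
    }
}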

Public interface

Zookeeper

Change the format of data stored in znode /admin/reassign_partitions to allow log directory to be specified for each replica.

Code Block
{
  "version" : int,
  "partitions" : [
    {
      "topic" : str,
      "partition" : int,
      "replicas" : [int],
      "log_dirs" : [str]    <-- NEW. If log directory is not explicitly specified by user, "any" will be used as log directory name and broker will select log directory using its own policy. Currently the log directory is selected in a round-robin manner.
    },
    ...
  ]
}
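For illustration, an entry that pins the first replica to a specific directory while letting the broker choose for the second would look like the following (the topic name and path are hypothetical):

Code Block
{
  "version" : 1,
  "partitions" : [
    {
      "topic" : "test",
      "partition" : 0,
      "replicas" : [1, 2],
      "log_dirs" : ["/data/kafka-logs-1", "any"]
    }
  ]
}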

Protocol

Create ChangeReplicaDirRequest

 

Code Block
ChangeReplicaDirRequest => [ReplicaState]

ReplicaState =>
  topic => str
  partition => int32
  dir => str

 

Create ChangeReplicaDirResponse

 

Code Block
ChangeReplicaDirResponse => error_code partitions
  error_code => int16
  partitions => [ChangeReplicaDirResponsePartition]
 
ChangeReplicaDirResponsePartition => topic partition error_code
  topic => str
  partition => int32
  error_code => int16

Create DescribeDirsRequest

Code Block
DescribeDirsRequest => log_dirs
  log_dirs => [str]

Create DescribeDirsResponse

Code Block
DescribeDirsResponse => log_dirs
  log_dirs => [DescribeDirsResponseDirMetadata]
 
DescribeDirsResponseDirMetadata => error_code path topics
  error_code => int16
  path => str
  topics => [DescribeDirsResponseTopic]
 
DescribeDirsResponseTopic => topic partitions
  topic => str
  partitions => [DescribeDirsResponsePartition]
  
DescribeDirsResponsePartition => partition size
  partition => int32
  size => int64

 

Metrics

Here are the metrics we need to add as part of this proposal


OfflineReplicasCount: The number of offline replicas on a live broker. This is equivalent to the number of TopicPartition logs on the bad log directories of the broker.

Scripts

1) Add kafka-log-dirs.sh which allows the user to get the list of replicas per log directory on a broker.


./bin/kafka-log-dirs.sh --describe --zookeeper localhost:2181 --broker 1 --log-dirs dir1,dir2,dir3 will show the list of partitions and their sizes per log directory for the specified log directories on the broker. If no log directory is specified by the user, all log directories will be queried. If a partition is created on the broker but not found on any live log directory, its log directory will be shown as "*" in the script output and the liveness of this log directory will be marked as False. The script output has the following json format.

 

Code Block
{
  "version" : 1,
  "log_dirs" : [
    {
      "live" : boolean,
      "path" : str,
      "partitions": [
        {
          "topic" : str,
          "partition" : int32,
          "size" : int64
        },
        ...
      ]
    },
    ...
  ]
}
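For example, the output for a broker with a single live log directory might look like the following (all values are hypothetical):

Code Block
{
  "version" : 1,
  "log_dirs" : [
    {
      "live" : true,
      "path" : "/data/kafka-logs-1",
      "partitions": [
        {
          "topic" : "test",
          "partition" : 0,
          "size" : 1073741824
        }
      ]
    }
  ]
}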

 

2) Change kafka-reassign-partitions.sh to allow the user to specify the log directory that the replica should be moved to. This is provided via the reassignment json file with the following new format:
Code Block
{
  "version" : int,
  "partitions" : [
    {
      "topic" : str,
      "partition" : int,
      "replicas" : [int],
      "log_dirs" : [str]    <-- NEW. A log directory can be either "any", or a valid absolute path that begins with '/'
    },
    ...
  ]
}

...

When running Kafka with RAID-10, we only need to take care of load imbalance across brokers. The administrator can balance load across brokers using the script kafka-reassign-partitions.sh. After switching from RAID-10 to JBOD, we will start to see load imbalance across log directories. To address this problem, the administrator needs to get the partition assignment and partition sizes per log directory using kafka-log-dirs.sh, determine the reassignment of replicas per log directory (as opposed to per broker), and provide the partition -> log_directory mapping as input to either kafka-reassign-partitions.sh or kafka-log-dirs.sh to execute the new assignment.

Administrators should also be prepared for the fact that rebalancing across log directories will probably be needed much more frequently than rebalancing across brokers, since the capacity of an individual disk is likely much smaller than the capacity of the existing RAID-10 setup.

...