...

The idea is that user can send a ChangeReplicaDirRequest which tells the broker to move a replica (topicPartition.log) to a specified destination log directory. The broker can create a new directory with the .move postfix on the destination log directory to hold all log segments of the replica. This allows the broker to tell the log segments of the replica on the destination log directory from the existing log segments of the replica on the source log directory in case of failover during the replica movement. The broker can create new log segments for the replica on the destination log directory, push data from the existing log of the replica on the source log directory to the new log on the destination log directory, and replace the existing log with the new log for this replica once the new log has caught up.

1. The thread model and broker logic for moving replica data between log directories

When a broker starts, for each topicPartition the broker will check if topicPartition.log and topicPartition.move exist on the disk. If topicPartition.move exists but topicPartition.log doesn't, the broker will rename topicPartition.move to topicPartition.log and append data to this log. If both topicPartition.log and topicPartition.move exist, the broker will begin to push data from topicPartition.log to topicPartition.move using a ReplicaFetcherThread. topicPartition.log can be replaced by topicPartition.move once topicPartition.move has caught up.


In the following we describe each step of the replica movement.

 

1. Initiate replica movement using ChangeReplicaDirRequest

Either user or controller can send ChangeReplicaDirRequest to broker to initiate replica movement between its log directories. The flow graph below illustrates how broker handles ChangeReplicaDirRequest.

 

(See the attached flow graph: JBOD-flowgraph.pdf)

 


Broker will put ChangeReplicaDirRequest in a DelayedOperationPurgatory. The ChangeReplicaDirRequest can be completed when results for all partitions specified in the ChangeReplicaDirRequest are available. The result of a partition is determined using the following logic:

 

  • If source or destination disk fails, the result of this partition will be KafkaStorageException
  • If destination replica has caught up with source replica and has replaced source replica, the result of this partition has no error, which means success.
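
As an illustration, the completion check in the purgatory could be sketched as follows. DelayedChangeReplicaDir and PartitionMoveState below are simplified stand-ins for the broker's actual purgatory plumbing, not the real classes:

Code Block
import java.util.Map;
import java.util.Optional;

// Simplified stand-in for the per-partition state the broker tracks for a move.
enum PartitionMoveState { MOVING, COMPLETED, SOURCE_OR_DEST_DISK_FAILED }

class DelayedChangeReplicaDir {
    private final Map<String, PartitionMoveState> partitions;

    DelayedChangeReplicaDir(Map<String, PartitionMoveState> partitions) {
        this.partitions = partitions;
    }

    // Result of one partition, if available: empty while the move is still in
    // progress, KafkaStorageException if a disk failed, NONE (success) once the
    // destination replica has caught up and replaced the source replica.
    Optional<String> resultOf(PartitionMoveState state) {
        switch (state) {
            case SOURCE_OR_DEST_DISK_FAILED: return Optional.of("KafkaStorageException");
            case COMPLETED:                  return Optional.of("NONE");
            default:                         return Optional.empty();
        }
    }

    // The delayed ChangeReplicaDirRequest completes only when every partition
    // specified in the request has a result.
    boolean tryComplete() {
        return partitions.values().stream().allMatch(s -> resultOf(s).isPresent());
    }
}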

2. Copy replica data from source log directory to destination log directory

Here we describe how a broker moves data from source log directory to destination log directory.

Case 1: broker is moving a leader replica of topicPartition

- Java class Replica will keep track of two instances of Java class Log, one referencing log directory topicPartition and the other referencing log directory topicPartition.move.
- Broker starts a ReplicaFetcherThread to move data from topicPartition on the source log directory to topicPartition.move on the destination log directory. This ReplicaFetcherThread does not fetch data from other brokers.
- The ReplicaFetcherThread repeatedly reads a ByteBufferMessageSet of size replica.fetch.max.bytes from topicPartition on the source log directory and appends the data to topicPartition.move on the destination log directory, as long as the rate doesn't exceed the user-specified replication quota introduced in KIP-73. See below for more detail regarding quota enforcement. If there is no data to move due to quota enforcement, the ReplicaFetcherThread will block until it has work to do (see the sketch after this list).
- The ReplicaFetcherThread will replace topicPartition.log with topicPartition.move once topicPartition.move has caught up with topicPartition.log. It will delete topicPartition.log on the source disk, rename topicPartition.move to topicPartition.log on the destination disk, and update the Replica instance of this topicPartition to only include a Log instance that points to topicPartition.log on the destination disk. The data from the leader will be appended to the new topicPartition.log going forward. Note that a proper lock is needed to prevent KafkaRequestHandler from appending data to topicPartition.log on the source disk before this replacement is completed by the ReplicaFetcherThread.
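
A minimal sketch of this copy-until-caught-up loop follows. The Log interface is a deliberately simplified stand-in for Kafka's internal log class, and copyUntilCaughtUp is a hypothetical helper, not the actual ReplicaFetcherThread code:

Code Block
import java.nio.ByteBuffer;

// Hypothetical minimal view of a log, just enough to express the copy loop.
interface Log {
    long logEndOffset();
    ByteBuffer read(long fromOffset, int maxBytes);  // up to replica.fetch.max.bytes
    void append(ByteBuffer messages);
}

class IntraBrokerMover {
    // Copies topicPartition (source) into topicPartition.move (destination)
    // until the destination log has caught up with the source log.
    static void copyUntilCaughtUp(Log source, Log destination, int replicaFetchMaxBytes) {
        while (destination.logEndOffset() < source.logEndOffset()) {
            ByteBuffer batch = source.read(destination.logEndOffset(), replicaFetchMaxBytes);
            destination.append(batch);
            // In the real thread, the bytes moved here would also be charged
            // against the KIP-73 replication quota, and the loop would block
            // while the quota is exceeded.
        }
        // Once caught up, the swap described in step 3 below is performed under
        // a lock so that KafkaRequestHandler cannot append to the source log
        // in between.
    }
}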


...

, the topicPartition will not be included in the ByteBufferMessageSet.
- If the ReplicaFetcherThread is moving multiple replicas between log directories, it will choose partitions in alphabetical order when selecting partitions to move. This helps reduce the amount of double writes during the period that a replica is being moved and thus improves performance.

Case 2: broker is moving a follower replica of topicPartition

- Java class Replica will keep track of two instances of Java class Log, one referencing log directory topicPartition and the other referencing log directory topicPartition.move.
- Broker starts a ReplicaFetcherThread to move data from topicPartition on the source log directory to topicPartition.move on the destination log directory. This is the same ReplicaFetcherThread that is fetching data from the leader broker of topicPartition.
- The ReplicaFetcherThread builds FetchRequest with maximum wait time = 0 because this ReplicaFetcherThread needs to move data between log directories.
- The ReplicaFetcherThread sends FetchRequest to the leader broker of the topicPartition.
- The ReplicaFetcherThread receives FetchResponse and appends data from the FetchResponse to local disks.
- The ReplicaFetcherThread reads one or more ByteBufferMessageSet from topicPartition on the source log directory. Each ByteBufferMessageSet has size replica.fetch.max.bytes, and the total size of ByteBufferMessageSet read in this step should be limited by replica.fetch.response.max.bytes AND the user-specified replication quota as introduced in KIP-73.
- If the ReplicaFetcherThread is moving multiple replicas between log directories, it will choose partitions in alphabetical order when selecting partitions to move, i.e. it will allocate all resources (e.g. quota) to move one partition and complete its movement before moving another partition. This helps reduce the amount of double writes during the period that a replica is being moved and thus improves performance.
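
Putting these steps together, one iteration of the shared ReplicaFetcherThread could be sketched as below. Leader, LocalMover, and the method names are hypothetical simplifications used only to illustrate the ordering described above:

Code Block
// One iteration of the follower's ReplicaFetcherThread when it both fetches
// from the leader and moves a replica between local log directories.
class FollowerFetchAndMoveLoop {
    interface Leader { byte[] fetch(int maxWaitMs, int maxBytes); }

    interface LocalMover {
        boolean hasPendingMoves();
        // Reads up to maxBytes from topicPartition on the source log directory
        // and appends it to topicPartition.move on the destination log directory.
        void moveBatch(int maxBytes);
    }

    static void runOnce(Leader leader, LocalMover mover,
                        int replicaFetchMaxBytes, int replicaFetchWaitMaxMs) {
        // Maximum wait time = 0 if there is local movement to do, so the thread
        // is never parked waiting on the leader while local copying is pending.
        int maxWait = mover.hasPendingMoves() ? 0 : replicaFetchWaitMaxMs;
        byte[] response = leader.fetch(maxWait, replicaFetchMaxBytes);
        appendToLocalLog(response);
        if (mover.hasPendingMoves()) {
            // Total bytes moved per iteration are bounded by
            // replica.fetch.response.max.bytes and the KIP-73 quota.
            mover.moveBatch(replicaFetchMaxBytes);
        }
    }

    static void appendToLocalLog(byte[] data) { /* append FetchResponse data to the local log */ }
}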

Notes:
- The replica movement will stop if either source or destination replica becomes offline due to disk failure.
- We use the same mechanism introduced in KIP-73 to throttle the rate of replica movement between disks on the same broker. User will need to configure leader.replication.throttled.replicas, follower.replication.throttled.replicas, leader.replication.throttled.rate and follower.replication.throttled.rate in the same way as specified in KIP-73, i.e. through kafka-reassign-partitions.sh or kafka-configs.sh. For every message that is moved from the source disk to the destination disk, the size of the message will be subtracted from both the leader replication quota and the follower replication quota if its partition is included in the throttled replicas list. No data will be moved for a partition in *.replication.throttled.replicas if either the leader replication quota or the follower replication quota is exceeded.
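
To make the accounting rule concrete, here is a minimal sketch of the double-charging described above, using a simple fixed-window byte counter rather than Kafka's actual KIP-73 quota implementation:

Code Block
// Every message moved between disks is charged against BOTH the leader and
// the follower replication quota; movement pauses when either is exhausted.
class IntraBrokerQuota {
    private final long leaderBytesPerWindow;
    private final long followerBytesPerWindow;
    private long leaderBytesUsed = 0;
    private long followerBytesUsed = 0;

    IntraBrokerQuota(long leaderBytesPerWindow, long followerBytesPerWindow) {
        this.leaderBytesPerWindow = leaderBytesPerWindow;
        this.followerBytesPerWindow = followerBytesPerWindow;
    }

    // A throttled partition may move data only while neither quota is exceeded.
    boolean mayMove() {
        return leaderBytesUsed < leaderBytesPerWindow
            && followerBytesUsed < followerBytesPerWindow;
    }

    void record(long messageSizeBytes) {
        leaderBytesUsed += messageSizeBytes;    // subtracted from the leader quota
        followerBytesUsed += messageSizeBytes;  // and from the follower quota
    }
}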


3. Replacing replica in the source log directory with replica in the destination log directory

Case 1: broker is moving a leader replica of topicPartition

- The ReplicaFetcherThread discovers that topicPartition.move on the destination log directory has caught up with topicPartition on the source log directory after it pushes a ByteBufferMessageSet to topicPartition.move.
- The ReplicaFetcherThread attempts to get a lock to prevent KafkaRequestHandler threads from appending data to the topicPartition.
- The ReplicaFetcherThread renames directory topicPartition to topicPartition.delete on the source log directory. topicPartition.delete will be subject to asynchronous delete.
- The ReplicaFetcherThread renames directory topicPartition.move to topicPartition on the destination log directory.
- The ReplicaFetcherThread changes the Replica instance of this topicPartition to reference only the directory topicPartition on the destination log directory.
- The ReplicaFetcherThread releases the lock so that KafkaRequestHandler threads can continue to append data to topicPartition.
- The data from ProduceRequest will be appended to topicPartition on the destination log directory in the future.
- FetchRequest will get data from topicPartition on the destination log directory in the future.
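
The steps above can be summarized in a short sketch. The lock and the rename calls are illustrative stand-ins for the broker's actual synchronization and file operations:

Code Block
import java.io.File;
import java.util.concurrent.locks.ReentrantLock;

// Sketch of the swap step for a leader replica.
class LeaderReplicaSwap {
    private final ReentrantLock leaderAppendLock = new ReentrantLock();

    void swap(File sourceDir, File destMoveDir, File destDir) {
        leaderAppendLock.lock();  // block KafkaRequestHandler appends
        try {
            // 1. topicPartition -> topicPartition.delete (asynchronous delete later)
            File deleteDir = new File(sourceDir.getParent(), sourceDir.getName() + ".delete");
            if (!sourceDir.renameTo(deleteDir)) throw new IllegalStateException("rename failed");
            // 2. topicPartition.move -> topicPartition on the destination log directory
            if (!destMoveDir.renameTo(destDir)) throw new IllegalStateException("rename failed");
            // 3. Point the Replica instance at the destination log only (elided here).
        } finally {
            leaderAppendLock.unlock();  // appends now go to the destination log
        }
    }
}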

 

Case 2: broker is moving a follower replica of topicPartition
- The ReplicaFetcherThread discovers that topicPartition.move on the destination log directory has caught up with topicPartition on the source log directory after it pushes a ByteBufferMessageSet to topicPartition.move.
- The ReplicaFetcherThread renames directory topicPartition to topicPartition.delete on the source log directory. topicPartition.delete will be subject to asynchronous delete.
- The ReplicaFetcherThread renames directory topicPartition.move to topicPartition on the destination log directory.
- The ReplicaFetcherThread changes the Replica instance of this topicPartition to reference only the directory topicPartition on the destination log directory.
- The ReplicaFetcherThread will append data from the leader of topicPartition to the directory topicPartition on the destination log directory in the future.

Notes:
- When swapping a leader replica after the replica on the destination disk has caught up, a proper lock is needed to prevent KafkaRequestHandler from appending data to topicPartition.log on the source disk while the ReplicaFetcherThread is swapping the replica.
- When swapping a follower replica after the replica on the destination disk has caught up, no lock is needed because the same ReplicaFetcherThread both does the replacement and fetches data from the leader.


4. Handle failure that happens while the broker is moving data or swapping replicas

Broker does the following to recover from failure when it starts up.

- If both the directory topicPartition and the directory topicPartition.move exist on good log directories, the broker will start a ReplicaFetcherThread to copy data from topicPartition to topicPartition.move. The effect is the same as if the broker had received a ChangeReplicaDirRequest to move the replica from topicPartition to topicPartition.move.
- If topicPartition.move exists but topicPartition doesn't exist on any good log directory, and if there is no bad log directory, then the broker renames topicPartition.move to topicPartition.
- If topicPartition.move exists but topicPartition doesn't exist on any good log directory, and if there is a bad log directory, then the broker considers topicPartition as offline and would not touch topicPartition.move.
- If topicPartition.delete exists, the broker schedules topicPartition.delete for asynchronous delete.
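
A minimal sketch of these recovery rules for a single topicPartition, assuming hypothetical helpers (startIntraBrokerFetcher, scheduleAsyncDelete, markOffline) in place of the broker's real startup code:

Code Block
import java.io.File;

// Startup recovery rules for one topicPartition, as listed above.
class StartupRecovery {
    static void recover(File log, File move, File delete, boolean anyBadLogDir) {
        if (delete.exists()) {
            scheduleAsyncDelete(delete);            // topicPartition.delete found
        }
        if (log.exists() && move.exists()) {
            startIntraBrokerFetcher(log, move);     // resume the interrupted move
        } else if (move.exists()) {
            if (anyBadLogDir) {
                markOffline(log.getName());         // don't touch topicPartition.move
            } else {
                move.renameTo(log);                 // swap was interrupted after the
                                                    // source was removed; finish it
            }
        }
    }

    static void startIntraBrokerFetcher(File src, File dst) { /* ... */ }
    static void scheduleAsyncDelete(File dir) { /* ... */ }
    static void markOffline(String partition) { /* ... */ }
}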

2) How to reassign replica between log directories across brokers

Problem statement:

kafka-reassign-partitions.sh should provide the option for the user to specify the destination log directory of the replica on any broker, and the user should be able to verify that the replica has been moved to the specified log directory after the reassignment is completed. This is needed in order for the user to balance load across log directories of brokers in the cluster.

Solution:

The idea is that user should be able to specify the log directory when using kafka-reassign-partitions.sh to reassign partitions. Controller should be able to read this optional log directory info when reading the assignment from zookeeper. Controller should be able to send ChangeReplicaDirRequest and wait for ChangeReplicaDirResponse to confirm the movement to the specified log directory before declaring that this partition has been moved. We describe the procedure in more detail below:

- User specifies a list of log directories, one log directory per replica, for each topic partition in the reassignment json file that is provided to kafka-reassign-partitions.sh. The log directory specified by user must be either "any", or an absolute path which begins with '/'. See Scripts section for the format of this json file.

- kafka-reassign-partitions.sh writes log directories obtained from the reassignment json file to the znode /admin/reassign_partitions. If user doesn't specify log directory, "any" will be used as the default log directory name. See Zookeeper section for the format of the data in the znode.

- Controller updates state machine, sends LeaderAndIsrRequest and so on to perform partition reassignment. In addition, it also sends ChangeReplicaDirRequest for all replicas that are specified with log directory != "any". The ChangeReplicaDirRequest will move the replica to a specific log directory if it is not already placed there on the broker.

- In addition to the existing requirement of partition reassignment completion, controller will also wait for ChangeReplicaDirResponse (corresponding to the ChangeReplicaDirRequest it has sent) before it considers a movement to be completed and removes a partition from /admin/reassign_partitions. This allows user to confirm that the reassignment to specific disks of brokers is completed after the partition is removed from znode data of /admin/reassign_partitions.
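
The extra completion condition could be sketched as follows. The names and the simplified state here are hypothetical; the real controller tracks considerably more reassignment state:

Code Block
import java.util.Map;
import java.util.Set;

// A partition may be removed from /admin/reassign_partitions only when the
// usual reassignment requirement holds AND every replica with an explicit
// log directory has been confirmed via ChangeReplicaDirResponse.
class ReassignmentCompletion {
    static boolean done(Map<Integer, String> requestedDirByReplica,  // replica id -> dir or "any"
                        Set<Integer> replicasInIsr,
                        Set<Integer> replicasWithDirConfirmed) {
        boolean reassignmentCompleted = replicasInIsr.containsAll(requestedDirByReplica.keySet());
        boolean dirsConfirmed = requestedDirByReplica.entrySet().stream()
                .filter(e -> !"any".equals(e.getValue()))   // only explicit dirs need an ack
                .allMatch(e -> replicasWithDirConfirmed.contains(e.getKey()));
        return reassignmentCompleted && dirsConfirmed;
    }
}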

3) How to retrieve information to determine the new replica assignment across log directories

Problem statement:

In order to optimize replica assignment across log directories, user would need to figure out the list of partitions per log directory and the size of each partition. As of now Kafka doesn't expose this information via any RPC, and user would need to use external tools to directly examine the log directories on each machine to get this information. It is better if Kafka can expose this information via RPC.

Solution:

We introduce DescribeDirsRequest and DescribeDirsResponse. When a broker receives a DescribeDirsRequest with an empty list of log directories, it will respond with a DescribeDirsResponse which shows the size of each partition and the list of partitions per log directory for all log directories. If user has specified a list of log directories in the DescribeDirsRequest, the broker will provide the above information for only the log directories specified by the user. A non-zero error code will be specified in the DescribeDirsResponse for each log directory that is either offline or not found by the broker.


User can run a command such as ./bin/kafka-log-dirs.sh --describe --zookeeper localhost:2181 --broker 1 to get the above information per log directory.
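
A sketch of how a broker could assemble the per-directory answer is shown below. The record types are illustrative stand-ins for the actual DescribeDirsRequest/DescribeDirsResponse schemas:

Code Block
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

class DescribeDirsHandler {
    record PartitionSize(String topic, int partition, long sizeBytes) {}
    record DirResult(String path, boolean isLive, List<PartitionSize> partitions, short errorCode) {}

    static List<DirResult> handle(List<String> requestedDirs,
                                  Map<String, List<PartitionSize>> liveDirs) {
        // An empty list of log directories means "describe all of them".
        Collection<String> dirs = requestedDirs.isEmpty() ? liveDirs.keySet() : requestedDirs;
        List<DirResult> results = new ArrayList<>();
        for (String dir : dirs) {
            List<PartitionSize> parts = liveDirs.get(dir);
            if (parts == null) {
                // Offline or unknown log directory: non-zero error code, no data.
                results.add(new DirResult(dir, false, List.of(), (short) 1));
            } else {
                results.add(new DirResult(dir, true, parts, (short) 0));
            }
        }
        return results;
    }
}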

Public interface

Zookeeper

Change the format of data stored in znode /admin/reassign_partitions to allow log directory to be specified for each replica.

Code Block
{
  "version" : int,
  "all_log_dirs": [str] <-- NEW. This is a list of unique strings representing log directory paths. "any" will be included as the first element of this list.

In order to optimize the efficiency of replica movement between disks, ReplicaFetchThread will always choose partitions in alphabetical order when selecting partitions to move. It means that it will allocate all resource (e.g. quota) to move one partition and complete its movement before moving another partition. This can help us reduce among of double writes during the period that the replica is being moved.

 

2. The broker logic for handling ChangeReplicaDirRequest

The flow graph below illustrates how broker handles ChangeReplicaDirRequest.

View file
nameJBOD-flowgraph.pdf
height250

Note that the broker will put ChangeReplicaRequest in a DelayedOperationPurgatory. The ChangeReplicaRequest can be completed when results for all partitions specified in the ChangeReplicaRequest are available. The result of a partition is determined using the following logic:

  • If source or destination disk fails, the result of this partition will be KafkaStorageException
  • If destination replica has caught up with source replica and has replaced source replica, the result of this partition has no error.

3. Throttle replica movement rate

We use the same mechanism introduced in KIP-73 to throttle the rate of replica movement between disks on the same broker. User will need to configure leader.replication.throttled.replicas, follower.replication.throttled.replicas, leader.replication.throttled.rate and follower.replication.throttled.rate in the same way as specified in KIP-73, i.e. through kafka-reassign-partitions.sh or kafka-config.sh. For every message that is moved from source disk to destination disk, the size of the message will be subtracted from both leader replication quota and follower replication quota if its partition is included in the throttled replicas list. No data will be moved for a partition in the *.replication.throttled.replicas if either leader replication quota or follower replication quota is exceed.

 

4. Query broker for partition assignment and partition size per log directory

When a broker receives DescribeDirsRequest with empty list of log directories, it will respond with a DescribeDirsResponse which shows the size of each partition and lists of partitions per log directory for all log directories. If user has specified a list of log directories in the DescribeDirsRequest, the broker will provide the above information for only log directories specified by the user. Non-zero error code will specified in the DescribeDirsResponse for each log directory that is either offline or not found by the broker.

User can use command such as ./bin/kafka-log-dirs.sh --describe --zookeeper localhost:2181 --broker 1 to get the above information per log directory.

2) How to reassign replica between log directories across brokers

Problem statement:

kafka-reassign-partitions.sh should provide the option for user to specify destination log directory of the replica on any broker. And user should be sure that the replica has been moved to the specific log directory after the reassignment is completed. This is needed in order for user to balance load across log directories of brokers in the cluster.

Solution:

The idea is that user should be able to specify log directory when using kafka-reassign-partitions.sh to reassign partition. Controller should be able to read this optional log directory info when reading assignment from zookeeper. Controller should be able to send ChangeReplicaDirRequest and wait for ChangeReplicaDirResponse to confirm the movement to the specific log directory before declaring that this partition has been moved. We describe the procedure in more detail below:

- User can specify a list of log directories, one log directory per replica, for each topic partition in the reassignment json file that is provided to kafka-reassignemnt-partitions.shThe log directory specified by user must be either "any", or absolute path which begins with '/'. See Scripts section for the format of this json file.

- kafka-reassignment-partitions.sh will write log directories read from the reassignment json file to the znode /admin/reassign_partitions.  If user doesn't specify log directory, "any" will be used as the default log directory name. See Zookeeper section for the format of the data in the znode.

- Controller will still update state machine, send LeaderAndIsrRequest and so on to perform partition reassignment. However, it will additionally send ChangeReplicaDirRequest for all replicas that are specified with log directory != "any". The ChangeReplicaDirRequest will move the replica to a specific log directory if it is not already placed there on the broker.

- In addition to the existing requirement of partition reassignment completion, controller will also wait for ChangeReplicaResponse (corresponding to the ChangeReplicaDirRequest it has sent) before it considers a movement to be completed and removes a partition from /admin/reassign_partitions. This allows user to confirm that the reassignment to specific disks of brokers is completed after the partition is removed from znode data of /admin/reassign_partitions.

Public interface

Zookeeper

Change the format of data stored in znode /admin/reassign_partitions to allow log directory to be specified for each replica.

Code Block
{
  "version" : int,
  "partitions" : [
    {
      "topic" : str,
      "partition" : int,
      "replicas" : [int],
      "log_dirs" : [str]    <-- NEWdirs" : [int]    <-- NEW. This is a list of indexes of log directory paths in the "all_log_dirs". Thus we can translate this list of indexes into the list of log directory paths. If log directory is not explicitly specified by user, "any" will be used as log directory name and broker will select log directory using its own policy. Currently the log directory is selected in a round-robin manner.
    },
    ...
  ]
}
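
For example, a hypothetical assignment that pins replica 2 of partition test-0 to /data1 while letting the broker choose the directory for replica 1 would be encoded as:

Code Block
{
  "version" : 1,
  "all_log_dirs" : ["any", "/data1"],
  "partitions" : [
    {
      "topic" : "test",
      "partition" : 0,
      "replicas" : [1, 2],
      "log_dirs" : [0, 1]
    }
  ]
}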

...

./bin/kafka-log-dirs.sh --describe --zookeeper localhost:2181 --broker 1 --log-dirs dir1,dir2,dir3 will show the list of partitions and their sizes per log directory for the specified log directories on the broker. If no log directory is specified by the user, then all log directories will be queried. If a log directory is offline, the error code in the DescribeDirsResponse will indicate the error, the log directory will be marked offline in the script output, and the liveness of this log directory will be marked as False.

The script output would have the following json format.

 

Code Block
{
  "version" : 1,
  "log_dirs" : [
    {
      "is_live" : boolean,
      "path" : str,
      "partitions": [
        {
          "topic" : str, 
          "partition" : int32, 
          "size" : int64
        },
        ...
      ]
    },

    ...
  ]
}
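
For example (hypothetical values), a broker with one live log directory holding a single partition and one offline log directory might produce:

Code Block
{
  "version" : 1,
  "log_dirs" : [
    {
      "is_live" : true,
      "path" : "/data1",
      "partitions" : [
        { "topic" : "test", "partition" : 0, "size" : 1073741824 }
      ]
    },
    {
      "is_live" : false,
      "path" : "/data2",
      "partitions" : []
    }
  ]
}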

 

...