...

The idea is that the user can send an AlterReplicaDirRequest, which tells the broker to move the topicPartition directory (which contains all log segments of the topicPartition replica) from the source log directory to a destination log directory. The broker can create a new directory with the .move postfix on the destination log directory to hold all log segments of the replica. This allows the broker to distinguish, during startup, log segments of the replica on the destination log directory from those on the source log directory. The broker can then create new log segments for the replica on the destination log directory, push data from the source log to the destination log, and replace the source log with the destination log once the new log has caught up.

...

1. Initiate replica movement using AlterReplicaDirRequest

The user runs kafka-reassign-partitions.sh to send an AlterReplicaDirRequest to the broker to initiate replica movement between its log directories. The flow graph below illustrates how the broker handles AlterReplicaDirRequest.

Notes:
- The broker cancels any existing movement of the replica if "any" is specified as the destination log directory.
- If the broker does not yet have a replica created for the specified topicPartition when it receives AlterReplicaDirRequest, it replies with ReplicaNotAvailableException AND remembers the (replica, destination log directory) pair in memory, so that it can create the replica in the specified log directory when it receives LeaderAndIsrRequest later.
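
The two notes above can be sketched as a small piece of broker-side bookkeeping. This is only an illustration under stated assumptions: the class name, the `Result` values, the string key for a replica, and the choice not to remember "any" for a missing replica are all hypothetical and are not part of the proposal.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of how a broker might track AlterReplicaDirRequest state in memory. */
final class AlterReplicaDirHandler {
    enum Result { MOVE_STARTED, MOVE_CANCELLED, REPLICA_NOT_AVAILABLE }

    static final String ANY = "any";

    // Replicas that already exist on this broker (illustrative state only).
    private final Set<String> existingReplicas = new HashSet<>();
    // Movements in progress: replica -> destination log directory.
    private final Map<String, String> movesInProgress = new HashMap<>();
    // Destinations remembered for replicas that have not been created yet.
    private final Map<String, String> rememberedDirs = new HashMap<>();

    Result handle(String topicPartition, String destinationDir) {
        if (!existingReplicas.contains(topicPartition)) {
            // Assumption: "any" needs no bookkeeping because the broker is free to choose.
            if (!ANY.equals(destinationDir)) {
                rememberedDirs.put(topicPartition, destinationDir);
            }
            return Result.REPLICA_NOT_AVAILABLE;
        }
        if (ANY.equals(destinationDir)) {
            // Cancels the movement if one exists; otherwise this is a no-op.
            movesInProgress.remove(topicPartition);
            return Result.MOVE_CANCELLED;
        }
        movesInProgress.put(topicPartition, destinationDir);
        return Result.MOVE_STARTED;
    }

    /** On LeaderAndIsrRequest: returns the remembered log directory, or null if none was remembered. */
    String onLeaderAndIsr(String topicPartition) {
        existingReplicas.add(topicPartition);
        return rememberedDirs.remove(topicPartition);
    }
}
```

A null return from `onLeaderAndIsr` would mean the broker falls back to its normal log directory selection.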

...

Here we describe how a broker moves a Log from the source to the destination log directory and swaps the Log. This corresponds to the "Initiate replica data movement" box in the flow graph above. Note that the broker responds to AlterReplicaDirRequest with MoveInProgress after step 1) described below.

...

- If both the directory topicPartition and the directory topicPartition.move exist on good log directories, the broker starts ReplicaMoveThread to copy data from topicPartition to topicPartition.move. The effect is the same as if the broker had received an AlterReplicaDirRequest to move the replica from topicPartition to topicPartition.move.
- If topicPartition.move exists but topicPartition doesn't exist on any good log directory, and there is no bad log directory, then the broker renames topicPartition.move to topicPartition.
- If topicPartition.move exists but topicPartition doesn't exist on any good log directory, and there is a bad log directory, then the broker considers topicPartition offline and does not touch topicPartition.move.
- If topicPartition.delete exists, the broker schedules topicPartition.delete for asynchronous deletion.
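
The startup rules above amount to a small decision table. The following sketch encodes them as a pure function; the class, enum, and method names are hypothetical, and the directory scanning that would produce the boolean inputs is left out.

```java
/** Hypothetical helper mirroring the startup rules for topicPartition and topicPartition.move. */
final class StartupRecovery {
    enum Action { RESUME_MOVE, RENAME_MOVE_TO_LOG, MARK_OFFLINE, NONE }

    /**
     * @param logDirExists   topicPartition exists on a good log directory
     * @param moveDirExists  topicPartition.move exists on a good log directory
     * @param anyBadLogDir   at least one log directory on the broker is bad
     */
    static Action decide(boolean logDirExists, boolean moveDirExists, boolean anyBadLogDir) {
        if (!moveDirExists) {
            return Action.NONE;               // no movement was in progress
        }
        if (logDirExists) {
            return Action.RESUME_MOVE;        // restart ReplicaMoveThread
        }
        // The source may live on a bad directory, so the .move copy cannot be trusted as complete.
        return anyBadLogDir ? Action.MARK_OFFLINE : Action.RENAME_MOVE_TO_LOG;
    }

    /** The .delete rule is independent of the rules above. */
    static boolean shouldScheduleDelete(String directoryName) {
        return directoryName.endsWith(".delete");
    }
}
```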

...

The idea is that the user should be able to specify a log directory when using kafka-reassign-partitions.sh to reassign a partition. If the user has specified a log directory on the destination broker, the script should send AlterReplicaDirRequest directly to the broker so that the broker can start ReplicaMoveThread to move the replica. Finally, when the user requests verification of the assignment, the script should send DescribeDirsRequest to the broker to verify that the replica has been created in, or moved to, the specified log directory.

...

- The user specifies a list of log directories, one log directory per replica, for each topic partition in the reassignment json file that is provided to kafka-reassign-partitions.sh. The log directory specified by the user must be either "any" or an absolute path that begins with '/'. If "any" is specified as the log directory, the broker is free to choose any log directory to place the replica. The current broker implementation selects the log directory using a round-robin algorithm by default. See the Scripts section for the format of this json file.
- The script sends AlterReplicaDirRequest to those brokers that need to move replicas to a user-specified log directory. This step can be skipped if the user has specified "any" as the log directory for all replicas. The script exits with an error if the broker that should receive AlterReplicaDirRequest is offline, or if the AlterReplicaDirResponse contains any error other than ReplicaNotAvailableException.
- The script creates the reassignment znode in ZooKeeper.
- The script retries AlterReplicaDirRequest to those brokers that previously responded with ReplicaNotAvailableException in the AlterReplicaDirResponse. The script keeps retrying up to a user-specified timeout, which is 10 seconds by default. The script exits with an error if the broker that should receive AlterReplicaDirRequest is offline, or if the AlterReplicaDirResponse contains any error other than ReplicaNotAvailableException.
- The script returns result to user.
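
The retry step above can be sketched as a loop that re-sends the request while the broker answers ReplicaNotAvailableException, and gives up on any other error or when the timeout elapses. This is a minimal sketch, not the script's implementation: the `ErrorCode` enum, the `Supplier` standing in for the network call, and the backoff parameter are assumptions made for illustration.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical sketch of the script's retry behaviour for AlterReplicaDirRequest. */
final class AlterReplicaDirRetry {
    enum ErrorCode { NONE, REPLICA_NOT_AVAILABLE, OTHER }

    /**
     * @param sendRequest stand-in for sending one AlterReplicaDirRequest and reading its error
     * @param timeoutMs   overall retry budget (the proposal's default is 10 seconds)
     * @param backoffMs   pause between attempts (an assumption; the proposal does not specify one)
     * @return true if the broker eventually accepted the request
     */
    static boolean sendWithRetry(Supplier<ErrorCode> sendRequest, long timeoutMs, long backoffMs) {
        long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (true) {
            ErrorCode error = sendRequest.get();
            if (error == ErrorCode.NONE) {
                return true;
            }
            if (error != ErrorCode.REPLICA_NOT_AVAILABLE) {
                return false;                 // any other error is fatal for the script
            }
            if (System.nanoTime() - deadlineNanos >= 0) {
                return false;                 // retry budget exhausted
            }
            try {
                Thread.sleep(backoffMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```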

...

Public interface

Protocol

Create AlterReplicaDirRequest

 

Code Block
AlterReplicaDirRequest => topics
  topics => [AlterReplicaDirRequestTopic]

AlterReplicaDirRequestTopic => topic partitions
  topic => str
  partitions => [AlterReplicaDirRequestPartition]

AlterReplicaDirRequestPartition => partition dir
  partition => int32
  dir => str

 

Create AlterReplicaDirResponse

 

Code Block
AlterReplicaDirResponse => topics
  topics => [AlterReplicaDirResponseTopic]

AlterReplicaDirResponseTopic => topic partitions
  topic => str
  partitions => [AlterReplicaDirResponsePartition]

AlterReplicaDirResponsePartition => partition error_code
  partition => int32
  error_code => int16

Create DescribeDirsRequest

Code Block
// log_dirs and topics are used to filter the results to include only the specified log_dir/topic. The result is the intersection of both filters.
DescribeDirsRequest => log_dirs topics
  log_dirs => [str]  // If this is empty, then all log directories will be queried
  topics => [DescribeDirsRequestTopic] // If this is empty, all topics will be queried
 
DescribeDirsRequestTopic => topic partitions
  topic => str
  partitions => [int32]
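
To make the filter semantics concrete, here is a sketch of how the two filters might be combined for a single replica: an empty filter matches everything, and a replica is reported only if it passes both. The class and method names are hypothetical, and matching the partition list exactly is an assumption of this sketch.

```java
import java.util.Collection;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of the intersection semantics of the log_dirs and topics filters. */
final class DescribeDirsFilter {
    /**
     * @param logDirs requested log directories; empty means all log directories
     * @param topics  requested topic -> partitions; empty means all topics
     * @return true if the replica of (topic, partition) stored in logDir should be reported
     */
    static boolean matches(Collection<String> logDirs, Map<String, Set<Integer>> topics,
                           String logDir, String topic, int partition) {
        boolean dirMatches = logDirs.isEmpty() || logDirs.contains(logDir);
        boolean topicMatches = topics.isEmpty()
            || (topics.containsKey(topic) && topics.get(topic).contains(partition));
        return dirMatches && topicMatches;    // intersection of both filters
    }
}
```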

Create DescribeDirsResponse

Code Block
DescribeDirsResponse => log_dirs
  log_dirs => [DescribeDirsResponseDirMetadata]

DescribeDirsResponseDirMetadata => error_code path topics
  error_code => int16
  path => str
  topics => [DescribeDirsResponseTopic]
 
DescribeDirsResponseTopic => topic partitions
  topic => str
  partitions => [DescribeDirsResponsePartition]
  
DescribeDirsResponsePartition => partition size log_end_offset is_temporary
  partition => int32
  size => int64
  log_end_offset => int64  // Enable user to track movement progress by comparing LEO of the *.log and *.move 
  is_temporary => boolean  // True if replica is *.move
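
As the comment on log_end_offset suggests, movement progress can be estimated by comparing the LEO of the current replica with the LEO of its temporary (*.move) copy. The sketch below does this for entries gathered from a response; `PartitionEntry` is a hypothetical stand-in for DescribeDirsResponsePartition, and a single topic is assumed for brevity.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: offset lag of temporary (*.move) replicas behind their current replicas. */
final class MoveProgress {
    /** Illustrative stand-in for one DescribeDirsResponsePartition entry. */
    static final class PartitionEntry {
        final int partition;
        final long logEndOffset;
        final boolean isTemporary;

        PartitionEntry(int partition, long logEndOffset, boolean isTemporary) {
            this.partition = partition;
            this.logEndOffset = logEndOffset;
            this.isTemporary = isTemporary;
        }
    }

    /** Returns partition -> remaining offsets, only for partitions that are currently being moved. */
    static Map<Integer, Long> offsetLag(List<PartitionEntry> entries) {
        Map<Integer, Long> currentLeo = new HashMap<>();
        Map<Integer, Long> temporaryLeo = new HashMap<>();
        for (PartitionEntry entry : entries) {
            if (entry.isTemporary) {
                temporaryLeo.put(entry.partition, entry.logEndOffset);
            } else {
                currentLeo.put(entry.partition, entry.logEndOffset);
            }
        }
        Map<Integer, Long> lag = new HashMap<>();
        for (Map.Entry<Integer, Long> temporary : temporaryLeo.entrySet()) {
            Long current = currentLeo.get(temporary.getKey());
            if (current != null) {
                lag.put(temporary.getKey(), Math.max(0L, current - temporary.getValue()));
            }
        }
        return lag;
    }
}
```

A lag of 0 indicates that the temporary replica has caught up with the current one.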

Broker Config

1) Add config intra.broker.throttled.rate. This config specifies the maximum rate, in bytes per second, that can be used to move replicas between log directories. It defaults to MAX_LONG. The intra.broker.throttled.rate is per-broker, and the specified capacity is shared by all replica-movement threads.

2) Add config num.replica.move.threads. This config specifies the number of threads in ReplicaMoveThreadPool. The threads in this pool are responsible for moving replicas between log directories. It defaults to the number of log directories. Note that we typically expect a 1-1 mapping between log directories and disks, so defaulting to the number of log directories provides a reasonable way to keep the movement capacity in proportion to the number of disks.
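
The defaults of the two configs above can be illustrated with a short sketch. The class and method names are hypothetical, and the throttle calculation is only one simple way to interpret a rate shared by all threads: it computes the minimum time that should have elapsed for the total bytes copied so far.

```java
import java.util.List;

/** Hypothetical sketch of the defaults for intra.broker.throttled.rate and num.replica.move.threads. */
final class ReplicaMoveDefaults {
    /** Default for intra.broker.throttled.rate: effectively unthrottled. */
    static final long DEFAULT_THROTTLED_RATE = Long.MAX_VALUE;

    /** num.replica.move.threads defaults to the number of log directories when not configured. */
    static int numMoveThreads(Integer configured, List<String> logDirs) {
        return configured != null ? configured : logDirs.size();
    }

    /**
     * Minimum elapsed time, in milliseconds, for the bytes copied by all movement threads together
     * to stay within the shared per-broker rate.
     */
    static long minElapsedMs(long totalBytesCopied, long rateBytesPerSec) {
        return (long) ((double) totalBytesCopied / rateBytesPerSec * 1000.0);
    }
}
```

With the default rate the computed minimum time is 0, so movement threads never need to pause.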

...

3) Add optional argument --timeout to kafka-reassign-partitions.sh. This is needed because kafka-reassign-partitions.sh may have to re-send AlterReplicaDirRequest to the broker if the replica hasn't been created there yet. The timeout is set to 10 seconds by default.

...

Code Block
public interface AdminClient extends AutoCloseable {

    /**
     * Alter the log directory for the specified replicas.
     *
     * Updates are not transactional so they may succeed for some resources while failing for others. The log directory for
     * a particular replica is updated atomically.
     *
     * This operation is supported by brokers with version 0.11.1.0 or higher.
     *
     * @param replicaAssignment  The replicas with their log directory absolute path
     * @param options            The options to use when changing replica dir
     * @return                   The AlterReplicaDirResult
     */
    public abstract AlterReplicaDirResult alterReplicaDir(Map<TopicPartitionReplica, String> replicaAssignment, AlterReplicaDirOptions options);
 
    /**
     * Query the log directory information for the specified log directories on the given brokers.
     * All log directories on a broker are queried if an empty collection of log directories is specified for this broker
     *
     * This operation is supported by brokers with version 0.11.1.0 or higher.
     *
     * @param logDirsByBroker     A list of log dirs per broker
     * @param options             The options to use when querying log dir info
     * @return                    The DescribeDirsResult
     */
    public abstract DescribeDirsResult describeDirs(Map<Integer, Collection<String>> logDirsByBroker, DescribeDirsOptions options);
 
    /**
     * Query the replica directory information for the specified replicas.
     *
     * This operation is supported by brokers with version 0.11.1.0 or higher.
     *
     * @param replicas      The replicas to query
     * @param options       The options to use when querying replica dir info
     * @return              The DescribeReplicaDirResult
     */
    public abstract DescribeReplicaDirResult describeReplicaDir(Collection<TopicPartitionReplica> replicas, DescribeReplicaDirOptions options);
}


/**
  * Options for the alterReplicaDir call.
  */
class AlterReplicaDirOptions {
    private Integer timeoutMs = null;
    public AlterReplicaDirOptions timeoutMs(Integer timeoutMs);
    public Integer timeoutMs();
}
 
/**
  * The result of the alterReplicaDir call.
  */
class AlterReplicaDirResult {
    /**
     * Return a map from replica to futures, which can be used to check the status of individual replica movement.
     */
    public Map<TopicPartitionReplica, KafkaFuture<Void>> values();

    /**
     * Return a future which succeeds if all the replica movements have succeeded
     */
    public KafkaFuture<Void> all();
}
 
/**
  * Options for the describeDirs call.
  */
class DescribeDirsOptions {
    private Integer timeoutMs = null;
    public DescribeDirsOptions timeoutMs(Integer timeoutMs);
    public Integer timeoutMs();
}
 
/**
  * The result of the describeDirs call.
  */
class DescribeDirsResult {
    /**
     * Return a map from brokerId to futures which can be used to check the information of partitions on each individual broker
     */
    public Map<Integer, KafkaFuture<Map<String, LogDirInfo>>> values();

    /**
     * Return a future which succeeds only if all the brokers have responded without error
     */
    public KafkaFuture<Map<Integer, Map<String, LogDirInfo>>> all();
}
 
/**
  * Description of a log directory
  */
class LogDirInfo {
    public final Errors error;
    public final Map<TopicPartition, ReplicaInfo> replicaInfos;
}
 
/**
  * Description of a replica
  */
public class ReplicaInfo {
    public final long size;
    public final long logEndOffset;
    public final boolean isTemporary;
}
 
/**
  * Options for the describeReplicaDir call.
  */
class DescribeReplicaDirOptions {
    private Integer timeoutMs = null;
    public DescribeReplicaDirOptions timeoutMs(Integer timeoutMs);
    public Integer timeoutMs();
}
 
/**
  * The result of the describeReplicaDir call.
  */
class DescribeReplicaDirResult {
    /**
     * Return a map from replica to futures which can be used to check the log directory information of individual replicas
     */
    public Map<TopicPartitionReplica, KafkaFuture<ReplicaDirInfo>> values();

    /**
     * Return a future which succeeds if log directory information of all replicas are available
     */
    public KafkaFuture<Map<TopicPartitionReplica, ReplicaDirInfo>> all();
}

/**
  * Log directory information of a given replica and its intra-broker movement progress
  */
class ReplicaDirInfo {
    public String currentReplicaDir;
    public String temporaryReplicaDir;
    public long temporaryReplicaOffsetLag;
}

...

- Validated client/cluster state for topic1. 

Rejected Alternatives

  

1) Write the replica -> log directory mapping in the reassignment znode and have controller send AlterReplicaDirRequest to brokers.

This alternative solution has a few drawbacks:
- There can be use-cases where we only want to rebalance the load of log directories on a given broker. It seems unnecessary to go through controller in this case.
- If the controller is responsible for sending AlterReplicaDirRequest, and the user-specified log directory is either invalid or offline, then the controller probably needs a way to tell the user that the partition reassignment has failed. We currently don't have a way to do this, since kafka-reassign-partitions.sh simply creates the reassignment znode without waiting for a response. I am not sure there is a good solution to this.
- If the controller is responsible for sending AlterReplicaDirRequest, the controller logic would be more complicated, because the controller needs to first send AlterReplicaDirRequest so that the broker memorizes the partition -> log directory mapping, then send LeaderAndIsrRequest, and then keep sending AlterReplicaDirRequest (in case the broker has restarted) until the replica is created. Note that the last step needs the same retry and timeout as proposed in KIP-113.

Overall this alternative adds quite a bit of complexity to the controller, and we probably want to do this only if there is a clear benefit of doing so. Currently in KIP-113, kafka-reassign-partitions.sh is responsible for sending AlterReplicaDirRequest with retries and reports an error to the user if it either fails or times out. This seems much simpler, and the user shouldn't care whether it is done through the controller.