...
The idea is that the user should be able to specify a log directory when using kafka-reassign-partitions.sh to reassign partitions. If the user has specified a log directory on the destination broker, the script should send AlterReplicaDirRequest directly to the broker so that the broker can start ReplicaMoveThread to move the replica. Finally, when the user requests verification of the assignment, the script should send DescribeLogDirsRequest to the broker to verify that the replica has been created or moved in the specified log directory.
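As a minimal sketch of the first step, assume the proposed assignment has been parsed into a map from each replica (topic, partition, broker) to its destination log directory, with "any" meaning no preference. Only replicas with an explicit directory would need an AlterReplicaDirRequest, grouped per destination broker. ReplicaKey, AlterDirPlanner and the in-memory plan format are hypothetical; the tool's real parsing and request-sending code is not shown. Records require Java 16 or later.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

class AlterDirPlanner {
    // Value used in the (hypothetical) plan when the user has no log directory preference.
    static final String ANY = "any";

    // Groups replicas that have an explicit destination log directory by broker ID.
    // Each inner map is what the tool would put into one AlterReplicaDirRequest for that broker.
    static Map<Integer, Map<ReplicaKey, String>> alterRequestsByBroker(Map<ReplicaKey, String> plan) {
        Map<Integer, Map<ReplicaKey, String>> byBroker = new TreeMap<>();
        for (Map.Entry<ReplicaKey, String> e : plan.entrySet()) {
            if (ANY.equals(e.getValue())) {
                continue; // no directory preference: nothing to send for this replica
            }
            byBroker.computeIfAbsent(e.getKey().brokerId(), id -> new LinkedHashMap<>())
                    .put(e.getKey(), e.getValue());
        }
        return byBroker;
    }

    public static void main(String[] args) {
        Map<ReplicaKey, String> plan = new LinkedHashMap<>();
        plan.put(new ReplicaKey("orders", 0, 1), "/data/disk1");
        plan.put(new ReplicaKey("orders", 0, 2), ANY);
        plan.put(new ReplicaKey("orders", 1, 1), "/data/disk2");
        System.out.println(alterRequestsByBroker(plan));
    }
}

// Hypothetical identifier for one replica: a partition hosted on a given broker.
record ReplicaKey(String topic, int partition, int brokerId) {}
```

Keeping the grouping separate from the network calls makes it easy to check what will be sent to each broker before anything is moved.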
...
- kafka-reassign-partitions.sh will verify partition assignment across brokers as it does now.
- For those replicas whose destination log directory is not "any", kafka-reassign-partitions.sh groups the replicas by broker and sends DescribeLogDirsRequest to those brokers. The DescribeLogDirsRequest should specify the log directories and partitions from the expected assignment.
- The broker replies with DescribeLogDirsResponse, which shows the current log directory for each partition specified in the DescribeLogDirsRequest.
- kafka-reassign-partitions.sh determines whether each replica has been moved to the specified log directory based on the DescribeLogDirsResponse.
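The final check can be sketched as a pure function over one broker's response, assuming the response has already been decoded into log directory -> (partition -> is_temporary). The sketch treats a replica as moved only when it sits in the expected directory and is not a temporary *.move replica; that reading of is_temporary, and the names TP and ReassignmentVerifier, are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class ReassignmentVerifier {
    /**
     * Returns the partitions on one broker whose replica is not yet in its expected log directory.
     *
     * @param expected partition -> expected log directory (only replicas whose directory is not "any")
     * @param reported log directory -> (partition -> is_temporary), decoded from that broker's
     *                 DescribeLogDirsResponse
     */
    static List<TP> pending(Map<TP, String> expected, Map<String, Map<TP, Boolean>> reported) {
        List<TP> notDone = new ArrayList<>();
        for (Map.Entry<TP, String> e : expected.entrySet()) {
            Map<TP, Boolean> inDir = reported.getOrDefault(e.getValue(), Map.of());
            Boolean isTemporary = inDir.get(e.getKey());
            // Done only if the partition is in the expected directory and is not a *.move replica.
            if (isTemporary == null || isTemporary) {
                notDone.add(e.getKey());
            }
        }
        return notDone;
    }

    public static void main(String[] args) {
        TP a = new TP("orders", 0);
        TP b = new TP("orders", 1);
        Map<TP, String> expected = Map.of(a, "/data/disk2", b, "/data/disk2");
        Map<String, Map<TP, Boolean>> reported = Map.of(
                "/data/disk1", Map.of(b, false),           // b still lives in its old directory
                "/data/disk2", Map.of(a, false, b, true)); // a has moved; b is still being copied
        System.out.println("pending: " + pending(expected, reported));
    }
}

// Hypothetical partition identifier used by this sketch.
record TP(String topic, int partition) {}
```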
3) How to retrieve information to determine the new replica assignment across log directories
...
In order to optimize replica assignment across log directories, the user would need to know the list of partitions in each log directory and the size of each partition. As of now, Kafka doesn't expose this information via any RPC, so the user would need to either query the broker's JMX metrics or use external tools to log onto each machine to get it. While it is possible to retrieve this information via JMX, users would have to manage the JMX port and related credentials. It is better if Kafka exposes this information via RPC.
Solution:
We introduce DescribeLogDirsRequest and DescribeLogDirsResponse. When a broker receives a DescribeLogDirsRequest with an empty list of log directories, it responds with a DescribeLogDirsResponse that shows, for every log directory, the list of partitions in that directory and the size of each partition. If the user has specified a list of log directories in the DescribeLogDirsRequest, the broker provides this information only for those log directories. If the user has specified an empty list of topics in the DescribeLogDirsRequest, all topics are queried and included in the response; otherwise, only the topics specified in the DescribeLogDirsRequest are queried. The DescribeLogDirsResponse carries a non-zero error code for each log directory that is either offline or not found by the broker.
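The filtering rules above can be sketched as a broker-side handler over an in-memory view of the broker's log directories. Assumptions: partition sizes are already known, DirError stands in for the real error codes (the proposal only says they are non-zero), the topic filter is flattened to (topic, partition) pairs, and DescribeLogDirsHandler, TP and DirResult are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

class DescribeLogDirsHandler {
    /**
     * @param online     online log directory -> (partition -> size in bytes)
     * @param offline    paths of log directories that are offline
     * @param logDirs    requested log directories; empty means all of them
     * @param partitions requested partitions; empty means all topics
     */
    static List<DirResult> handle(Map<String, Map<TP, Long>> online, Set<String> offline,
                                  List<String> logDirs, Set<TP> partitions) {
        List<String> dirs = new ArrayList<>(logDirs);
        if (dirs.isEmpty()) { // empty filter: describe every directory the broker knows about
            dirs.addAll(online.keySet());
            dirs.addAll(offline);
        }
        List<DirResult> results = new ArrayList<>();
        for (String dir : dirs) {
            if (offline.contains(dir)) {
                results.add(new DirResult(dir, DirError.OFFLINE, Map.of()));
            } else if (!online.containsKey(dir)) {
                results.add(new DirResult(dir, DirError.NOT_FOUND, Map.of()));
            } else {
                // The result is the intersection of the directory filter and the topic filter.
                Map<TP, Long> sizes = new LinkedHashMap<>();
                online.get(dir).forEach((tp, size) -> {
                    if (partitions.isEmpty() || partitions.contains(tp)) {
                        sizes.put(tp, size);
                    }
                });
                results.add(new DirResult(dir, DirError.NONE, sizes));
            }
        }
        return results;
    }

    public static void main(String[] args) {
        TP orders0 = new TP("orders", 0);
        TP clicks0 = new TP("clicks", 0);
        Map<String, Map<TP, Long>> online = Map.of("/data/disk1", Map.of(orders0, 1_024L, clicks0, 2_048L));
        Set<String> offline = Set.of("/data/disk2");
        System.out.println(handle(online, offline, List.of(), Set.of()));
        System.out.println(handle(online, offline, List.of("/data/disk1", "/data/disk9"), Set.of(orders0)));
    }
}

// Hypothetical types used only by this sketch.
record TP(String topic, int partition) {}
enum DirError { NONE, OFFLINE, NOT_FOUND }
record DirResult(String path, DirError error, Map<TP, Long> sizes) {}
```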
...
```
AlterReplicaDirResponse => topics
  topics => [AlterReplicaDirResponseTopic]

AlterReplicaDirResponseTopic => topic partitions
  topic => str
  partitions => [AlterReplicaDirResponsePartition]

AlterReplicaDirResponsePartition => partition error_code
  partition => int32
  error_code => int16
```
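A client receiving this response mainly needs the partitions whose error_code is non-zero (0 means no error in the Kafka protocol). A minimal sketch using Java records that mirror the schema; the records and AlterReplicaDirErrors are illustrative, not Kafka's own request classes.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class AlterReplicaDirErrors {
    // Returns "topic-partition" -> error_code for every partition whose error_code is non-zero.
    static Map<String, Short> failures(AlterReplicaDirResponse response) {
        Map<String, Short> failed = new LinkedHashMap<>();
        for (AlterReplicaDirResponseTopic t : response.topics()) {
            for (AlterReplicaDirResponsePartition p : t.partitions()) {
                if (p.errorCode() != 0) { // 0 means "no error" in the Kafka protocol
                    failed.put(t.topic() + "-" + p.partition(), p.errorCode());
                }
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        AlterReplicaDirResponse response = new AlterReplicaDirResponse(List.of(
                new AlterReplicaDirResponseTopic("orders", List.of(
                        new AlterReplicaDirResponsePartition(0, (short) 0),
                        new AlterReplicaDirResponsePartition(1, (short) 57))))); // example non-zero code
        System.out.println(failures(response)); // prints {orders-1=57}
    }
}

// Records mirroring the AlterReplicaDirResponse schema above (not Kafka's own classes).
record AlterReplicaDirResponse(List<AlterReplicaDirResponseTopic> topics) {}
record AlterReplicaDirResponseTopic(String topic, List<AlterReplicaDirResponsePartition> partitions) {}
record AlterReplicaDirResponsePartition(int partition, short errorCode) {}
```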
Create DescribeLogDirsRequest
```
// log_dirs and topics are used to filter the results to include only the specified log_dir/topic.
// The result is the intersection of both filters.
DescribeLogDirsRequest => log_dirs topics
  log_dirs => [str]                        // If this is empty, then all log directories will be queried
  topics => [DescribeLogDirsRequestTopic]  // If this is empty, all topics will be queried

DescribeLogDirsRequestTopic => topic partitions
  topic => str
  partitions => [int32]
```
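For the verification step, the tool holds a set of partitions per broker and needs to fold them into the [topic partitions] shape above. A minimal sketch with records mirroring the schema; DescribeLogDirsRequests, TP and the helper names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class DescribeLogDirsRequests {
    // Both filters empty: every log directory and every topic on the broker.
    static DescribeLogDirsRequest all() {
        return new DescribeLogDirsRequest(List.of(), List.of());
    }

    // Restricts the request to the given directories and partitions, grouping partitions by topic.
    static DescribeLogDirsRequest forPartitions(Collection<String> logDirs, Collection<TP> partitions) {
        Map<String, List<Integer>> byTopic = new TreeMap<>(); // sorted only to keep output stable
        for (TP tp : partitions) {
            byTopic.computeIfAbsent(tp.topic(), t -> new ArrayList<>()).add(tp.partition());
        }
        List<DescribeLogDirsRequestTopic> topics = new ArrayList<>();
        byTopic.forEach((topic, parts) -> {
            parts.sort(null); // natural order of partition IDs
            topics.add(new DescribeLogDirsRequestTopic(topic, List.copyOf(parts)));
        });
        return new DescribeLogDirsRequest(List.copyOf(logDirs), topics);
    }

    public static void main(String[] args) {
        List<TP> expected = List.of(new TP("orders", 2), new TP("clicks", 0), new TP("orders", 0));
        System.out.println(forPartitions(List.of("/data/disk2"), expected));
    }
}

// Records mirroring the request schema above, plus a hypothetical partition identifier.
record DescribeLogDirsRequest(List<String> logDirs, List<DescribeLogDirsRequestTopic> topics) {}
record DescribeLogDirsRequestTopic(String topic, List<Integer> partitions) {}
record TP(String topic, int partition) {}
```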
Create DescribeLogDirsResponse
```
DescribeLogDirsResponse => log_dirs
  log_dirs => [DescribeLogDirsResponseDirMetadata]

DescribeLogDirsResponseDirMetadata => error_code path topics
  error_code => int16
  path => str
  topics => [DescribeLogDirsResponseTopic]

DescribeLogDirsResponseTopic => topic partitions
  topic => str
  partitions => [DescribeLogDirsResponsePartition]

DescribeLogDirsResponsePartition => partition size offset_lag is_temporary
  partition => int32
  size => int64
  offset_lag => int64      // If this is not a temporary replica, then offset_lag = max(0, HW - LEO).
                           // Otherwise, offset_lag = primary replica's LEO - temporary replica's LEO
  is_temporary => boolean  // True if replica is *.move
```
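The offset_lag comment above defines two formulas depending on is_temporary. A minimal sketch that restates them as plain functions; OffsetLag and its method names are hypothetical, and the example offsets are made up.

```java
class OffsetLag {
    // Regular (non-temporary) replica: max(0, HW - LEO).
    static long forReplica(long highWatermark, long logEndOffset) {
        return Math.max(0L, highWatermark - logEndOffset);
    }

    // Temporary (*.move) replica: primary replica's LEO minus temporary replica's LEO.
    static long forTemporaryReplica(long primaryLogEndOffset, long temporaryLogEndOffset) {
        return primaryLogEndOffset - temporaryLogEndOffset;
    }

    public static void main(String[] args) {
        System.out.println(forReplica(100, 100));            // prints 0
        System.out.println(forTemporaryReplica(5_000, 3_200)); // prints 1800
    }
}
```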
Broker Config
1) Add config intra.broker.throttled.rate. This config specifies the maximum rate, in bytes per second, that can be used to move replicas between log directories. It defaults to MAX_LONG. The intra.broker.throttled.rate is per broker, and the specified capacity is shared by all replica movement threads.
2) Add config num.replica.move.threads. This config specifies the number of threads in ReplicaMoveThreadPool. The threads in this pool are responsible for moving replicas between log directories. It defaults to the number of log directories. Note that we typically expect a 1-1 mapping between log directories and disks, so setting the config to the number of log directories by default keeps the movement capacity in proportion to the number of disks.
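The two configs interact: every thread in ReplicaMoveThreadPool draws from the same per-broker byte budget. A minimal sketch of that sharing, assuming a simple cumulative-average throttle (a swapped-in technique, not necessarily how the broker throttles) and a fixed thread pool sized to one thread per log directory. SharedThrottle, ReplicaMoveDemo and defaultMoveThreads are hypothetical names, and the 1 MB/s rate is a demo value.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ReplicaMoveDemo {
    // num.replica.move.threads default: one thread per log directory (guarded to at least one).
    static int defaultMoveThreads(List<String> logDirs) {
        return Math.max(1, logDirs.size());
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> logDirs = List.of("/data/disk1", "/data/disk2", "/data/disk3");
        int threads = defaultMoveThreads(logDirs);
        SharedThrottle throttle = new SharedThrottle(1_000_000L, System.nanoTime()); // 1 MB/s, demo value
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.submit(() -> {
                for (int chunk = 0; chunk < 4; chunk++) {
                    throttle.acquire(64 * 1024); // stands in for copying one 64 KiB chunk of a replica
                }
                return null;
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        System.out.println("moved with " + threads + " threads under one shared limit");
    }
}

// Hypothetical throttle shared by all replica movement threads on one broker.
final class SharedThrottle {
    private final long bytesPerSecond; // intra.broker.throttled.rate
    private final long startNanos;
    private long bytesSoFar = 0;

    SharedThrottle(long bytesPerSecond, long startNanos) {
        this.bytesPerSecond = bytesPerSecond;
        this.startNanos = startNanos;
    }

    // Records bytes moved by any thread and returns how long (ns) the caller should wait so
    // that the average rate since startNanos stays at or below bytesPerSecond.
    synchronized long delayNanosFor(long bytes, long nowNanos) {
        bytesSoFar += bytes;
        double requiredNanos = bytesSoFar * 1e9 / bytesPerSecond; // time these bytes should take
        return Math.max(0L, (long) requiredNanos - (nowNanos - startNanos));
    }

    // Called by a move thread after copying a chunk; sleeps outside the lock.
    void acquire(long bytes) throws InterruptedException {
        long delay = delayNanosFor(bytes, System.nanoTime());
        if (delay > 0) {
            Thread.sleep(delay / 1_000_000, (int) (delay % 1_000_000));
        }
    }
}
```

With the MAX_LONG default the computed delay rounds to zero for any realistic byte count, so movement is effectively unthrottled.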
...
...
```java
public abstract class AdminClient implements AutoCloseable {
    /**
     * Query the log directory information for the specified log directories on the given brokers.
     * All log directories on a broker are queried if an empty collection of log directories is specified for this broker.
     *
     * This operation is supported by brokers with version 0.11.1.0 or higher.
     *
     * @param logDirsByBroker A list of log dirs per broker
     * @param options The options to use when querying log dir info
     * @return The DescribeLogDirsResult
     */
    public abstract DescribeLogDirsResult describeLogDirs(Map<Integer, Collection<String>> logDirsByBroker, DescribeLogDirsOptions options);

    /**
     * Query the replica directory information for the specified replicas.
     *
     * This operation is supported by brokers with version 0.11.1.0 or higher.
     *
     * @param replicas The replicas to query
     * @param options The options to use when querying replica dir info
     * @return The DescribeReplicaLogDirsResult
     */
    public abstract DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas, DescribeReplicaLogDirsOptions options);
}

public class KafkaAdminClient extends AdminClient {
    /**
     * Alter the log directory for the specified replicas.
     *
     * Updates are not transactional, so they may succeed for some resources while failing for others.
     * The log directory for a particular replica is updated atomically.
     *
     * This operation is supported by brokers with version 0.11.1.0 or higher.
     *
     * @param replicaAssignment The replicas with their log directory absolute path
     * @param options The options to use when changing replica dir
     * @return The AlterReplicaDirResult
     */
    public AlterReplicaDirResult alterReplicaDir(Map<TopicPartitionReplica, String> replicaAssignment, AlterReplicaDirOptions options);
}

/** Options for the alterReplicaDir call. */
class AlterReplicaDirOptions {
    private Integer timeoutMs = null;
    public AlterReplicaDirOptions timeoutMs(Integer timeoutMs);
    public Integer timeoutMs();
}

/** The result of the alterReplicaDir call. */
class AlterReplicaDirResult {
    /** Return a map from replica to futures, which can be used to check the status of individual replica movements. */
    public Map<TopicPartitionReplica, KafkaFuture<Void>> values();
    /** Return a future which succeeds if all the replica movements have succeeded. */
    public KafkaFuture<Void> all();
}

/** Options for the describeLogDirs call. */
class DescribeLogDirsOptions {
    private Integer timeoutMs = null;
    public DescribeLogDirsOptions timeoutMs(Integer timeoutMs);
    public Integer timeoutMs();
}

/** The result of the describeLogDirs call. */
class DescribeLogDirsResult {
    /** Return a map from brokerId to futures which can be used to check the information of partitions on each individual broker. */
    public Map<Integer, KafkaFuture<Map<String, LogDirInfo>>> values();
    /** Return a future which succeeds only if all the brokers have responded without error. */
    public KafkaFuture<Map<Integer, Map<String, LogDirInfo>>> all();
}

/** Description of a log directory. */
class LogDirInfo {
    public final Errors error;
    public final Map<TopicPartition, ReplicaInfo> replicaInfos;
}

/** Description of a replica. */
public class ReplicaInfo {
    public final long size;
    public final long logEndOffset;
    public final boolean isTemporary;
}

/** Options for the describeReplicaLogDirs call. */
class DescribeReplicaLogDirsOptions {
    private Integer timeoutMs = null;
    public DescribeReplicaLogDirsOptions timeoutMs(Integer timeoutMs);
    public Integer timeoutMs();
}

/** The result of the describeReplicaLogDirs call. */
class DescribeReplicaLogDirsResult {
    /** Return a map from replica to futures which can be used to check the log directory information of individual replicas. */
    public Map<TopicPartitionReplica, KafkaFuture<ReplicaDirInfo>> values();
    /** Return a future which succeeds if log directory information of all replicas is available. */
    public KafkaFuture<Map<TopicPartitionReplica, ReplicaDirInfo>> all();
}

/** Log directory information of a given replica and its intra-broker movement progress. */
class ReplicaDirInfo {
    public String currentReplicaDir;
    public String temporaryReplicaDir;
    public long temporaryReplicaOffsetLag;
}
```
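The proposed describeLogDirs call returns, through DescribeLogDirsResult.all(), a map from broker ID to per-directory LogDirInfo. To show how a caller might turn that into the per-directory usage motivated in section 3, here is a sketch that uses local stand-in records instead of the proposed classes so it compiles without a Kafka dependency; TP, DirError, the record shapes and LogDirUsage are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

class LogDirUsage {
    // brokerId -> (log dir -> total bytes), computed from the value of DescribeLogDirsResult.all().
    // Directories reporting an error are skipped because they carry no replica information.
    static Map<Integer, Map<String, Long>> bytesPerDir(Map<Integer, Map<String, LogDirInfo>> all) {
        Map<Integer, Map<String, Long>> usage = new TreeMap<>();
        all.forEach((broker, dirs) -> dirs.forEach((path, info) -> {
            if (info.error() != DirError.NONE) {
                return;
            }
            long total = 0;
            for (ReplicaInfo r : info.replicaInfos().values()) {
                total += r.size(); // temporary (*.move) replicas also occupy space in this directory
            }
            usage.computeIfAbsent(broker, b -> new TreeMap<>()).put(path, total);
        }));
        return usage;
    }

    public static void main(String[] args) {
        TP orders0 = new TP("orders", 0);
        TP orders1 = new TP("orders", 1);
        Map<Integer, Map<String, LogDirInfo>> all = Map.of(1, Map.of(
                "/data/disk1", new LogDirInfo(DirError.NONE, Map.of(
                        orders0, new ReplicaInfo(4_096, 120, false),
                        orders1, new ReplicaInfo(1_024, 80, true))),
                "/data/disk2", new LogDirInfo(DirError.OFFLINE, Map.of())));
        System.out.println(bytesPerDir(all)); // prints {1={/data/disk1=5120}}
    }
}

// Stand-ins mirroring the proposed LogDirInfo / ReplicaInfo classes; not the real Kafka types.
record TP(String topic, int partition) {}
enum DirError { NONE, OFFLINE }
record ReplicaInfo(long size, long logEndOffset, boolean isTemporary) {}
record LogDirInfo(DirError error, Map<TP, ReplicaInfo> replicaInfos) {}
```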
...