...
Status
Current state: Completed
Discussion thread: here
JIRA: KAFKA-5163 and KAFKA-5694
Released: 1.1.0
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
Code Block |
---|
AlterReplicaDirRequest => topics
  topics => [AlterReplicaDirRequestTopic]
AlterReplicaDirRequestTopic => topic partitions
  topic => str
  partitions => [AlterReplicaDirRequestPartition]
AlterReplicaDirRequestPartition => partition log_dir
  partition => int32
  log_dir => str |
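To make the grammar above concrete, here is a minimal Python sketch that serializes a request body in the field order shown. It assumes the classic Kafka primitive encodings (big-endian integers, `str` as an int16 length followed by UTF-8 bytes, arrays as an int32 count followed by the elements) and omits the request header. The function names and the shape of the `assignments` argument are hypothetical, and the wire format that was finally released may differ from this grammar.

```python
import struct


def encode_string(value):
    """Encode a str as int16 length + UTF-8 bytes (assumed classic Kafka encoding)."""
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def encode_alter_replica_dir_request(assignments):
    """Serialize {topic: {partition: log_dir}} following the grammar above.

    Hypothetical helper: only the request body is produced, no header.
    """
    out = [struct.pack(">i", len(assignments))]          # topics array count
    for topic, partitions in assignments.items():
        out.append(encode_string(topic))                 # topic => str
        out.append(struct.pack(">i", len(partitions)))   # partitions array count
        for partition, log_dir in partitions.items():
            out.append(struct.pack(">i", partition))     # partition => int32
            out.append(encode_string(log_dir))           # log_dir => str
    return b"".join(out)
```

For example, `encode_alter_replica_dir_request({"events": {0: "/data/disk2"}})` would describe moving partition 0 of a topic named `events` to `/data/disk2`; both names are made up for illustration.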
Code Block |
---|
AlterReplicaDirResponse => topics
  topics => [AlterReplicaDirResponseTopic]
AlterReplicaDirResponseTopic => topic partitions
  topic => str
  partitions => [AlterReplicaDirResponsePartition]
AlterReplicaDirResponsePartition => partition error_code
  partition => int32
  error_code => int16 |
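The response grammar above can be read back with a small decoder. The following Python sketch assumes the same classic Kafka primitive encodings (big-endian integers, int16-length-prefixed strings, int32-count-prefixed arrays) and a body without a response header. The function name and the returned dictionary shape are hypothetical.

```python
import struct


def decode_alter_replica_dir_response(buf):
    """Return {(topic, partition): error_code} for a response body (hypothetical helper)."""
    pos = 0

    def read(fmt):
        # Read one fixed-size big-endian value and advance the cursor.
        nonlocal pos
        (value,) = struct.unpack_from(fmt, buf, pos)
        pos += struct.calcsize(fmt)
        return value

    def read_string():
        # str is assumed to be int16 length + UTF-8 bytes.
        nonlocal pos
        length = read(">h")
        text = buf[pos:pos + length].decode("utf-8")
        pos += length
        return text

    errors = {}
    for _ in range(read(">i")):            # topics array
        topic = read_string()
        for _ in range(read(">i")):        # partitions array
            partition = read(">i")
            errors[(topic, partition)] = read(">h")
    return errors
```

A caller would typically treat every entry with a non-zero `error_code` as a partition whose move request was rejected.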
Create DescribeLogDirsRequest
Code Block |
---|
// topics is used to filter the results to include only the specified topics and partitions.
DescribeLogDirsRequest => topics
  topics => [DescribeLogDirsRequestTopic] // If this is empty, all topics will be queried
DescribeLogDirsRequestTopic => topic partitions
  topic => str
  partitions => [int32] |
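The filter semantics of the request can be sketched in a few lines of Python: an empty `topics` filter selects every replica, otherwise only the listed partitions of the listed topics are selected. The function name and the dictionary representation of the filter are assumptions made for illustration, not broker code.

```python
def replica_selected(topics_filter, topic, partition):
    """Return True if (topic, partition) passes the request's topics filter.

    topics_filter is a hypothetical {topic: [partition, ...]} mapping;
    an empty mapping means "query all topics", as noted in the grammar.
    """
    if not topics_filter:
        return True
    return partition in topics_filter.get(topic, ())
```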
Create DescribeLogDirsResponse
Code Block |
---|
DescribeLogDirsResponse => log_dirs
  log_dirs => [DescribeLogDirsResponseDirMetadata]
DescribeLogDirsResponseDirMetadata => error_code path topics
  error_code => int16
  path => str
  topics => [DescribeLogDirsResponseTopic]
DescribeLogDirsResponseTopic => topic partitions
  topic => str
  partitions => [DescribeLogDirsResponsePartition]
DescribeLogDirsResponsePartition => partition size offset_lag is_temporary
  partition => int32
  size => int64
  offset_lag => int64 // If this is not a temporary replica, then offset_lag = max(0, HW - LEO). Otherwise, offset_lag = primary Replica's LEO - temporary Replica's LEO
  is_temporary => boolean // True if replica is *.move |
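The two cases in the `offset_lag` comment can be written out as a small worked example. This Python sketch simply transcribes the formulas as stated in the grammar; the function and parameter names are hypothetical.

```python
def offset_lag(is_temporary, leo, high_watermark=None, primary_leo=None):
    """Compute offset_lag exactly as the schema comment defines it.

    leo is the log end offset of the replica being described. For a
    temporary (*.move) replica the lag is measured against the primary
    replica's LEO; otherwise it is max(0, HW - LEO).
    """
    if is_temporary:
        return primary_leo - leo
    return max(0, high_watermark - leo)
```

For instance, a temporary replica at LEO 400 whose primary replica is at LEO 1000 reports a lag of 600, which a tool could poll until it reaches zero before treating the move as caught up.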
Broker Config
1) Add config intra.broker.throttled.rate. This config specifies the maximum rate, in bytes per second, at which replicas can be moved between log directories. It defaults to MAX_LONG. The intra.broker.throttled.rate is per-broker, and the specified capacity is shared by all replica-movement threads.
2) Add config num.replica.alter.log.dirs.threads. This config specifies the number of threads in ReplicaMoveThreadPool. The threads in this pool are responsible for moving replicas between log directories. This config defaults to the number of log directories. Note that we typically expect a 1-1 mapping between log directories and disks, so defaulting to the number of log directories is a reasonable way to keep the movement capacity in proportion to the number of disks.
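The defaults described for the two configs can be illustrated with a short Python sketch that resolves the effective values from a properties-style dictionary. It assumes the log directories come from the existing `log.dirs` broker setting (a comma-separated list) and that MAX_LONG is Java's `Long.MAX_VALUE`. The function name is hypothetical and this is not how the broker itself parses its configuration.

```python
MAX_LONG = 2**63 - 1  # Java Long.MAX_VALUE, the stated default for the throttle


def effective_move_settings(props):
    """Return (throttle_rate_bytes_per_sec, mover_thread_count) for a broker.

    props is a hypothetical dict of broker properties. When a config is
    absent, the default described in the text is applied.
    """
    log_dirs = [d.strip() for d in props["log.dirs"].split(",") if d.strip()]
    rate = int(props.get("intra.broker.throttled.rate", MAX_LONG))
    # Default thread count follows the number of log directories.
    threads = int(props.get("num.replica.alter.log.dirs.threads", len(log_dirs)))
    return rate, threads
```

With three log directories and no overrides, this yields an unthrottled rate and three mover threads, matching the one-thread-per-disk intent of the default.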
...