...
Status
Current state: Completed
Discussion thread: here
JIRA: KAFKA-5163 and KAFKA-5694
Released: 1.1.0
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
The idea is that a user can send an AlterReplicaDirRequest, which tells the broker to move the topicPartition directory (which contains all log segments of the topicPartition replica) from the source log directory to a destination log directory. The broker can create a new directory with the .move postfix on the destination log directory to hold all log segments of the replica. This allows the broker to distinguish log segments of the replica on the destination log directory from log segments of the replica on the source log directory during broker startup. The broker can create new log segments for the replica on the destination log directory, push data from the source log to the destination log, and replace the source log with the destination log for this replica once the new log has caught up.
...
1. Initiate replica movement using AlterReplicaDirRequest
The user runs kafka-reassign-partitions.sh to send AlterReplicaDirRequest to the broker to initiate replica movement between its log directories. The flow graph below illustrates how the broker handles AlterReplicaDirRequest.
Notes:
- Broker will cancel existing movement of the replica if "any" is specified as destination log directory.
- If the broker has not yet created a replica for the specified topicPartition when it receives AlterReplicaDirRequest, it will reply with ReplicaNotAvailableException AND remember the (replica, destination log directory) pair in memory, so that it can create the replica in the specified log directory when it receives LeaderAndIsrRequest later.
...
Here we describe how a broker moves a Log from the source to the destination log directory and swaps the Log. This corresponds to the "Initiate replica data movement" box in the flow graph above. Note that the broker responds to AlterReplicaDirRequest with MoveInProgress after step 1) described below.
...
- If both the directory topicPartition and the directory topicPartition.move exist on good log directories, the broker will start ReplicaMoveThread to copy data from topicPartition to topicPartition.move. The effect is the same as if the broker had received an AlterReplicaDirRequest to move the replica from topicPartition to topicPartition.move.
- If topicPartition.move exists but topicPartition doesn't exist on any good log directory, and there is no bad log directory, then the broker renames topicPartition.move to topicPartition.
- If topicPartition.move exists but topicPartition doesn't exist on any good log directory, and there is a bad log directory, then the broker considers topicPartition offline and does not touch topicPartition.move.
- If topicPartition.delete exists, the broker schedules topicPartition.delete for asynchronous deletion.
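The first three startup rules above can be read as a small decision function. The following is only a sketch under stated assumptions: the class, enum, and parameter names are hypothetical and do not come from the broker code, the NONE outcome (no .move directory present) is an assumption about the case the rules leave implicit, and the .delete rule is handled separately and is not modeled here.

```java
/** Sketch of the startup decision for one partition; all names are hypothetical. */
final class StartupRecovery {
    enum Action { RESUME_MOVE, RENAME_MOVE_TO_PRIMARY, MARK_OFFLINE, NONE }

    /**
     * @param primaryOnGoodDir true if the topicPartition directory exists on a good log directory
     * @param moveOnGoodDir    true if the topicPartition.move directory exists on a good log directory
     * @param anyBadLogDir     true if the broker has at least one bad log directory
     */
    static Action decide(boolean primaryOnGoodDir, boolean moveOnGoodDir, boolean anyBadLogDir) {
        if (!moveOnGoodDir) {
            return Action.NONE; // assumption: nothing to recover without a .move directory
        }
        if (primaryOnGoodDir) {
            return Action.RESUME_MOVE; // restart copying topicPartition -> topicPartition.move
        }
        // The primary directory may be hidden on a bad log directory, so only
        // promote the .move directory when every log directory is good.
        return anyBadLogDir ? Action.MARK_OFFLINE : Action.RENAME_MOVE_TO_PRIMARY;
    }

    public static void main(String[] args) {
        System.out.println(decide(true, true, false));
        System.out.println(decide(false, true, false));
        System.out.println(decide(false, true, true));
    }
}
```

The third rule is the conservative one: when a bad log directory exists, the broker cannot tell whether the original replica is merely unreadable, so the partially copied data is left untouched.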
...
The idea is that the user should be able to specify a log directory when using kafka-reassign-partitions.sh to reassign a partition. If the user has specified a log directory on the destination broker, the script should send AlterReplicaDirRequest directly to the broker so that the broker can start ReplicaMoveThread to move the replica. Finally, the script should send DescribeLogDirsRequest to the broker to verify that the replica has been created in, or moved to, the specified log directory when the user requests to verify the assignment.
...
- The user specifies a list of log directories, one log directory per replica, for each topic partition in the reassignment json file that is provided to kafka-reassign-partitions.sh. The log directory specified by the user must be either "any" or an absolute path which begins with '/'. If "any" is specified as the log directory, the broker is free to choose any log directory to place the replica. The current broker implementation selects a log directory using a round-robin algorithm by default. See the Scripts section for the format of this json file.
- The script sends AlterReplicaDirRequest to those brokers which need to move replicas to a user-specified log directory. This step can be skipped if the user has specified "any" as the log directory for all replicas. The script exits with an error if the broker that should receive AlterReplicaDirRequest is offline or if the AlterReplicaDirResponse contains any error other than ReplicaNotAvailableException.
- The script creates reassignment znode in zookeeper.
- The script retries AlterReplicaDirRequest to those brokers which previously responded with ReplicaNotAvailableException in the AlterReplicaDirResponse. The script keeps retrying up to a user-specified timeout, which is 10 seconds by default. The script exits with an error if the broker that should receive AlterReplicaDirRequest is offline or if the AlterReplicaDirResponse contains any error other than ReplicaNotAvailableException.
- The script returns result to user.
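The retry step in the list above can be sketched as a bounded loop. This is an illustration only, not the script's actual code: the `Reply` enum, the `Supplier` standing in for one request/response round trip, and the backoff parameter are all hypothetical names introduced for this sketch.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Sketch of retrying a request until success, a fatal error, or a timeout; names are hypothetical. */
final class AlterReplicaDirRetry {
    enum Reply { NONE, REPLICA_NOT_AVAILABLE, OTHER_ERROR }

    /**
     * @param send      performs one request/response round trip (stand-in for the real RPC)
     * @param timeoutMs overall retry budget (the KIP's default is 10 seconds)
     * @param backoffMs pause between attempts (an assumption; the KIP does not specify one)
     * @return true if the broker accepted the request, false if the timeout expired
     */
    static boolean sendWithRetry(Supplier<Reply> send, long timeoutMs, long backoffMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (true) {
            Reply reply = send.get();
            if (reply == Reply.NONE) {
                return true;
            }
            if (reply == Reply.OTHER_ERROR) {
                // Any error other than "replica not available" aborts the script.
                throw new IllegalStateException("AlterReplicaDirRequest failed");
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // replica still not created when the timeout expired
            }
            try {
                Thread.sleep(backoffMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        // A broker stub that never has the replica: the call gives up after about 50 ms.
        System.out.println(sendWithRetry(() -> Reply.REPLICA_NOT_AVAILABLE, 50L, 10L));
    }
}
```

Retrying only on ReplicaNotAvailableException matches the reason the step exists: the replica may not be created until the broker has processed the LeaderAndIsrRequest triggered by the reassignment znode.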
...
- kafka-reassign-partitions.sh will verify partition assignment across brokers as it does now.
- For those replicas with destination log directory != "any", kafka-reassign-partitions.sh groups the replicas by broker and sends DescribeLogDirsRequest to those brokers. The DescribeLogDirsRequest should provide the log directories and partitions specified in the expected assignment.
- The broker replies with DescribeLogDirsResponse, which shows the current log directory for each partition specified in the DescribeLogDirsRequest.
- kafka-reassign-partitions.sh determines whether the replica has been moved to the specified log directory based on the DescribeLogDirsResponse.
3) How to retrieve information to determine the new replica assignment across log directories
...
In order to optimize replica assignment across log directories, the user needs to know the list of partitions per log directory and the size of each partition. As of now Kafka doesn't expose this information via any RPC, so the user would need to either query the JMX metrics of the broker or use external tools to log onto each machine to get it. While it is possible to retrieve this information via JMX, users would have to manage the JMX port and related credentials. It is better if Kafka can expose this information via RPC.
Solution:
We introduce DescribeLogDirsRequest and DescribeLogDirsResponse. When a broker receives a DescribeLogDirsRequest with an empty list of log directories, it will respond with a DescribeLogDirsResponse which shows the size of each partition and the list of partitions per log directory for all log directories. If the user has specified a list of log directories in the DescribeLogDirsRequest, the broker will provide the above information only for the log directories specified by the user. If the user has specified an empty list of topics in the DescribeLogDirsRequest, all topics will be queried and included in the response. Otherwise, only those topics specified in the DescribeLogDirsRequest will be queried. A non-zero error code will be specified in the DescribeLogDirsResponse for each log directory that is either offline or not found by the broker.
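The filtering rules described in this paragraph can be sketched as follows. This is a simplified illustration, not broker code: the class and method names are hypothetical, sizes are tracked per topic rather than per partition, and the per-directory error code for offline or unknown directories is left out.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

/** Sketch of "empty filter means everything" semantics; names and data layout are hypothetical. */
final class DescribeLogDirsFilter {
    /** dirContents maps a log directory path to (topic -> size in bytes). */
    static Map<String, Map<String, Long>> filter(Map<String, Map<String, Long>> dirContents,
                                                 Collection<String> logDirs,
                                                 Collection<String> topics) {
        Map<String, Map<String, Long>> result = new TreeMap<>();
        for (Map.Entry<String, Map<String, Long>> dir : dirContents.entrySet()) {
            // An empty list of log directories selects all log directories.
            if (!logDirs.isEmpty() && !logDirs.contains(dir.getKey())) {
                continue;
            }
            Map<String, Long> kept = new TreeMap<>();
            for (Map.Entry<String, Long> topic : dir.getValue().entrySet()) {
                // An empty list of topics selects all topics.
                if (topics.isEmpty() || topics.contains(topic.getKey())) {
                    kept.put(topic.getKey(), topic.getValue());
                }
            }
            result.put(dir.getKey(), kept);
        }
        return result;
    }

    /** Made-up sample data: two log directories holding two topics. */
    static Map<String, Map<String, Long>> sample() {
        Map<String, Long> data1 = new TreeMap<>();
        data1.put("t1", 100L);
        data1.put("t2", 200L);
        Map<String, Long> data2 = new TreeMap<>();
        data2.put("t1", 50L);
        Map<String, Map<String, Long>> dirs = new TreeMap<>();
        dirs.put("/data1", data1);
        dirs.put("/data2", data2);
        return dirs;
    }

    public static void main(String[] args) {
        System.out.println(filter(sample(), Collections.<String>emptyList(), Arrays.asList("t2")));
    }
}
```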
...
Public interface
Protocol
Create AlterReplicaDirRequest
Code Block |
---|
AlterReplicaDirRequest => topics
  topics => [AlterReplicaDirRequestTopic]

AlterReplicaDirRequestTopic => topic partitions
  topic => str
  partitions => [AlterReplicaDirRequestPartition]

AlterReplicaDirRequestPartition => partition log_dir
  partition => int32
  log_dir => str
|
Create AlterReplicaDirResponse
Code Block |
---|
AlterReplicaDirResponse => topics
  topics => [AlterReplicaDirResponseTopic]

AlterReplicaDirResponseTopic => topic partitions
  topic => str
  partitions => [AlterReplicaDirResponsePartition]

AlterReplicaDirResponsePartition => partition error_code
  partition => int32
  error_code => int16
|
Create DescribeLogDirsRequest
Code Block |
---|
DescribeLogDirsRequest => topics
  topics => [DescribeLogDirsRequestTopic] // If this is empty, all topics will be queried

DescribeLogDirsRequestTopic => topic partitions
  topic => str
  partitions => [int32]
|
Create DescribeLogDirsResponse
Code Block |
---|
DescribeLogDirsResponse => log_dirs
  log_dirs => [DescribeLogDirsResponseDirMetadata]

DescribeLogDirsResponseDirMetadata => error_code path topics
  error_code => int16
  path => str
  topics => [DescribeLogDirsResponseTopic]

DescribeLogDirsResponseTopic => topic partitions
  topic => str
  partitions => [DescribeLogDirsResponsePartition]

DescribeLogDirsResponsePartition => partition size offset_lag is_temporary
  partition => int32
  size => int64
  offset_lag => int64 // If this is not a temporary replica, then offset_lag = max(0, HW - LEO). Otherwise, offset_lag = primary Replica's LEO - temporary Replica's LEO
  is_temporary => boolean // True if replica is *.move
|
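The offset_lag definition in the response schema above can be written out as a small function. This is a sketch only: the class, method, and parameter names are hypothetical, and the two formulas are taken directly from the schema comment.

```java
/** Sketch of the offset_lag computation from the response schema; names are hypothetical. */
final class OffsetLag {
    /**
     * @param isTemporary         true if the replica lives in a *.move directory
     * @param highWatermark       HW of the partition (used for non-temporary replicas only)
     * @param logEndOffset        LEO of the replica being described
     * @param primaryLogEndOffset LEO of the primary replica (used for temporary replicas only)
     */
    static long offsetLag(boolean isTemporary, long highWatermark,
                          long logEndOffset, long primaryLogEndOffset) {
        if (isTemporary) {
            // How far the copy in the destination directory trails the primary replica.
            return primaryLogEndOffset - logEndOffset;
        }
        // How far this replica trails the high watermark, never negative.
        return Math.max(0L, highWatermark - logEndOffset);
    }

    public static void main(String[] args) {
        System.out.println(offsetLag(false, 100L, 90L, 0L));
        System.out.println(offsetLag(true, 0L, 70L, 100L));
    }
}
```

For a temporary replica the lag is what a client would watch to see how close an intra-broker move is to completion.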
Broker Config
1) Add config intra.broker.throttled.rate. This config specifies the maximum rate in bytes-per-second that can be used to move replicas between log directories. This config defaults to MAX_LONG. The intra.broker.throttled.rate is per-broker and the specified capacity is shared by all replica-movement-threads.
2) Add config num.replica.alter.log.dirs.threads. This config specifies the number of threads in ReplicaMoveThreadPool. The threads in this thread pool are responsible for moving replicas between log directories. This config defaults to the number of log directories.
Note that we typically expect a 1-1 mapping between log directories and disks. Thus setting the config to the number of log directories by default provides a reasonable way to keep the movement capacity in proportion to the number of disks.
Scripts
1) Add kafka-log-dirs.sh which allows user to get list of replicas per log directory on a broker.
...
Code Block |
---|
{
  "version" : 1,
  "log_dirs" : [
    {
      "is_live" : boolean,
      "path" : str,
      "partitions": [
        {
          "topic" : str,
          "partition" : int32,
          "size" : int64,
          "offset_lag" : int64,
          "is_temporary" : boolean
        },
        ...
      ]
    },
    ...
  ]
}
|
Code Block |
---|
{
  "version" : int,
  "partitions" : [
    {
      "topic" : str,
      "partition" : int,
      "replicas" : [int],
      "log_dirs" : [str]    <-- NEW. A log directory can be either "any", or a valid absolute path that begins with '/'. This is an optional field. It is treated as an array of "any" if this field is not explicitly specified in the json file.
    },
    ...
  ]
}
|
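The rules for the new log_dirs field can be sketched as a small validation helper. This is an illustration under assumptions, not the script's implementation: the class and method names are hypothetical, and a missing field is represented here by `null`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Sketch of validating and defaulting the log_dirs field; names are hypothetical. */
final class LogDirsField {
    /** A log directory must be "any" or an absolute path that begins with '/'. */
    static boolean isValid(String logDir) {
        return logDir.equals("any") || logDir.startsWith("/");
    }

    /**
     * @param replicas broker ids from the "replicas" field
     * @param logDirs  entries from the "log_dirs" field, or null if the field is absent
     * @return one log directory per replica
     */
    static List<String> resolve(List<Integer> replicas, List<String> logDirs) {
        if (logDirs == null) {
            // An absent field is treated as an array of "any", one entry per replica.
            return new ArrayList<>(Collections.nCopies(replicas.size(), "any"));
        }
        if (logDirs.size() != replicas.size()) {
            throw new IllegalArgumentException("expected one log directory per replica");
        }
        for (String logDir : logDirs) {
            if (!isValid(logDir)) {
                throw new IllegalArgumentException("invalid log directory: " + logDir);
            }
        }
        return logDirs;
    }

    public static void main(String[] args) {
        System.out.println(resolve(Arrays.asList(1, 2, 3), null));
        System.out.println(resolve(Arrays.asList(1, 2), Arrays.asList("/data1", "any")));
    }
}
```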
Note: the quota specified by the argument `--throttle` will be applied only to inter-broker replica reassignment. It does not affect the quota for replica movement between log directories.
...
3) Add optional argument --timeout to kafka-reassign-partitions.sh. This is because kafka-reassign-partitions.sh may need to re-send AlterReplicaDirRequest to the broker if the replica hasn't already been created there. The timeout is set to 10 seconds by default.
AdminClient
The following methods and classes are added.
Code Block | ||
---|---|---|
| ||
public interface AdminClient extends AutoCloseable {
/**
* Query the log directory information for the specified log directories on the given brokers.
* All log directories on a broker are queried if an empty collection of log directories is specified for this broker
*
* This operation is supported by brokers with version 0.11.1.0 or higher.
*
* @param logDirsByBroker A list of log dirs per broker
* @param options The options to use when querying log dir info
* @return The DescribeLogDirsResult
*/
public abstract DescribeLogDirsResult describeLogDirs(Map<Integer, Collection<String>> logDirsByBroker, DescribeLogDirsOptions options);
/**
* Query the replica directory information for the specified replicas.
*
* This operation is supported by brokers with version 0.11.1.0 or higher.
*
* @param replicas The replicas to query
* @param options The options to use when querying replica dir info
* @return The DescribeReplicaLogDirsResult
*/
public abstract DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas, DescribeReplicaLogDirsOptions options);
}
public class KafkaAdminClient extends AdminClient {
/**
* Alter the log directory for the specified replicas.
*
* Updates are not transactional so they may succeed for some resources while failing for others. The log directory for
* a particular replica is updated atomically.
*
* This operation is supported by brokers with version 0.11.1.0 or higher.
*
* @param replicaAssignment The replicas with their log directory absolute path
* @param options The options to use when changing replica dir
* @return The AlterReplicaDirResult
*/
public AlterReplicaDirResult alterReplicaDir(Map<TopicPartitionReplica, String> replicaAssignment, AlterReplicaDirOptions options);
}
/**
* Options for the alterReplicaDir call.
*/
class AlterReplicaDirOptions {
private Integer timeoutMs = null;
public AlterReplicaDirOptions timeoutMs(Integer timeoutMs);
public Integer timeoutMs();
}
/**
* The result of the alterReplicaDir call.
*/
class AlterReplicaDirResult {
/**
* Return a map from replica to futures, which can be used to check the status of individual replica movement.
*/
public Map<TopicPartitionReplica, KafkaFuture<Void>> values();
/**
* Return a future which succeeds if all the replica movements have succeeded
*/
public KafkaFuture<Void> all();
}
/**
* Options for the describeLogDirs call.
*/
class DescribeLogDirsOptions {
private Integer timeoutMs = null;
public DescribeLogDirsOptions timeoutMs(Integer timeoutMs);
public Integer timeoutMs();
}
/**
* The result of the describeLogDirs call.
*/
class DescribeLogDirsResult {
/**
* Return a map from brokerId to futures which can be used to check the information of partitions on each individual broker
*/
public Map<Integer, KafkaFuture<Map<String, LogDirInfo>>> values();
/**
* Return a future which succeeds only if all the brokers have responded without error
*/
public KafkaFuture<Map<Integer, Map<String, LogDirInfo>>> all();
}
/**
* Description of a log directory
*/
class LogDirInfo {
public final Errors error;
public final Map<TopicPartition, ReplicaInfo> replicaInfos;
}
/**
* Description of a replica
*/
public class ReplicaInfo {
public final long size;
public final long logEndOffset;
public final boolean isTemporary;
}
/**
* Options for the describeReplicaLogDirs call.
*/
class DescribeReplicaLogDirsOptions {
private Integer timeoutMs = null;
public DescribeReplicaLogDirsOptions timeoutMs(Integer timeoutMs);
public Integer timeoutMs();
}
/**
* The result of the describeReplicaLogDirs call.
*/
class DescribeReplicaLogDirsResult {
/**
* Return a map from replica to futures which can be used to check the log directory information of individual replicas
*/
public Map<TopicPartitionReplica, KafkaFuture<ReplicaDirInfo>> values();
/**
* Return a future which succeeds if log directory information of all replicas is available
*/
public KafkaFuture<Map<TopicPartitionReplica, ReplicaDirInfo>> all();
}
/**
* Log directory information of a given replica and its intra-broker movement progress
*/
class ReplicaDirInfo {
public String currentReplicaDir;
public String temporaryReplicaDir;
public long temporaryReplicaOffsetLag;
}
|
Changes in Operational Procedures
In this section we describe the expected changes in operational procedures in order to run Kafka with JBOD. Administrators of a Kafka cluster need to be aware of these changes before switching from RAID-10 to JBOD.
- Need to load balance across log directories
When running Kafka with RAID-10, we only need to take care of load imbalance across brokers. An administrator can balance load across brokers using the script kafka-reassign-partitions.sh. After switching from RAID-10 to JBOD, we will start to see load imbalance across log directories. To address this problem, the administrator needs to get the partition assignment and partition sizes per log directory using kafka-log-dirs.sh, determine the reassignment of replicas per log directory per broker, and provide the partition to (broker, log_directory) mapping as input to kafka-reassign-partitions.sh to execute the new assignment.
The administrator also needs to be prepared for rebalancing across log directories to be needed much more often than rebalancing across brokers, since the capacity of an individual disk is likely much smaller than the capacity of the existing RAID-10 setup.
Compatibility, Deprecation, and Migration Plan
This KIP is a pure addition. So there is no backward compatibility concern.
The KIP changes the inter-broker protocol. Therefore the migration requires two rolling bounces. In the first rolling bounce we deploy the new code, but brokers still communicate using the existing protocol. In the second rolling bounce we change the config so that brokers start to communicate with each other using the new protocol.
Test Plan
The new features will be tested through unit, integration, and system tests. In the following we explain the system tests only.
- Brokers are all running and show expected error message
- topic description shows expected results for all topics
- A pair of producer and consumer can successfully produce/consume from each monitored topic without message loss or duplication.
- Create a topic with 3 partitions and replication_factor=1
- Start a pair of producer and consumer to produce/consume from the topic
- Get current log directory of each replica
- Run kafka-reassign-partitions.sh to move each replica to the other log directory on the same broker
- Run kafka-reassign-partitions.sh to periodically verify and wait for the reassignment to complete within a reasonable amount of time
- Validate client/cluster state
- Run kafka-reassign-partitions.sh to move each replica to the currently-unused log directory on the "next" broker. I.e., replica1 is moved from broker1 -> broker2, replica2 is moved from broker2 -> broker3, and replica3 is moved from broker3 -> broker1.
- Run kafka-reassign-partitions.sh to periodically verify and wait for the reassignment to complete within a reasonable amount of time
- Validate client/cluster state
2) Verify that bad log directories discovered during runtime do not affect replicas on the good log directories.
- Start 1 zookeeper and 3 brokers. Each broker has 2 log directories.
- Create topic1 with 1 partition and 3 replicas. Run kafka-reassign-partitions.sh to move the replicas of topic1 to the first log directory of each broker.
- Create topic2 with 1 partition and 3 replicas. Run kafka-reassign-partitions.sh to move the replicas of topic2 to the second log directory of each broker.
- Start a pair of producer and consumer to produce/consume from topic1
- Start a pair of producer and consumer to produce/consume from topic2
- Change permission of the second log directory of a follower broker of topic1's partition to be 000.
- Validate client/cluster state for both topics.
- Validate client/cluster state for both topics.
- Change permission of the second log directory of all brokers to be 000.
Rejected Alternatives
...