Status
Current state: Under Discussion
Discussion thread: here
JIRA: KAFKA-5856
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
New network protocol APIs will be added:
CreatePartitionsRequest and CreatePartitionsResponse
The AdminClient API will have new methods added (plus overloads for options):
Proposed Changes
AdminClient:
...
createPartitions()
kafka-topics.sh --alter --partitions ...
...
- This API is synchronous in the sense that the client can assume that the partition count has been changed (or the request was rejected) once they have obtained the result for the topic from the CreatePartitionsResult.
```java
/**
 * <p>Increase the number of partitions of the topics given as the keys of {@code newPartitions}
 * according to the corresponding values.</p>
 */
public CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions,
                                               CreatePartitionsOptions options)
public CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions)
```
Where:
```java
/** Describes new partitions for a particular topic. */
public class NewPartitions {
    private int newNumPartitions;
    private List<List<Integer>> assignments;

    private NewPartitions(int newNumPartitions) { ... }

    /**
     * <p>Increase the number of partitions to the given {@code newCount}.
     * The assignment of new replicas to brokers will be decided by the broker
     * but may subsequently be moved using {@link #reassignPartitions(Map)}.</p>
     */
    public static NewPartitions increaseTo(int newCount) { ... }

    /**
     * <p>Increase the number of partitions to the given {@code newCount}
     * assigning the new partitions according to the given {@code newAssignments}.
     * The length of {@code newAssignments} should equal {@code newCount - oldCount}, since
     * the assignments of existing partitions are not changed.
     * Each inner list of {@code newAssignments} should have a length equal to
     * the topic's replication factor.
     * The first broker id in each inner list is the "preferred replica".</p>
     *
     * <p>For example, suppose a topic currently has a replication factor of 2, and
     * has 3 partitions. The number of partitions can be increased to 4
     * (with broker 1 being the preferred replica for the new partition) using a
     * {@code NewPartitions} constructed like this:</p>
     *
     * <pre><code>NewPartitions.increaseTo(4, Arrays.asList(Arrays.asList(1, 2)))</code></pre>
     */
    public static NewPartitions increaseTo(int newCount, List<List<Integer>> newAssignments) { ... }
}

public class CreatePartitionsOptions {
    public CreatePartitionsOptions() { ... }

    public Integer timeoutMs() { ... }

    public CreatePartitionsOptions timeoutMs(Integer timeoutMs) { ... }

    public boolean validateOnly() { ... }

    /**
     * Validate the request only: Do not actually change any partition counts.
     */
    public CreatePartitionsOptions validateOnly(boolean validateOnly) { ... }
}

public class CreatePartitionsResult {
    // package access constructor
    Map<String, KafkaFuture<Void>> values() { ... }

    KafkaFuture<Void> all() { ... }
}
```
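The shape rules stated in the Javadoc for `increaseTo(int, List)` can be checked mechanically. The following is a minimal sketch of such a check, not part of the proposed API: the class `NewAssignmentsCheck` and its `validate` method are hypothetical names introduced only for illustration, and the rule that `newCount` must be strictly greater than the current count is an assumption based on the statement that the partition count cannot be decreased.

```java
import java.util.List;

/** Hypothetical helper (not part of this KIP): checks the documented shape of newAssignments. */
final class NewAssignmentsCheck {
    /**
     * Returns null when the assignments follow the documented rules,
     * otherwise a message describing the first violation found.
     */
    static String validate(int oldCount, int newCount, int replicationFactor,
                           List<List<Integer>> newAssignments) {
        // Assumption: a request that does not raise the count is treated as invalid.
        if (newCount <= oldCount) {
            return "newCount must be greater than the current partition count";
        }
        // One inner list per *new* partition; existing partitions are not listed.
        int expected = newCount - oldCount;
        if (newAssignments.size() != expected) {
            return "expected " + expected + " assignments but got " + newAssignments.size();
        }
        // Each inner list names one broker per replica.
        for (List<Integer> replicas : newAssignments) {
            if (replicas.size() != replicationFactor) {
                return "each assignment must list " + replicationFactor + " brokers";
            }
        }
        return null;
    }
}
```

For the Javadoc's own example (3 partitions, replication factor 2, increasing to 4), `validate(3, 4, 2, Arrays.asList(Arrays.asList(1, 2)))` reports no violation, whereas asking for 5 partitions with the same single assignment does.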
Network Protocol: CreatePartitionsRequest and CreatePartitionsResponse
CreatePartitionsRequest is used to increase the partition count for a batch of topics, and is the basis for the AdminClient.createPartitions() method. The request can be sent to any broker.
...
After validating the request, the broker calls AdminUtils.addPartitions(), which ultimately updates the topic partition assignment znode (/brokers/topics/${topic}).
```
CreatePartitionsRequest => [topic_partition_count] timeout
  topic_partition_count => topic partition_count
    topic => STRING
    partition_count => count [assignment]
      count => INT32
      assignment => [INT32]
  timeout => INT32
```
...
Field | Description |
---|---|
topic | the name of a topic |
count | the new partition count |
assignment | a list of assigned brokers (one list for each new partition) |
timeout | The maximum time to await a response in ms. |
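To make the request layout concrete, here is a rough sketch of how the body could be serialized. It assumes the usual Kafka protocol primitive encodings (big-endian integers, STRING as an INT16 length followed by UTF-8 bytes, arrays as an INT32 element count followed by the elements). The class and field names are hypothetical, the request header is omitted, and how an absent assignment would be represented is not modelled: the sketch simply writes whatever list it is given.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

/** Hypothetical sketch of the request body layout; not the actual Kafka implementation. */
final class CreatePartitionsRequestSketch {
    /** One topic_partition_count entry from the schema. */
    static final class TopicPartitionCount {
        final String topic;
        final int count;
        final List<List<Integer>> assignment;

        TopicPartitionCount(String topic, int count, List<List<Integer>> assignment) {
            this.topic = topic;
            this.count = count;
            this.assignment = assignment;
        }
    }

    static byte[] encode(List<TopicPartitionCount> entries, int timeoutMs) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        // DataOutputStream writes big-endian, matching network byte order.
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(entries.size());                 // [topic_partition_count]
            for (TopicPartitionCount entry : entries) {
                byte[] name = entry.topic.getBytes(StandardCharsets.UTF_8);
                out.writeShort(name.length);              // topic => STRING
                out.write(name);
                out.writeInt(entry.count);                // count => INT32
                out.writeInt(entry.assignment.size());    // [assignment]
                for (List<Integer> replicas : entry.assignment) {
                    out.writeInt(replicas.size());        // assignment => [INT32]
                    for (int brokerId : replicas) {
                        out.writeInt(brokerId);
                    }
                }
            }
            out.writeInt(timeoutMs);                      // timeout => INT32
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}
```

Note that the timeout is written once, after the whole array of topics, because it applies to the request rather than to each topic.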
```
CreatePartitionsResponse => throttle_time_ms [topic_partition_count_error]
  topic_partition_count_error => topic error_code error_message
    topic => STRING
    error_code => INT16
    error_message => NULLABLE_STRING
```
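Because the response carries one error entry per topic, a client can resolve a separate future for each topic, which is the shape exposed by `values()` on the result class. The sketch below illustrates that mapping with `CompletableFuture` from the JDK standing in for `KafkaFuture`. It assumes that error code 0 means success, as elsewhere in the Kafka protocol; the class names and the use of a plain `RuntimeException` are hypothetical simplifications.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch: turns per-topic response entries into per-topic futures. */
final class CreatePartitionsResponseSketch {
    /** One topic_partition_count_error entry from the schema. */
    static final class TopicError {
        final String topic;
        final short errorCode;
        final String errorMessage; // may be null (NULLABLE_STRING)

        TopicError(String topic, short errorCode, String errorMessage) {
            this.topic = topic;
            this.errorCode = errorCode;
            this.errorMessage = errorMessage;
        }
    }

    static Map<String, CompletableFuture<Void>> toFutures(List<TopicError> errors) {
        Map<String, CompletableFuture<Void>> result = new HashMap<>();
        for (TopicError error : errors) {
            CompletableFuture<Void> future = new CompletableFuture<>();
            if (error.errorCode == 0) {
                // Assumption: 0 is the "no error" code.
                future.complete(null);
            } else {
                // A real client would map the code to a specific exception type.
                future.completeExceptionally(new RuntimeException(
                        "error " + error.errorCode + ": " + error.errorMessage));
            }
            result.put(error.topic, future);
        }
        return result;
    }
}
```

With this shape, a failure for one topic in the batch does not prevent the caller from observing success for the others.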
...
Compatibility, Deprecation, and Migration Plan
...
...
This is a new API and won't directly affect existing users.
Rejected Alternatives
NewPartitions is inconsistent because it takes a number of partitions, but only assignments for the new partitions. One is absolute and the other is a difference. The reasons for this are:
- NewPartitions could take an increment, rather than the new "absolute" number of partitions. But this makes the request non-idempotent, with consequent possibilities of a double increment. This would be particularly bad because it's not possible to decrease the partition count.
- NewPartitions could take a complete assignment for both old and new partitions. This would incorrectly suggest that the request could increase the number of partitions and effect a reassignment of the existing partitions at the same time. The server would have to either ignore the old partitions (in which case why were they required to be provided?) or validate them (in which case the client has to know the old assignment in order to add more, which is needlessly difficult).
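The idempotency argument can be illustrated with a toy model in which a client retries a request after losing the first response. Everything in this sketch is hypothetical: `Topic` is an in-memory stand-in for a topic's partition count, and the two methods model the chosen "absolute" design and the rejected "increment" design respectively.

```java
/** Hypothetical toy model comparing absolute and incremental partition requests. */
final class IdempotencySketch {
    /** In-memory stand-in for a topic's current partition count. */
    static final class Topic {
        int partitions;

        Topic(int partitions) {
            this.partitions = partitions;
        }
    }

    /** Absolute form: applying the same request twice leaves the count at newCount. */
    static boolean increaseTo(Topic topic, int newCount) {
        if (newCount <= topic.partitions) {
            return false; // nothing to add; a retry cannot overshoot the target
        }
        topic.partitions = newCount;
        return true;
    }

    /** Incremental form: every delivery of the request adds partitions again. */
    static void increaseBy(Topic topic, int increment) {
        topic.partitions += increment;
    }
}
```

Starting from 3 partitions, sending `increaseTo(topic, 4)` twice leaves the topic at 4 partitions, while sending `increaseBy(topic, 1)` twice leaves it at 5, and that extra partition could never be removed.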
...