...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Describe the problems you are trying to solve.
...
New network protocol APIs will be added:
The AdminClient API will have new methods added (plus overloads for options):
...
...
Use case | command | AdminClient |
---|---|---|
Increase partition count | kafka-topics --alter --topic T --partitions P | increasePartitionCount() |
Change replication factor | kafka-reassign-partitions --execute --reassignment-json-file J | |
Change partition assignment | kafka-reassign-partitions --execute --reassignment-json-file J | reassignPartitions() |
Change partition assignment with throttle | kafka-reassign-partitions --execute --reassignment-json-file J --throttle R | |
Change throttled rate | kafka-reassign-partitions --execute --reassignment-json-file J --throttle R | |
Check progress of a reassignment | kafka-reassign-partitions --progress --reassignment-json-file J | (see KIP-113) |
Check result and clear throttle | kafka-reassign-partitions --verify --reassignment-json-file J | |
...
In all other respects, the public API of ReassignPartitionsCommand will not be changed.
AdminClient: increasePartitionCount()
...
Notes:
- This API is synchronous in the sense that the client can assume that the partition count has been changed (or the request was rejected) once it has obtained the result for the topic from the IncreasePartitionCountsResult.
Code Block | ||||
---|---|---|---|---|
| ||||
/**
* <p>Increase the partition count of the topics given as the keys of {@code counts}
* according to the corresponding values.</p>
*/
public IncreasePartitionCountsResult increasePartitionCounts(Map<String, PartitionCount> counts,
IncreasePartitionCountsOptions options)
public IncreasePartitionCountsResult increasePartitionCounts(Map<String, PartitionCount> counts) |
Where:
Code Block | ||||
---|---|---|---|---|
| ||||
/** Describes a change in a topic's partition count. */
public class PartitionCount {
private int partitionCount;
private List<List<Integer>> assignments;
private PartitionCount(int partitionCount) { ... }
/**
* <p>Increase the partition count for a topic to the given {@code newCount}.
* The assignment of new replicas to brokers will be decided by the broker
* but may subsequently be moved using {@link #reassignPartitions(Map)}.</p>
*/
public static PartitionCount increasePartitionCount(int newCount) { ... }
/**
* <p>Increase the partition count for a topic to the given {@code newCount}
* assigning the new partitions according to the given {@code newAssignments}.
* The length of {@code newAssignments} should equal {@code newCount - oldCount}, since
* the assignments of existing partitions are not changed.
* Each inner list of {@code newAssignments} should have a length equal to
* the topic's replication factor.
* The first broker id in each inner list is the "preferred replica".</p>
*
* <p>For example, suppose a topic currently has a replication factor of 2, and
* has 3 partitions. The number of partitions can be increased to 4
* (with broker 1 being the preferred replica for the new partition) using a
* {@code PartitionCount} constructed like this:</p>
*
* <pre><code>PartitionCount.increasePartitionCount(4, Arrays.asList(Arrays.asList(1, 2)))</code></pre>
*
*/
public static PartitionCount increasePartitionCount(int newCount, List<List<Integer>> newAssignments) { ... }
}
public class IncreasePartitionCountsOptions {
public IncreasePartitionCountsOptions() { ... }
public Integer timeoutMs() { ... }
public IncreasePartitionCountsOptions timeoutMs(Integer timeoutMs) { ... }
public boolean validateOnly() { ... }
/**
* Validate the request only: Do not actually change any partition counts.
*/
public IncreasePartitionCountsOptions validateOnly(boolean validateOnly) { ... }
}
public class IncreasePartitionCountsResult {
// package access constructor
public Map<String, KafkaFuture<Void>> values() { ... }
public KafkaFuture<Void> all() { ... }
} |
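The constraints that the Javadoc places on the two-argument factory can be checked mechanically. The following is a minimal sketch and not part of the proposal: the class PartitionCountCheck and its validate method are hypothetical names, and the rule that the new count must exceed the current one is an assumption inferred from the word "increase".

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper (not part of the proposal) that checks the documented constraints. */
class PartitionCountCheck {

    /**
     * Returns null when the assignments satisfy the rules described in the Javadoc,
     * otherwise a short description of the first problem found.
     */
    static String validate(int oldCount, int newCount, int replicationFactor,
                           List<List<Integer>> newAssignments) {
        // Assumption: an "increase" must strictly grow the partition count.
        if (newCount <= oldCount) {
            return "new count " + newCount + " must exceed current count " + oldCount;
        }
        // One inner list per new partition; existing partitions are untouched.
        if (newAssignments.size() != newCount - oldCount) {
            return "expected " + (newCount - oldCount) + " assignments but got "
                    + newAssignments.size();
        }
        // Each inner list names one broker per replica.
        for (List<Integer> replicas : newAssignments) {
            if (replicas.size() != replicationFactor) {
                return "assignment " + replicas + " does not match replication factor "
                        + replicationFactor;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // The Javadoc example: 3 partitions, replication factor 2, grown to 4.
        List<List<Integer>> assignments = Arrays.asList(Arrays.asList(1, 2));
        System.out.println(validate(3, 4, 2, assignments));
        // Two new partitions requested, but only one assignment supplied.
        System.out.println(validate(3, 5, 2, assignments));
    }
}
```

In the proposal itself this kind of validation would happen on the broker; a client-side check like this would only give earlier feedback.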
Network Protocol: IncreasePartitionCountsRequest and IncreasePartitionCountsResponse
...
The request can be sent to any broker.
The request will require the ALTER operation on the Topic resource.
The request is subject to the CreateTopicPolicy of the broker as configured by the broker's create.topic.policy.class.name config property. This is to ensure that the policy applies to topics modified after creation.
After validating the request, the broker calls AdminUtils.addPartitions(), which ultimately updates the topic partition assignment znode (/brokers/topics/${topic}).
No Format |
---|
IncreasePartitionCountsRequest => [topic_partition_count] timeout
topic_partition_count => topic partition_count
topic => STRING
partition_count => count [assignment]
count => INT32
assignment => [INT32]
timeout => INT32 |
Where
Field | Description |
---|---|
topic | the name of a topic |
count | the new partition count |
assignment | a list of assigned brokers (one list for each new partition) |
timeout | the maximum time to await a response, in milliseconds |
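To make the request layout concrete, here is a minimal sketch that serializes a request body for a single topic. It assumes the usual Kafka primitive encodings (big-endian integers, arrays prefixed by an INT32 element count, STRING prefixed by an INT16 byte length) and omits the common request header. The class and method names are hypothetical and exist only for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

/** Hypothetical encoder for the request body described by the schema. */
class IncreasePartitionCountsRequestSketch {

    /** Writes a STRING: INT16 byte length followed by the UTF-8 bytes. */
    static void putString(ByteBuffer buf, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        buf.putShort((short) bytes.length);
        buf.put(bytes);
    }

    /** Encodes a request body that changes the partition count of one topic. */
    static ByteBuffer encode(String topic, int count,
                             List<List<Integer>> assignments, int timeoutMs) {
        // ByteBuffer is big-endian by default; 1024 bytes is ample for a small example.
        ByteBuffer buf = ByteBuffer.allocate(1024);
        buf.putInt(1);                       // [topic_partition_count]: one entry
        putString(buf, topic);               // topic
        buf.putInt(count);                   // count
        buf.putInt(assignments.size());      // [assignment]: one list per new partition
        for (List<Integer> replicas : assignments) {
            buf.putInt(replicas.size());     // assignment => [INT32]
            for (int brokerId : replicas) {
                buf.putInt(brokerId);
            }
        }
        buf.putInt(timeoutMs);               // timeout
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        // Grow topic "T" to 4 partitions, placing the new partition on brokers 1 and 2.
        ByteBuffer body = encode("T", 4, Arrays.asList(Arrays.asList(1, 2)), 30000);
        System.out.println(body.remaining() + " bytes");
    }
}
```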
...
No Format |
---|
IncreasePartitionCountsResponse => throttle_time_ms [topic_partition_count_error]
topic_partition_count_error => topic error_code error_message
topic => STRING
error_code => INT16
error_message => NULLABLE_STRING |
Where
Field | Description |
---|---|
throttle_time_ms | duration in milliseconds for which the request was throttled |
topic | the name of a topic in the request |
error_code | an error code for that topic |
error_message | more detailed information about any error for that topic |
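The response layout can be read back in the same way. The sketch below collects the topics whose error code is nonzero, assuming the usual Kafka encodings (NULLABLE_STRING uses an INT16 length of -1 for null, and error code 0 means no error) and a buffer positioned just after the common response header. The class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical decoder for the response body described by the schema. */
class IncreasePartitionCountsResponseSketch {

    /** Reads a NULLABLE_STRING: INT16 length (-1 means null), then UTF-8 bytes. */
    static String getNullableString(ByteBuffer buf) {
        short length = buf.getShort();
        if (length < 0) {
            return null;
        }
        byte[] bytes = new byte[length];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    /** Returns a map from topic to "error_code: error_message" for every failed topic. */
    static Map<String, String> failedTopics(ByteBuffer buf) {
        buf.getInt();                                // throttle_time_ms, ignored here
        int entries = buf.getInt();                  // [topic_partition_count_error]
        Map<String, String> failures = new LinkedHashMap<>();
        for (int i = 0; i < entries; i++) {
            String topic = getNullableString(buf);   // STRING has the same layout, never null
            short errorCode = buf.getShort();        // error_code
            String message = getNullableString(buf); // error_message
            if (errorCode != 0) {
                failures.put(topic, errorCode + ": " + message);
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        // Build a one-topic response that reports POLICY_VIOLATION(44) with no message.
        ByteBuffer buf = ByteBuffer.allocate(64);
        buf.putInt(0);                               // throttle_time_ms
        buf.putInt(1);                               // one topic entry
        buf.putShort((short) 1);
        buf.put("T".getBytes(StandardCharsets.UTF_8));
        buf.putShort((short) 44);
        buf.putShort((short) -1);                    // null error_message
        buf.flip();
        System.out.println(failedTopics(buf));
    }
}
```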
Anticipated errors:
...
POLICY_VIOLATION(44) The request violated the configured policy
...
.
AdminClient: reassignPartitions()
...