Status

Current state: Under Discussion

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Describe the problems you are trying to solve.

...

Doing this will enable future work to refactor TopicCommand/kafka-topics.sh to function via a connection to a broker rather than by interacting directly with ZooKeeper.

Public Interfaces

New network protocol APIs will be added: 

The AdminClient API will have new methods added (plus overloads for options):

Proposed Changes

AdminClient: createPartitions()

This API supports the use case of increasing the partition count via kafka-topics.sh --alter --partitions ...

...

/** Describes new partitions for a particular topic. */
public class NewPartitions {
    private int newNumPartitions;
    private List<List<Integer>> assignments;
    private NewPartitions(int newNumPartitions) { ... }

    /** 
     * <p>Increase the number of partitions to the given {@code newCount}. 
     * The assignment of new replicas to brokers will be decided by the broker.</p>
     */
    public static NewPartitions increaseTo(int newCount) { ... }

    /** 
     * <p>Increase the number of partitions to the given {@code newCount} 
     * assigning the new partitions according to the given {@code newAssignments}.
     * The length of {@code newAssignments} should equal {@code newCount - oldCount}, since 
     * the assignment of existing partitions is not changed. 
     * Each inner list of {@code newAssignments} should have a length equal to 
     * the topic's replication factor. 
     * The first broker id in each inner list is the "preferred replica".</p>
     *
     * <p>For example, suppose a topic currently has a replication factor of 2, and 
     * has 3 partitions. The number of partitions can be increased to 4 
     * (with broker 1 being the preferred replica for the new partition) using a 
     * {@code NewPartitions} constructed like this:</p>
     *
     * <pre><code>NewPartitions.increaseTo(4, Arrays.asList(Arrays.asList(1, 2)))</code></pre>
     *
     */
    public static NewPartitions increaseTo(int newCount, List<List<Integer>> newAssignments) { ... }
}
    

public class CreatePartitionsOptions {
    public CreatePartitionsOptions() { ... }
    public Integer timeoutMs() { ... }
    public CreatePartitionsOptions timeoutMs(Integer timeoutMs) { ... }
    public boolean validateOnly() { ... }
    /**
     * Validate the request only: Do not actually change any partition counts.
     */
    public CreatePartitionsOptions validateOnly(boolean validateOnly) { ... }
}
 
public class CreatePartitionsResult {
    // package access constructor
    public Map<String, KafkaFuture<Void>> values() { ... }
    public KafkaFuture<Void> all() { ... }
}
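The relationship between the absolute newCount and the relative newAssignments can be illustrated with a small stand-alone sketch. NewPartitionsSketch and its assignmentsByPartition helper are hypothetical names introduced only for this illustration and are not part of the proposed API. The sketch assumes zero-based partition ids, so the i-th inner list of newAssignments describes partition oldCount + i.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the proposed NewPartitions class; not the Kafka implementation. */
final class NewPartitionsSketch {
    private final int totalCount;
    // null means "let the broker decide where the new replicas go"
    private final List<List<Integer>> assignments;

    private NewPartitionsSketch(int totalCount, List<List<Integer>> assignments) {
        this.totalCount = totalCount;
        this.assignments = assignments;
    }

    /** Increase to newCount partitions with broker-chosen assignments. */
    static NewPartitionsSketch increaseTo(int newCount) {
        return new NewPartitionsSketch(newCount, null);
    }

    /** Increase to newCount partitions with explicit assignments for the new partitions only. */
    static NewPartitionsSketch increaseTo(int newCount, List<List<Integer>> newAssignments) {
        // Defensive copy so later changes to the caller's lists do not leak in.
        List<List<Integer>> copy = new ArrayList<>();
        for (List<Integer> replicas : newAssignments) {
            copy.add(Collections.unmodifiableList(new ArrayList<>(replicas)));
        }
        return new NewPartitionsSketch(newCount, Collections.unmodifiableList(copy));
    }

    int totalCount() { return totalCount; }

    List<List<Integer>> assignments() { return assignments; }

    /**
     * Maps each new partition id to its replica list, given the topic's current
     * partition count. Returns an empty map when no explicit assignments were given.
     */
    Map<Integer, List<Integer>> assignmentsByPartition(int oldCount) {
        Map<Integer, List<Integer>> result = new LinkedHashMap<>();
        if (assignments == null) {
            return result;
        }
        if (assignments.size() != totalCount - oldCount) {
            throw new IllegalArgumentException(
                "Expected " + (totalCount - oldCount) + " assignments but got " + assignments.size());
        }
        for (int i = 0; i < assignments.size(); i++) {
            result.put(oldCount + i, assignments.get(i));
        }
        return result;
    }
}
```

With the Javadoc example above (3 existing partitions, increased to 4 with the assignment [1, 2]), assignmentsByPartition(3) yields a single entry for partition 3 whose preferred replica is broker 1.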

Network Protocol: CreatePartitionsRequest and CreatePartitionsResponse

The CreatePartitionsRequest is used to increase the partition count for a batch of topics, and is the basis for the AdminClient.createPartitions() method.

The request can be sent to any broker.

...

  • TOPIC_AUTHORIZATION_FAILED (29) The user lacked Alter permission on the topic
  • POLICY_VIOLATION (44) The request violated the configured policy

  • INVALID_TOPIC_EXCEPTION (17) If the topic doesn't exist
  • INVALID_PARTITIONS (37) If the partition count was <= the current partition count for the topic.
  • INVALID_REQUEST (42) If duplicate topics appeared in the request, or the size of the partitions list did not equal the number of new partitions, or the size of any list contained in the partitions list did not equal the topic's replication factor.
  • NONE (0) The topic partition count was changed successfully.
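The per-topic checks behind these error codes can be sketched as follows. This is a minimal illustration and not the broker's implementation: the class name, the convention of passing -1 for a non-existent topic, and the order in which the checks run are all assumptions. Authorization (29), policy (44), and duplicate-topic checks are omitted because they depend on state outside a single topic entry.

```java
import java.util.List;

/** Hypothetical sketch of per-topic validation for a create-partitions request. */
final class CreatePartitionsValidationSketch {

    /** Error names and codes as listed above. */
    enum Error {
        NONE(0),
        INVALID_TOPIC_EXCEPTION(17),
        INVALID_PARTITIONS(37),
        INVALID_REQUEST(42);

        final int code;

        Error(int code) {
            this.code = code;
        }
    }

    /**
     * @param currentCount      current partition count, or -1 if the topic does not exist
     *                          (an assumed convention for this sketch)
     * @param replicationFactor the topic's replication factor
     * @param newCount          requested total partition count
     * @param newAssignments    replica lists for the new partitions, or null to let
     *                          the broker choose
     */
    static Error validate(int currentCount, int replicationFactor,
                          int newCount, List<List<Integer>> newAssignments) {
        if (currentCount < 0) {
            return Error.INVALID_TOPIC_EXCEPTION;
        }
        if (newCount <= currentCount) {
            return Error.INVALID_PARTITIONS;
        }
        if (newAssignments != null) {
            // One inner list is required per new partition.
            if (newAssignments.size() != newCount - currentCount) {
                return Error.INVALID_REQUEST;
            }
            // Each inner list must name exactly replicationFactor brokers.
            for (List<Integer> replicas : newAssignments) {
                if (replicas.size() != replicationFactor) {
                    return Error.INVALID_REQUEST;
                }
            }
        }
        return Error.NONE;
    }
}
```

For a topic with 3 partitions and a replication factor of 2, a request for 4 partitions with the assignment [[1, 2]] passes, while a request for 3 partitions is rejected with INVALID_PARTITIONS.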

Compatibility, Deprecation, and Migration Plan

This is a new API and won't directly affect existing users.

Rejected Alternatives

NewPartitions is inconsistent because it takes the total number of partitions, but assignments only for the new partitions. The count is absolute, while the assignments are relative to the existing partitions. The reasons for this are:

...