Comment: Copy and paste from KIP-179

...

This page is meant as a template for writing a KIP. To create a KIP choose Tools->Copy on this page and modify with your content and replace the heading with the next KIP number and a description of your issue. Replace anything in italics with your own description.

Status

Current state:   [One of "Under Discussion", "Accepted", "Rejected"]

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]

...

Describe the problems you are trying to solve.

Public Interfaces

Briefly list any new interfaces that will be introduced as part of this proposal or any existing interfaces that will be removed or changed. The purpose of this section is to concisely call out the public contract that will come along with this feature.

A public interface is any change to the following:

  • Binary log format

  • The network protocol and api behavior

  • Any class in the public packages under clients

    • Configuration, especially client configuration

    • org/apache/kafka/common/serialization

    • org/apache/kafka/common

    • org/apache/kafka/common/errors

    • org/apache/kafka/clients/producer

    • org/apache/kafka/clients/consumer (eventually, once stable)

  • Monitoring

  • Command line tools and arguments

  • Anything else that will likely break existing users in some way when they upgrade

Proposed Changes

...

As described in KIP-4 and KIP-117, it is desirable to have network protocols and Java AdminClient APIs for administering a Kafka cluster. One such administrative action is increasing the number of partitions of a topic, which can also be done using kafka-topics.sh --alter --topic ... --partitions ... This KIP does not propose to change that tool; it simply adds an equivalent AdminClient API. Note that it is not currently possible to decrease the number of partitions using the tool, and likewise this KIP only proposes an API for increasing the partition count.

This will enable future work to refactor TopicCommand/kafka-topics.sh so that it works via a connection to a broker rather than interacting directly with ZooKeeper.

Public Interfaces

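New network protocol APIs will be added:

  • IncreasePartitionCountsRequest and IncreasePartitionCountsResponse

The AdminClient API will have new methods added (plus overloads for options):

  • AdminClient.increasePartitionCounts()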

Proposed Changes

AdminClient: increasePartitionCounts()

This API supports the use case of increasing the partition count via kafka-topics.sh --alter --partitions ...

Notes:

  • This API is synchronous in the sense that, once the client has obtained the result for a topic from the IncreasePartitionCountsResult, it can assume that the partition count has been changed (or that the request was rejected).
```java
// New methods on AdminClient
/**
 * <p>Increase the partition count of the topics given as the keys of {@code counts}
 * according to the corresponding values.</p>
 */
public abstract IncreasePartitionCountsResult increasePartitionCounts(Map<String, PartitionCount> counts,
                                                                      IncreasePartitionCountsOptions options);

/** Equivalent to calling the method above with default {@link IncreasePartitionCountsOptions}. */
public IncreasePartitionCountsResult increasePartitionCounts(Map<String, PartitionCount> counts) {
    return increasePartitionCounts(counts, new IncreasePartitionCountsOptions());
}
```

Where:

```java
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.KafkaFuture;

/** Describes a change in a topic's partition count. */
public class PartitionCount {
    private final int partitionCount;
    private final List<List<Integer>> assignments;
    private PartitionCount(int partitionCount, List<List<Integer>> assignments) { ... }

    /**
     * <p>Increase the partition count for a topic to the given {@code newCount}.
     * The assignment of new replicas to brokers will be decided by the broker
     * but may subsequently be moved using {@code AdminClient.reassignPartitions(Map)}.</p>
     */
    public static PartitionCount increasePartitionCount(int newCount) { ... }

    /**
     * <p>Increase the partition count for a topic to the given {@code newCount},
     * assigning the new partitions according to the given {@code newAssignments}.
     * The length of {@code newAssignments} should equal {@code newCount - oldCount}, since
     * the assignment of existing partitions is not changed.
     * Each inner list of {@code newAssignments} should have a length equal to
     * the topic's replication factor.
     * The first broker id in each inner list is the "preferred replica".</p>
     *
     * <p>For example, suppose a topic currently has a replication factor of 2 and
     * 3 partitions. The number of partitions can be increased to 4
     * (with broker 1 being the preferred replica for the new partition) using a
     * {@code PartitionCount} constructed like this:</p>
     *
     * <pre><code>PartitionCount.increasePartitionCount(4, Arrays.asList(Arrays.asList(1, 2)))</code></pre>
     */
    public static PartitionCount increasePartitionCount(int newCount, List<List<Integer>> newAssignments) { ... }
}

public class IncreasePartitionCountsOptions {
    public IncreasePartitionCountsOptions() { ... }
    public Integer timeoutMs() { ... }
    public IncreasePartitionCountsOptions timeoutMs(Integer timeoutMs) { ... }
    public boolean validateOnly() { ... }
    /**
     * Validate the request only: do not actually change any partition counts.
     */
    public IncreasePartitionCountsOptions validateOnly(boolean validateOnly) { ... }
}

public class IncreasePartitionCountsResult {
    // package access constructor
    IncreasePartitionCountsResult(Map<String, KafkaFuture<Void>> values) { ... }
    public Map<String, KafkaFuture<Void>> values() { ... }
    public KafkaFuture<Void> all() { ... }
}
```
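The per-topic futures in IncreasePartitionCountsResult are what make the API "synchronous" in the sense described in the notes on increasePartitionCounts(). The following is a minimal sketch of those semantics, not the proposed implementation: it substitutes the JDK's CompletableFuture for KafkaFuture so it runs without a Kafka dependency, and the class name ResultSketch and the helper outcomes() are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical stand-in for IncreasePartitionCountsResult; CompletableFuture replaces KafkaFuture.
final class ResultSketch {
    private final Map<String, CompletableFuture<Void>> values;

    ResultSketch(Map<String, CompletableFuture<Void>> values) {
        this.values = values;
    }

    /** One future per topic in the request. */
    public Map<String, CompletableFuture<Void>> values() {
        return values;
    }

    /** Completes normally only if every per-topic future completes normally. */
    public CompletableFuture<Void> all() {
        return CompletableFuture.allOf(values.values().toArray(new CompletableFuture<?>[0]));
    }

    /**
     * Waits for each topic's future. Once a future has completed, the partition count
     * for that topic has been changed, or the request for it was rejected.
     * Returns topic -> error message, where null means success.
     */
    static Map<String, String> outcomes(ResultSketch result) {
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, CompletableFuture<Void>> e : result.values().entrySet()) {
            try {
                e.getValue().join();
                out.put(e.getKey(), null);
            } catch (CompletionException ex) {
                // join() wraps the per-topic failure; report its message
                out.put(e.getKey(), ex.getCause().getMessage());
            }
        }
        return out;
    }
}
```

Using separate per-topic futures lets a caller tell which topics in a batch succeeded, while all() gives a single pass/fail signal for the whole batch.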

Network Protocol: IncreasePartitionCountsRequest and IncreasePartitionCountsResponse

The IncreasePartitionCountsRequest is used to increase the partition count for a batch of topics, and is the basis for the AdminClient.increasePartitionCounts() method.

The request can be sent to any broker.

The request will require the ALTER operation on the Topic resource.

The request is subject to the broker's CreateTopicPolicy, as configured by the broker's create.topic.policy.class.name config property. This ensures that the policy also applies to topics that are modified after creation.

After validating the request, the broker calls AdminUtils.addPartitions(), which ultimately updates the topic partition assignment znode (/brokers/topics/${topic}).
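As a rough illustration of the effect of that update, the sketch below appends new partitions to an existing assignment and renders the result in the {"version":1,"partitions":{...}} JSON layout of the topic znode. It assumes the existing partitions are numbered contiguously from 0, so the i-th new assignment becomes partition oldCount + i. The class AssignmentMerge is hypothetical; it does not reproduce AdminUtils or perform the ZooKeeper write.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical helper showing how new partitions extend a topic's replica assignment.
final class AssignmentMerge {
    /** Assumes existing holds partitions 0..oldCount-1; new partition ids follow on from oldCount. */
    static Map<Integer, List<Integer>> merge(Map<Integer, List<Integer>> existing,
                                             List<List<Integer>> newAssignments) {
        Map<Integer, List<Integer>> merged = new TreeMap<>(existing); // existing assignments are unchanged
        int oldCount = existing.size();
        for (int i = 0; i < newAssignments.size(); i++) {
            merged.put(oldCount + i, new ArrayList<>(newAssignments.get(i)));
        }
        return merged;
    }

    /** Renders the assignment in the {"version":1,"partitions":{...}} layout of the topic znode. */
    static String toZnodeJson(Map<Integer, List<Integer>> assignment) {
        String partitions = assignment.entrySet().stream()
            .map(e -> "\"" + e.getKey() + "\":" + e.getValue().stream()
                .map(id -> Integer.toString(id))
                .collect(Collectors.joining(",", "[", "]")))
            .collect(Collectors.joining(","));
        return "{\"version\":1,\"partitions\":{" + partitions + "}}";
    }
}
```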

```
IncreasePartitionCountsRequest => [topic_partition_count] timeout
  topic_partition_count => topic partition_count
    topic => STRING
    partition_count => count [assignment]
      count => INT32
      assignment => [INT32]
  timeout => INT32
```

Where

| Field | Description |
|---|---|
| topic | the name of a topic |
| count | the new partition count |
| assignment | a list of assigned brokers (one list for each new partition) |
| timeout | the maximum time to await a response, in ms |
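To make the request schema concrete, here is a sketch that encodes only the request body (no request header) using the protocol's primitive encodings: INT32 as 4 big-endian bytes, STRING as an INT16 length followed by UTF-8 bytes, and arrays as an INT32 element count followed by the elements. The class and method names are hypothetical, and writing an empty assignment array when the broker should choose the placement is an assumption of this sketch.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

// Hypothetical encoder for the IncreasePartitionCountsRequest body (no request header).
final class RequestSketch {
    /** partition_count => count [assignment] */
    static final class PartitionCountEntry {
        final int count;
        final List<List<Integer>> assignments; // one replica list per new partition; empty = broker chooses (assumption)

        PartitionCountEntry(int count, List<List<Integer>> assignments) {
            this.count = count;
            this.assignments = assignments;
        }
    }

    static byte[] encode(Map<String, PartitionCountEntry> topics, int timeoutMs) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        // DataOutputStream writes big-endian integers, as the Kafka protocol requires.
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(topics.size());                       // [topic_partition_count]: INT32 length
            for (Map.Entry<String, PartitionCountEntry> t : topics.entrySet()) {
                byte[] name = t.getKey().getBytes(StandardCharsets.UTF_8);
                out.writeShort(name.length);                   // topic: INT16 length ...
                out.write(name);                               // ... followed by UTF-8 bytes
                out.writeInt(t.getValue().count);              // count: INT32
                out.writeInt(t.getValue().assignments.size()); // [assignment]: INT32 length
                for (List<Integer> replicas : t.getValue().assignments) {
                    out.writeInt(replicas.size());             // assignment: [INT32]
                    for (int brokerId : replicas) {
                        out.writeInt(brokerId);
                    }
                }
            }
            out.writeInt(timeoutMs);                           // timeout: INT32
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected when writing to memory
        }
        return bytes.toByteArray();
    }
}
```

For the javadoc example (one topic "t" grown to 4 partitions with new assignment [1, 2] and a 30000 ms timeout), the body comes to 31 bytes.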

The response provides an error code and message for each of the topics present in the request.

```
IncreasePartitionCountsResponse => throttle_time_ms [topic_partition_count_error]
  topic_partition_count_error => topic error_code error_message
    topic => STRING
    error_code => INT16
    error_message => NULLABLE_STRING
```

Where

| Field | Description |
|---|---|
| throttle_time_ms | duration in milliseconds for which the request was throttled |
| topic | the name of a topic in the request |
| error_code | an error code for that topic |
| error_message | more detailed information about any error for that topic |
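A matching sketch for the response body reads throttle_time_ms and then, for each topic, the STRING topic name, the INT16 error code and the NULLABLE_STRING message, where a length of -1 encodes null. It also maps the error codes this KIP anticipates to their names. The class and method names are hypothetical; a real client would rely on Kafka's own protocol classes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical decoder for the IncreasePartitionCountsResponse body (no response header).
final class ResponseSketch {
    static final class TopicError {
        final short errorCode;
        final String errorMessage; // null if the broker sent no message

        TopicError(short errorCode, String errorMessage) {
            this.errorCode = errorCode;
            this.errorMessage = errorMessage;
        }
    }

    final int throttleTimeMs;
    final Map<String, TopicError> errors;

    private ResponseSketch(int throttleTimeMs, Map<String, TopicError> errors) {
        this.throttleTimeMs = throttleTimeMs;
        this.errors = errors;
    }

    static ResponseSketch decode(ByteBuffer buf) {
        int throttleTimeMs = buf.getInt();            // throttle_time_ms: INT32
        int count = buf.getInt();                     // [topic_partition_count_error]: INT32 length
        Map<String, TopicError> errors = new LinkedHashMap<>();
        for (int i = 0; i < count; i++) {
            String topic = readNullableString(buf);   // topic: STRING (never null in practice)
            short code = buf.getShort();              // error_code: INT16
            String message = readNullableString(buf); // error_message: NULLABLE_STRING
            errors.put(topic, new TopicError(code, message));
        }
        return new ResponseSketch(throttleTimeMs, errors);
    }

    /** INT16 length followed by UTF-8 bytes; a length of -1 encodes null. */
    private static String readNullableString(ByteBuffer buf) {
        short length = buf.getShort();
        if (length < 0) {
            return null;
        }
        byte[] bytes = new byte[length];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    /** Names for the error codes anticipated by this KIP. */
    static String errorName(short code) {
        switch (code) {
            case 0: return "NONE";
            case 17: return "INVALID_TOPIC_EXCEPTION";
            case 29: return "TOPIC_AUTHORIZATION_FAILED";
            case 37: return "INVALID_PARTITIONS";
            case 42: return "INVALID_REQUEST";
            case 44: return "POLICY_VIOLATION";
            default: return "UNKNOWN(" + code + ")";
        }
    }
}
```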

Anticipated errors:

  • TOPIC_AUTHORIZATION_FAILED (29) The user lacked Alter on the topic.
  • POLICY_VIOLATION (44) The request violated the configured policy.
  • INVALID_TOPIC_EXCEPTION (17) If the topic doesn't exist.
  • INVALID_PARTITIONS (37) If the requested partition count was <= the current partition count for the topic.
  • INVALID_REQUEST (42) If duplicate topics appeared in the request, or the number of assignments did not equal the number of new partitions, or the size of any assignment was not equal to the topic's replication factor.
  • NONE (0) The topic partition count was changed successfully.
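The INVALID_PARTITIONS and INVALID_REQUEST conditions need no cluster state beyond the topic's current partition count and replication factor, so they can be sketched in isolation. The following is a minimal sketch of those checks only; the class PartitionCountValidator is hypothetical, treating an empty assignment list as "let the broker choose" is an assumption, and the authorization, policy and topic-existence checks are left out because they require broker-side state.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical broker-side checks for the INVALID_PARTITIONS and INVALID_REQUEST cases.
final class PartitionCountValidator {
    static final short NONE = 0;
    static final short INVALID_PARTITIONS = 37;
    static final short INVALID_REQUEST = 42;

    static short validate(int currentCount, int replicationFactor,
                          int newCount, List<List<Integer>> assignments) {
        if (newCount <= currentCount) {
            return INVALID_PARTITIONS;                 // the count can only grow
        }
        if (assignments.isEmpty()) {
            return NONE;                               // assumption: the broker chooses placement
        }
        if (assignments.size() != newCount - currentCount) {
            return INVALID_REQUEST;                    // one replica list per new partition
        }
        for (List<Integer> replicas : assignments) {
            if (replicas.size() != replicationFactor) {
                return INVALID_REQUEST;                // each list must match the replication factor
            }
        }
        return NONE;
    }

    /** Topics named more than once in a single request; any of these makes the request invalid. */
    static Set<String> duplicateTopics(List<String> topicsInRequest) {
        Set<String> seen = new HashSet<>();
        Set<String> duplicates = new HashSet<>();
        for (String topic : topicsInRequest) {
            if (!seen.add(topic)) {
                duplicates.add(topic);
            }
        }
        return duplicates;
    }
}
```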

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

This is a new API and won't directly affect existing users.

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.

...