Status

Current stateAdopted

Discussion thread: here

JIRA: (2.3) and  (2.4)

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Many simple workloads on Kafka can benefit from default partitioning and replication configurations. This allows system administrators to set some sane defaults, making Kafka more accessible to engineers in an organization that may not have knowledge to set meaningful values for these configurations.

There are two existing Kafka cluster configurations that we well leverage:

Today, these two configurations can only be leveraged from auto.topic.create; this KIP proposes leveraging them from the AdminClient APIs as well.

Public Interfaces

We will add one new constructor to NewTopic that resolves all defaults and requires only the topic name:

public NewTopic(String name); // uses default partitions and replication factor from broker config

This (NewTopic) is what will be passed in AdminClient#createTopics. The default values will remain the same (sentinel value of -1).

The `CreateTopicsRequest` will bump its protocol version to 4 so that any requests sent to old brokers with the default values and no replica assignments will fail with a UnsupportedVersionException.

To exploit this new feature in KafkaStreams, we update the default value of Streams configuration parameter `replication.factor` from `1` to `-1`. Cf KIP-733: change Kafka Streams default replication factor config

Proposed Changes

The change is straightforward, AdminManager on the broker will modify its verification logic to the following:

The same error codes will be used to convey error: 

    INVALID_PARTITIONS(37, "Number of partitions is below 1.",
            InvalidPartitionsException::new),
    INVALID_REPLICATION_FACTOR(38, "Replication factor is below 1 or larger than the number of available brokers.",
            InvalidReplicationFactorException::new),
    INVALID_REPLICA_ASSIGNMENT(39, "Replica assignment is invalid.",

Compatibility, Deprecation, and Migration Plan

Rejected Alternatives

Below Options may be implemented in a follow-up KIP

Option 2: Partial Port of NewTopicBuilder

We can add the following builder to org.apache.kafka.clients.admin and have the existing one in the connect module extend from it to maintain the existing methods:

public static Builder build(String name) { return new Builder(name)); }

  public static class Builder {

     public Builder(String name); // this will be called from NewTopic#build(String)

    /**
     * Specify the desired number of partitions for the topic.
     *
     * @param numPartitions the desired number of partitions; must be positive
     * @return this builder to allow methods to be chained; never null
     */
     public Builder partitions(int numPartitions);

     /**
      * Specify the desired replication factor for the topic.
      *
      * @param replicationFactor the desired replication factor; must be positive
      * @return this builder to allow methods to be chained; never null
      */
     public Builder replicationFactor(short replicationFactor); 

      /**
       * Specify the configuration properties for the topic, overwriting any previously-set properties.
       *
       * @param configs the desired topic configuration properties, or null if all existing properties should be cleared
       * @return this builder to allow methods to be chained; never null
       */
     public Builder config(Map<String, Object> configs);
  
      /**
       * Build the {@link NewTopic} representation.
       *
       * @return the topic description; never null
       */
     public NewTopic build(); 
  }
}


Option 3: Complete Port of NewTopicBuilder

We will make the NewTopicBuilder a first class API for constructing topics with any permutation of defaults and specific configurations. This moves the following API from the org.apache.kafka.connect.runtime package to the org.apache.kafka.clients.admin module as an api within NewTopic, leaving the old one in place but deprecating it for compatibility (the usage within kafka connect will be migrated to use the one in the new package):

package org.apache.kafka.clients.admin;

...

public class NewTopic {

  ...

public static Builder build(String name) { return new Builder(name)); }

  public static class Builder {

     public Builder(String name); // this will be called from NewTopic#build(String)

    /**
     * Specify the desired number of partitions for the topic.
     *
     * @param numPartitions the desired number of partitions; must be positive
     * @return this builder to allow methods to be chained; never null
     */
     public Builder partitions(int numPartitions);

     /**
      * Specify the desired replication factor for the topic.
      *
      * @param replicationFactor the desired replication factor; must be positive
      * @return this builder to allow methods to be chained; never null
      */
     public Builder replicationFactor(short replicationFactor); 

      /**
       * Specify that the topic should be compacted.
       *
       * @return this builder to allow methods to be chained; never null
       */
      public Builder compacted();

      /**
       * Specify the minimum number of in-sync replicas required for this topic.
       *
       * @param minInSyncReplicas the minimum number of in-sync replicas allowed for the topic; must be positive
       * @return this builder to allow methods to be chained; never null
       */
      public Builder minInSyncReplicas(short minInSyncReplicas);

      /**
       * Specify whether the broker is allowed to elect a leader that was not an in-sync replica when no ISRs
       * are available.
       *
       * @param allow true if unclean leaders can be elected, or false if they are not allowed
       * @return this builder to allow methods to be chained; never null
       */
      public Builder uncleanLeaderElection(boolean allow);  

      /**
       * Specify the configuration properties for the topic, overwriting any previously-set properties.
       *
       * @param configs the desired topic configuration properties, or null if all existing properties should be cleared
       * @return this builder to allow methods to be chained; never null
       */
     public Builder config(Map<String, Object> configs);
  
      /**
       * Build the {@link NewTopic} representation.
       *
       * @return the topic description; never null
       */
     public NewTopic build(); 
  }
}