Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
public NewTopic(String name); // uses default partitions and replication factor from broker config

This (NewTopic) is what will be passed in AdminClient#createTopics. The default values will remain the same (sentinel value of -1).

Proposed Changes

The change is straightforward, AdminManager on the broker will modify its verification logic to the following:

  • (no change) If partitions and replicas are both present (but manual assignment is not), we use the partitions and replicas and determine assignment automatically
  • (no change) If partitions and replicas are both absent and a manual assignment is present, then we use the manual assignment
  • (no change) If either partitions or replicas are present and a manual assignment is present, the request fails
  • (new) if only partitions is present, we will no longer fail and instead use the supplied partitions and the default replication factor
  • (new) if only replicas are present, we will no longer fail and instead use the supplied replication factor and the default partitions
  • (new) if neither partitions, replicas or manual assignment are present, we will no longer fail and instead use the default replication factor and default partitions

The same error codes will be used to convey error: 

Code Block
    INVALID_PARTITIONS(37, "Number of partitions is below 1.",
            InvalidPartitionsException::new),
    INVALID_REPLICATION_FACTOR(38, "Replication factor is below 1 or larger than the number of available brokers.",
            InvalidReplicationFactorException::new),
    INVALID_REPLICA_ASSIGNMENT(39, "Replica assignment is invalid.",

Compatibility, Deprecation, and Migration Plan

  • No request that was previously valid will become invalid, and no request that was previously valid will change behavior.
  • Some previously invalid requests (e.g. valid partitions and no replication factor or manual assignments) will now succeed.

Rejected Alternatives

  • Add a new configuration to differentiate default.replication.factor for auto-topic-create and for normal behavior (i.e. feature flagging this change). I believe this will just confuse users, and the current configuration name is descriptive enough.
  • Change the broker protocol to accept a new type of request that does not have any value (instead of a sentinel value) for the replication factor/partitions.
    • The original proposal in this KIP will be easier to perform upgrades - since all Kafka brokers will be able to deserialize the incoming request, the only difference is whether or not the request succeeds (i.e. creates the topic) or fails (i.e. sends an error message). 

Below Options may be implemented in a follow-up KIP

Option 2: Partial Port of NewTopicBuilder

We can add the following builder to org.apache.kafka.clients.admin and have the existing one in the connect module extend from it to maintain the existing methods:

Code Block
public static Builder build(String name) { return new Builder(name)); }

  public static class Builder {

     public Builder(String name); // this will be called from NewTopic#build(String)

    /**
     * Specify the desired number of partitions for the topic.
     *
     * @param numPartitions the desired number of partitions; must be positive
     * @return this builder to allow methods to be chained; never null
     */
     public Builder partitions(int numPartitions);

     /**
      * Specify the desired replication factor for the topic.
      *
      * @param replicationFactor the desired replication factor; must be positive
      * @return this builder to allow methods to be chained; never null
      */
     public Builder replicationFactor(short replicationFactor); 

      /**
       * Specify the configuration properties for the topic, overwriting any previously-set properties.
       *
       * @param configs the desired topic configuration properties, or null if all existing properties should be cleared
       * @return this builder to allow methods to be chained; never null
       */
     public Builder config(Map<String, Object> configs);
  
      /**
       * Build the {@link NewTopic} representation.
       *
       * @return the topic description; never null
       */
     public NewTopic build(); 
  }
}


Option 3: Complete Port of NewTopicBuilder

We will make the NewTopicBuilder a first class API for constructing topics with any permutation of defaults and specific configurations. This moves the following API from the org.apache.kafka.connect.runtime package to the org.apache.kafka.clients.admin module as an api within NewTopic, leaving the old one in place but deprecating it for compatibility (the usage within kafka connect will be migrated to use the one in the new package):

Code Block
package org.apache.kafka.clients.admin;

...

public class NewTopic {

  ...

public static Builder build(String name) { return new Builder(name)); }

  public static class Builder {

     public Builder(String name); // this will be called from NewTopic#build(String)

    /**
     * Specify the desired number of partitions for the topic.
     *
     * @param numPartitions the desired number of partitions; must be positive
     * @return this builder to allow methods to be chained; never null
     */
     public Builder partitions(int numPartitions);

     /**
      * Specify the desired replication factor for the topic.
      *
      * @param replicationFactor the desired replication factor; must be positive
      * @return this builder to allow methods to be chained; never null
      */
     public Builder replicationFactor(short replicationFactor); 

      /**
       * Specify that the topic should be compacted.
       *
       * @return this builder to allow methods to be chained; never null
       */
      public Builder compacted();

      /**
       * Specify the minimum number of in-sync replicas required for this topic.
       *
       * @param minInSyncReplicas the minimum number of in-sync replicas allowed for the topic; must be positive
       * @return this builder to allow methods to be chained; never null
       */
      public Builder minInSyncReplicas(short minInSyncReplicas);

      /**
       * Specify whether the broker is allowed to elect a leader that was not an in-sync replica when no ISRs
       * are available.
       *
       * @param allow true if unclean leaders can be elected, or false if they are not allowed
       * @return this builder to allow methods to be chained; never null
       */
      public Builder uncleanLeaderElection(boolean allow);  

      /**
       * Specify the configuration properties for the topic, overwriting any previously-set properties.
       *
       * @param configs the desired topic configuration properties, or null if all existing properties should be cleared
       * @return this builder to allow methods to be chained; never null
       */
     public Builder config(Map<String, Object> configs);
  
      /**
       * Build the {@link NewTopic} representation.
       *
       * @return the topic description; never null
       */
     public NewTopic build(); 
  }
}

This (NewTopic) is what will be passed in AdminClient#createTopics to specify the replication factors. The default values will remain the same (sentinel value of -1).

Proposed Changes

The change is straightforward, AdminManager on the broker will modify its verification logic to the following:

  • (no change) If partitions and replicas are both present (but manual assignment is not), we use the partitions and replicas and determine assignment automatically
  • (no change) If partitions and replicas are both absent and a manual assignment is present, then we use the manual assignment
  • (no change) If either partitions or replicas are present and a manual assignment is present, the request fails
  • (new) if only partitions is present, we will no longer fail and instead use the supplied partitions and the default replication factor
  • (new) if only replicas are present, we will no longer fail and instead use the supplied replication factor and the default partitions
  • (new) if none of partitions, replicas or manual assignment are present, we will no longer fail and instead use the default replication factor and default partitions

The same error codes will be used to convey error: 

Code Block
    INVALID_PARTITIONS(37, "Number of partitions is below 1.",
            InvalidPartitionsException::new),
    INVALID_REPLICATION_FACTOR(38, "Replication factor is below 1 or larger than the number of available brokers.",
            InvalidReplicationFactorException::new),
    INVALID_REPLICA_ASSIGNMENT(39, "Replica assignment is invalid.",

Compatibility, Deprecation, and Migration Plan

  • No request that was previously valid will become invalid, and no request that was previously valid will change behavior.
  • Some previously invalid requests (e.g. valid partitions and no replication factor or manual assignments) will now succeed.

Rejected Alternatives

...