Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current state: "Under Discussion" Accepted

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]

JIRA:

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-8907

...

Version of CreateTopics will be bumped up to 65. Request format will not be changed.

...

Code Block
languagejava
titleCreateTopicsResponse version 6
{
  "apiKey": 19,
  "type": "response",
  "name": "CreateTopicsResponse",
  // Version 1 adds a per-topic error message string.
  //
  // Version 2 adds the throttle time.
  //
  // Starting in version 3, on quota violation, brokers send out responses before throttling.
  //
  // Version 4 makes partitions/replicationFactor optional even when assignments are not present (KIP-464).
  //
  // Version 5 is the first flexible version.
  //
  // Version 5 6also returns topic configs in the response (KIP-525)
  "validVersions": "0-65",
  "flexibleVersions": "5+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "2+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Topics", "type": "[]CreatableTopicResult", "versions": "0+",
      "about": "Results for each topic we tried to create.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName",
        "about": "The topic name." },
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The error code, or 0 if there was no error." },
      { "name": "ErrorMessage", "type": "string", "versions": "1+", "nullableVersions": "0+", "ignorable": true,
        "about": "The error message, or null if there was no error." },

      // *************  New fields added by KIP-525 *************//
      { "name": "TopicConfigErrorCode", "type": "int16", "versions": "65+", "tag": 0, "taggedVersions": "95+",
        "about": "Optional topic config error returned if configs are not returned in the response." },
      { "name": "NumPartitions", "type": "int32", "versions": "65+", "default": "-1",
        "about": "Number of partitions of the topic." },
      { "name": "ReplicationFactor", "type": "int16", "versions": "65+",
        "about"default": "-1",
        "about": "Replicator factor of the topic." },
      { "name": "Configs", "type": "[]CreatableTopicConfigs", "versions": "5+", "nullableVersions": "65+",
        "about": "Configuration of the topic.", "fields": [
        { "name": "Name", "type": "string", "versions": "65+",
          "about": "The configuration name." },
        { "name": "Value", "type": "string", "versions": "65+", "nullableVersions": "05+",
          "about": "The configuration value." },
        { "name": "ReadOnly", "type": "bool", "versions": "65+",
          "about": "True if the configuration is read-only." },
        { "name": "ConfigSource", "type": "int8", "versions": "65+", "default": "-1", "ignorable": true,
          "about": "The configuration source." },
        { "name": "IsSensitive", "type": "bool", "versions": "65+",
          "about": "True if this configuration is sensitive." }
      ]}
    ]}
  ]
}

...

Code Block
languagejava
titleCreateTopicsResult.java
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.errors.ApiException;

import java.util.Collection;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * The result of {@link Admin#createTopics(Collection)}.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class CreateTopicsResult {

    private final static int UNKNOWN = -1;

    publicprivate staticfinal classMap<String, TopicConfigKafkaFuture<TopicMetadataAndConfig>> {futures;

    CreateTopicsResult(Map<String, KafkaFuture<TopicMetadataAndConfig>> futures) {
 private final ApiException exception;
    this.futures = futures;
  private final int numPartitions;
}

    /**
     private* finalReturn inta replicationFactor;
map from topic names to futures, which can privatebe finalused Config config;

    to check the status of individual
    TopicConfig(int numPartitions,* int replicationFactor, Config config) {topic creations.
     */
    public Map<String,  this.exception = null;KafkaFuture<Void>> values() {
            this.numPartitions = numPartitions;return futures.entrySet().stream()
            this.replicationFactor = replicationFactor;
            this.config = config;
  .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().thenApply(v -> (Void) null)));
    }

    /**TOPIC
     * Return a future which succeeds if all the topic creations succeed.
     */
    public KafkaFuture<Void> all() {
        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
    }

    /**
     * Returns a future that provides topic configs for the topic when the request completes.
     * <p>
     * If broker version doesn't support replication factor in the response, throw
     * {@link org.apache.kafka.common.errors.UnsupportedVersionException}.
     * If broker returned an error for topic configs, throw appropriate exception. For example,
     * {@link org.apache.kafka.common.errors.TopicAuthorizationException} is thrown if user does not
     * have permission to describe topic configs.
     */
    public KafkaFuture<Config> config(String topic) {
        return futures.get(topic).thenApply(TopicMetadataAndConfig::config);
    }

    /**
     * Returns a future that provides number of partitions in the topic when the request completes.
     * <p>
     * If broker version doesn't support replication factor in the response, throw
     * {@link org.apache.kafka.common.errors.UnsupportedVersionException}.
     * If broker returned an error for topic configs, throw appropriate exception. For example,
     * {@link org.apache.kafka.common.errors.TopicAuthorizationException} is thrown if user does not
     * have permission to describe topic configs.
     */
    public KafkaFuture<Integer> numPartitions(String topic) {
        return futures.get(topic).thenApply(TopicMetadataAndConfig::numPartitions);
    }

    /**
     *  TopicConfig(ApiException exception) {
        Returns a future that provides replication factor for the topic when the request completes.
    this.exception =* exception;<p>
     * If broker version doesn't support replication this.numPartitionsfactor =in UNKNOWN;
the response, throw
     *     this.replicationFactor = UNKNOWN;
            this.config = null;{@link org.apache.kafka.common.errors.UnsupportedVersionException}.
     * If broker returned an error for topic configs, throw appropriate exception. For example,
     *   }

        public int numPartitions() {
     {@link org.apache.kafka.common.errors.TopicAuthorizationException} is thrown if user does not
     * have returnpermission numPartitions;
to describe topic configs.
     }

    */
    public intKafkaFuture<Integer> replicationFactor(String topic) {
            return futures.get(topic).thenApply(TopicMetadataAndConfig::replicationFactor;
        }
);
    }

    publicstatic Configclass config()TopicMetadataAndConfig {
        private final ApiException exception;
  return config;
     private final int }numPartitions;
    }

    private final Map<String, KafkaFuture<TopicConfig>> futures;
int replicationFactor;
    CreateTopicsResult(Map<String, KafkaFuture<TopicConfig>> futures) {
 private final      this.futures = futures;
Config config;

      }

  TopicMetadataAndConfig(int  /**
     * Return a map from topic names to futures, which can be used to check the status of individual
numPartitions, int replicationFactor, Config config) {
            this.exception = null;
          * topic creationsthis.
numPartitions = numPartitions;
   */
    public Map<String, KafkaFuture<Void>> values() {
 this.replicationFactor = replicationFactor;
     return futures.entrySet().stream()
      this.config = config;
         .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().thenApply(v -> (Void) null)));
}

        TopicMetadataAndConfig(ApiException exception) {
     }

    /**
   this.exception = *exception;
  Return a future which succeeds if all the topic creations succeed.
this.numPartitions = UNKNOWN;
       */
    public KafkaFuture<Void> all() {this.replicationFactor = UNKNOWN;
        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
  this.config = }
null;
    /**
    }

 * Returns a future that provides topic descriptionpublic forint the topic when the request completes.
numPartitions() {
         *
   ensureSuccess();
  * If broker version doesn't support topic configs in the response,return thrownumPartitions;
      * {@link org.apache.kafka.common.errors.UnsupportedVersionException}.
}

      * If broker returned an error for topic configs, throw corresponding exception. For example,  public int replicationFactor() {
            ensureSuccess();
     * {@link org.apache.kafka.common.errors.TopicAuthorizationException} is thrown if user doesreturn notreplicationFactor;
     * have permission to describe topic configs. }

     */
    public KafkaFuture<TopicConfig>Config topicConfigconfig(String topic) {
           return futures.getensureSuccess(topic).thenApply(config -> {);
            ifreturn (config.exception;
 != null)
      }

        private void throw config.exception;
ensureSuccess() {
             elseif (exception != null)
                returnthrow configexception;
        });
    }
}


Security

Topic configs will be returned only if user has DescribeConfigs permissions for the topic. If user doesn't have permissions to describe configs, an error code is returned by the broker. AdminClient method to get topic config will throw TopicAuthorizationException in this case,

...