Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleCreateTopicsResponse version 6
{
  "apiKey": 19,
  "type": "response",
  "name": "CreateTopicsResponse",
  // Version 1 adds a per-topic error message string.
  //
  // Version 2 adds the throttle time.
  //
  // Starting in version 3, on quota violation, brokers send out responses before throttling.
  //
  // Version 4 makes partitions/replicationFactor optional even when assignments are not present (KIP-464).
  //
  // Version 5 is the first flexible version.
  //
  // Version 5 also returns topic configs in the response (KIP-525)
  "validVersions": "0-5",
  "flexibleVersions": "5+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "2+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Topics", "type": "[]CreatableTopicResult", "versions": "0+",
      "about": "Results for each topic we tried to create.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName",
        "about": "The topic name." },
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The error code, or 0 if there was no error." },
      { "name": "ErrorMessage", "type": "string", "versions": "1+", "nullableVersions": "0+", "ignorable": true,
        "about": "The error message, or null if there was no error." },

      // *************  New fields added by KIP-525 *************//
      { "name": "TopicConfigErrorCode", "type": "int16", "versions": "5+", "tag": 0, "taggedVersions": "9+",
        "about": "Optional topic config error returned if configs are not returned in the response." },
      { "name": "NumPartitions", "type": "int32", "versions": "5+", "default": "-1",
        "about": "Number of partitions of the topic." },
      { "name": "ReplicationFactor", "type": "int16", "versions": "5+", "default": "-1",
        "about": "Replication factor of the topic." },
      { "name": "Configs", "type": "[]CreatableTopicConfigs", "versions": "5+", "nullableVersions": "5+",
        "about": "Configuration of the topic.", "fields": [
        { "name": "Name", "type": "string", "versions": "5+",
          "about": "The configuration name." },
        { "name": "Value", "type": "string", "versions": "5+", "nullableVersions": "5+",
          "about": "The configuration value." },
        { "name": "ReadOnly", "type": "bool", "versions": "5+",
          "about": "True if the configuration is read-only." },
        { "name": "ConfigSource", "type": "int8", "versions": "5+", "default": "-1", "ignorable": true,
          "about": "The configuration source." },
        { "name": "IsSensitive", "type": "bool", "versions": "5+",
          "about": "True if this configuration is sensitive." }
      ]}
    ]}
  ]
}

...

Code Block
languagejava
titleCreateTopicsResult.java
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.errors.ApiException;

import java.util.Collection;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * The result of {@link Admin#createTopics(Collection)}.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class CreateTopicsResult {

    private final static int UNKNOWN = -1;

    private final Map<String, KafkaFuture<CreateResult>>KafkaFuture<TopicMetadataAndConfig>> futures;

    CreateTopicsResult(Map<String, KafkaFuture<CreateResult>>KafkaFuture<TopicMetadataAndConfig>> futures) {
        this.futures = futures;
    }

    /**
     * Return a map from topic names to futures, which can be used to check the status of individual
     * topic creations.
     */
    public Map<String, KafkaFuture<Void>> values() {
        return futures.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().thenApply(v -> (Void) null)));
    }

    /**TOPIC
     * Return a future which succeeds if all the topic creations succeed.
     */
    public KafkaFuture<Void> all() {
        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
    }

    /**
     * Returns a future that provides topic configs for the topic when the request completes.
     * <p>
     * If broker version doesn't support replication factor in the response, throw
     * {@link org.apache.kafka.common.errors.UnsupportedVersionException}.
     * If broker returned an error for topic configs, throw appropriate exception. For example,
     * {@link org.apache.kafka.common.errors.TopicAuthorizationException} is thrown if user does not
     * have permission to describe topic configs.
     */
    public KafkaFuture<Config> config(String topic) {
        return futures.get(topic).thenApply(CreateResultTopicMetadataAndConfig::config);
    }

    /**
     * Returns a future that provides number of partitions in the topic when the request completes.
     * <p>
     * If broker version doesn't support replication factor in the response, throw
     * {@link org.apache.kafka.common.errors.UnsupportedVersionException}.
     * If broker returned an error for topic configs, throw appropriate exception. For example,
     * {@link org.apache.kafka.common.errors.TopicAuthorizationException} is thrown if user does not
     * have permission to describe topic configs.
     */
    public KafkaFuture<Integer> numPartitions(String topic) {
        return futures.get(topic).thenApply(CreateResultTopicMetadataAndConfig::numPartitions);
    }

    /**
     * Returns a future that provides replication factor for the topic when the request completes.
     * <p>
     * If broker version doesn't support replication factor in the response, throw
     * {@link org.apache.kafka.common.errors.UnsupportedVersionException}.
     * If broker returned an error for topic configs, throw appropriate exception. For example,
     * {@link org.apache.kafka.common.errors.TopicAuthorizationException} is thrown if user does not
     * have permission to describe topic configs.
     */
    public KafkaFuture<Integer> replicationFactor(String topic) {
        return futures.get(topic).thenApply(CreateResultTopicMetadataAndConfig::replicationFactor);
    }

    static class CreateResultTopicMetadataAndConfig {
        private final ApiException exception;
        private final int numPartitions;
        private final int replicationFactor;
        private final Config config;

        CreateResultTopicMetadataAndConfig(int numPartitions, int replicationFactor, Config config) {
            this.exception = null;
            this.numPartitions = numPartitions;
            this.replicationFactor = replicationFactor;
            this.config = config;
        }

        CreateResultTopicMetadataAndConfig(ApiException exception) {
            this.exception = exception;
            this.numPartitions = UNKNOWN;
            this.replicationFactor = UNKNOWN;
            this.config = null;
        }

        public int numPartitions() {
            ensureSuccess();
            return numPartitions;
        }

        public int replicationFactor() {
            ensureSuccess();
            return replicationFactor;
        }

        public Config config() {
            ensureSuccess();
            return config;
        }

        private void ensureSuccess() {
            if (exception != null)
                throw exception;
        }
    }
}

...