...

Discussion thread: here

JIRA: KAFKA-8907

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

The version of CreateTopics will be bumped to 6. The request format will not change.

CreateTopicsResponse

Number of partitions, replication factor and topic configs will not change.

...

be included in the response if the user has DescribeConfigs permission. If the user does not have this permission, an error code is returned in the optional tagged field `TopicConfigErrorCode`.

CreateTopicsResponse version 6:
{
  "apiKey": 19,
  "type": "response",
  "name": "CreateTopicsResponse",
  // Version 1 adds a per-topic error message string.
  //
  // Version 2 adds the throttle time.
  //
  // Starting in version 3, on quota violation, brokers send out responses before throttling.
  //
  // Version 4 makes partitions/replicationFactor optional even when assignments are not present (KIP-464).
  //
  // Version 5 is the first flexible version.
  //
  // Version 6 returns topic configs in the response (KIP-525)
  "validVersions": "0-6",
  "flexibleVersions": "5+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "2+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Topics", "type": "[]CreatableTopicResult", "versions": "0+",
      "about": "Results for each topic we tried to create.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName",
        "about": "The topic name." },
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The error code, or 0 if there was no error." },
      { "name": "ErrorMessage", "type": "string", "versions": "1+", "nullableVersions": "0+", "ignorable": true,
        "about": "The error message, or null if there was no error." },

      // *************  New fields added by KIP-525 *************//
      { "name": "TopicConfigErrorCode", "type": "int16", "versions": "6+", "tag": 0, "taggedVersions": "6+",
        "about": "Optional topic config error returned if configs are not returned in the response." },
      { "name": "NumPartitions", "type": "int32", "versions": "6+",
        "about": "Number of partitions of the topic." },
      { "name": "ReplicationFactor", "type": "int16", "versions": "6+",
        "about": "Replication factor of the topic." },
      { "name": "Configs", "type": "[]CreatableTopicConfigs", "versions": "6+",
        "about": "Configuration of the topic.", "fields": [
        { "name": "Name", "type": "string", "versions": "6+",
          "about": "The configuration name." },
        { "name": "Value", "type": "string", "versions": "6+", "nullableVersions": "0+",
          "about": "The configuration value." },
        { "name": "ReadOnly", "type": "bool", "versions": "6+",
          "about": "True if the configuration is read-only." },
        { "name": "ConfigSource", "type": "int8", "versions": "6+", "default": "-1", "ignorable": true,
          "about": "The configuration source." },
        { "name": "IsSensitive", "type": "bool", "versions": "6+",
          "about": "True if this configuration is sensitive." }
      ]}
    ]}
  ]
}
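
As a rough illustration of how a client might read one per-topic entry of this response, the sketch below uses a hand-written stand-in class rather than the generated Kafka message class. The class name `TopicResultSketch`, its methods, and the use of -1 for unknown partition and replication values are assumptions made for this example. The numeric codes are the Kafka protocol values for NONE (0) and TOPIC_AUTHORIZATION_FAILED (29).

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified stand-in for one CreatableTopicResult entry (version 6).
// This is NOT the generated Kafka message class; names are illustrative only.
final class TopicResultSketch {
    // Kafka protocol error codes: 0 = NONE, 29 = TOPIC_AUTHORIZATION_FAILED.
    static final short NONE = 0;
    static final short TOPIC_AUTHORIZATION_FAILED = 29;

    final String name;
    final short errorCode;            // outcome of the topic creation itself
    final short topicConfigErrorCode; // why configs are missing, if they are
    final int numPartitions;
    final short replicationFactor;
    final Map<String, String> configs;

    TopicResultSketch(String name, short errorCode, short topicConfigErrorCode,
                      int numPartitions, short replicationFactor,
                      Map<String, String> configs) {
        this.name = name;
        this.errorCode = errorCode;
        this.topicConfigErrorCode = topicConfigErrorCode;
        this.numPartitions = numPartitions;
        this.replicationFactor = replicationFactor;
        this.configs = configs;
    }

    // Configs are usable only if creation succeeded and no config error was reported.
    Optional<Map<String, String>> usableConfigs() {
        if (errorCode != NONE || topicConfigErrorCode != NONE) {
            return Optional.empty();
        }
        return Optional.of(configs);
    }

    // Human-readable summary that keeps the two error codes separate.
    String describe() {
        if (errorCode != NONE) {
            return name + ": creation failed (error " + errorCode + ")";
        }
        if (topicConfigErrorCode != NONE) {
            return name + ": created, configs unavailable (error " + topicConfigErrorCode + ")";
        }
        return name + ": created with " + numPartitions + " partitions, replication factor "
                + replicationFactor + ", " + configs.size() + " configs";
    }

    public static void main(String[] args) {
        TopicResultSketch ok = new TopicResultSketch("orders", NONE, NONE, 3, (short) 2,
                Map.of("cleanup.policy", "delete"));
        TopicResultSketch denied = new TopicResultSketch("audit", NONE,
                TOPIC_AUTHORIZATION_FAILED, -1, (short) -1, Map.of());
        System.out.println(ok.describe());
        System.out.println(denied.describe());
    }
}
```

The point of the sketch is that `ErrorCode` and `TopicConfigErrorCode` are checked independently: a topic can be created successfully even when its configs cannot be returned to the caller.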


Admin Client API changes

A new method will be added to CreateTopicsResult to obtain configs. All other methods in CreateTopicsResult will be retained with the same signature.

...

CreateTopicsResult.java:
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.errors.ApiException;

import java.util.Collection;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * The result of {@link Admin#createTopics(Collection)}.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class CreateTopicsResult {

    private static final int UNKNOWN = -1;

    public static class TopicConfig {
        private final ApiException exception;
        private final int numPartitions;
        private final int replicationFactor;
        private final Config config;

        TopicConfig(int numPartitions, int replicationFactor, Config config) {
            this.exception = null;
            this.numPartitions = numPartitions;
            this.replicationFactor = replicationFactor;
            this.config = config;
        }

        TopicConfig(ApiException exception) {
            this.exception = exception;
            this.numPartitions = UNKNOWN;
            this.replicationFactor = UNKNOWN;
            this.config = null;
        }

        public int numPartitions() {
            return numPartitions;
        }

        public int replicationFactor() {
            return replicationFactor;
        }

        public Config config() {
            return config;
        }
    }

    private final Map<String, KafkaFuture<TopicConfig>> futures;

    CreateTopicsResult(Map<String, KafkaFuture<TopicConfig>> futures) {
        this.futures = futures;
    }

    /**
     * Return a map from topic names to futures, which can be used to check the status of individual
     * topic creations.
     */
    public Map<String, KafkaFuture<Void>> values() {
        return futures.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().thenApply(v -> (Void) null)));
    }

    /**
     * Return a future which succeeds if all the topic creations succeed.
     */
    public KafkaFuture<Void> all() {
        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
    }

    /**
     * Returns a future that provides the topic configs for the topic when the request completes.
     *
     * If the broker version doesn't support topic configs in the response, the future fails with
     * {@link org.apache.kafka.common.errors.UnsupportedVersionException}.
     * If the broker returned an error for topic configs, the future fails with the corresponding
     * exception. For example, {@link org.apache.kafka.common.errors.TopicAuthorizationException}
     * is thrown if the user does not have permission to describe topic configs.
     */
    public KafkaFuture<TopicConfig> topicConfig(String topic) {
        return futures.get(topic).thenApply(config -> {
            if (config.exception != null)
                throw config.exception;
            else
                return config;
        });
    }
}
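
The class above keeps a config error inside the `TopicConfig` value instead of failing the underlying future, so `values()` and `all()` can still report a successful creation while `topicConfig()` fails. The following self-contained sketch models that behaviour with `java.util.concurrent.CompletableFuture` standing in for `KafkaFuture`. The class `TopicConfigFutureSketch`, its helper methods, and the use of `SecurityException` in place of a Kafka `ApiException` are assumptions made so the example runs without the Kafka client library.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Illustrative model only: CompletableFuture stands in for KafkaFuture.
final class TopicConfigFutureSketch {

    // Hypothetical stand-in for CreateTopicsResult.TopicConfig.
    static final class TopicConfig {
        final RuntimeException exception; // null when configs were returned
        final int numPartitions;          // -1 when unknown

        TopicConfig(int numPartitions) {
            this.exception = null;
            this.numPartitions = numPartitions;
        }

        TopicConfig(RuntimeException exception) {
            this.exception = exception;
            this.numPartitions = -1;
        }
    }

    // Mirrors values(): the payload is dropped, so a stored config error
    // does not turn a successful topic creation into a failure.
    static CompletableFuture<Void> creation(CompletableFuture<TopicConfig> future) {
        return future.thenApply(v -> (Void) null);
    }

    // Mirrors topicConfig(): the stored exception is rethrown, failing this future only.
    static CompletableFuture<TopicConfig> topicConfig(CompletableFuture<TopicConfig> future) {
        return future.thenApply(config -> {
            if (config.exception != null) {
                throw config.exception;
            }
            return config;
        });
    }

    public static void main(String[] args) {
        CompletableFuture<TopicConfig> denied = CompletableFuture.completedFuture(
                new TopicConfig(new SecurityException("not authorized to describe configs")));

        creation(denied).join(); // completes normally: the topic itself was created
        try {
            topicConfig(denied).join();
        } catch (CompletionException e) {
            System.out.println("config lookup failed: " + e.getCause().getMessage());
        }
    }
}
```

Under this design, callers that only care whether the topic exists keep using `values()` or `all()` unchanged, and only callers that ask for configs need to handle authorization or version errors.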

...

We could return the full topic metadata, including replica assignment, but since the requirements we have seen so far have only been for topic configs, this KIP proposes to expose only configs through CreateTopicsResponse. This avoids duplicating a lot of fields from MetadataResponse. Also, a replica assignment generated by brokers is not meaningful with validateOnly=true, since the randomized assignment will change when the topic is subsequently created.

...