Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current state: "Under Discussion" Accepted

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]
JIRA: here [Change the link from KAFKA-1 to your own ticket]

JIRA:

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-8907

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Version of CreateTopics will be bumped up to 5. Request format will not change.

CreateTopicsResponse

CreateTopics Response (Version: 4) => throttle_time_ms [topics] 
  throttle_time_ms => INT32
  topics => name error_code error_message num_partitions replication_factor [config_entries]
name => STRING
error_code => INT16
error_message => NULLABLE_STRING
topic_config_error => INT16 (Tagged) <==== NEW All fields from this onwards
num_partitions => INT32 replication_factor => INT16
config_entries => config_name config_value read_only config_source is_sensitive config_name => STRING config_value => NULLABLE_STRING read_only => BOOLEAN config_source => INT8 is_sensitive => BOOLEAN

Admin Client API changes

A new method will be added to CreateTopicsResult to obtain configs. All other methods in CreateTopicsResult will be retained with the same signature.

  • public KafkaFuture<TopicConfig> topicConfig(String topic)

The updated class is shown below:

be changed.

CreateTopicsResponse

Number of partitions, replication factor and topic configs will be included in the response if user has DescribeConfigs permission. If user does not have this permission, an error code is returned in the optional tagged field `TopicConfigErrorCode`.

Code Block
languagejava
titleCreateTopicsResponse version 6
{
  "apiKey": 19,
  "type": "response",
  "name": "CreateTopicsResponse",
  // Version 1 adds a per-topic error message string.
  //
  // Version 2 adds the throttle time.
  //
  // Starting in version 3, on quota violation, brokers send out responses before throttling.
  //
  // Version 4 makes partitions/replicationFactor optional even when assignments are not present (KIP-464).
  //
  // Version 5 is the first flexible version.
  // Version 5 also returns topic configs in the response (KIP-525)
  "validVersions": "0-5",
  "flexibleVersions": "5+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "2+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Topics", "type": "[]CreatableTopicResult", "versions": "0+",
      "about": "Results for each topic we tried to create.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName",
        "about": "The topic name." },
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The error code, or 0 if there was no error." },
      { "name": "ErrorMessage", "type": "string", "versions": "1+", "nullableVersions": "0+", "ignorable": true,
        "about": "The error message, or null if there was no error." },

      // *************  New fields added by KIP-525 *************//
      { "name": "TopicConfigErrorCode", "type": "int16", "versions": "5+", "tag": 0, "taggedVersions": "5+",
        "about": "Optional topic config error returned if configs are not returned in the response." },
      { "name": "NumPartitions", "type": "int32", "versions": "5+", "default": "-1",
        "about": "Number of partitions of the topic." },
      { "name": "ReplicationFactor", "type": "int16", "versions": "5+", "default": "-1",
        "about": "Replicator factor of the topic." },
      { "name": "Configs", "type": "[]CreatableTopicConfigs", "versions": "5+", "nullableVersions": "5+",
        "about": "Configuration of the topic.", "fields": [
        { "name": "Name", "type": "string", "versions": "5+",
          "about": "The configuration name." },
        { "name": "Value", "type": "string", "versions": "5+", "nullableVersions": "5+",
          "about": "The configuration value." },
        { "name": "ReadOnly", "type": "bool", "versions": "5+",
          "about": "True if the configuration is read-only." },
        { "name": "ConfigSource", "type": "int8", "versions": "5+", "default": "-1", "ignorable": true,
          "about": "The configuration source." },
        { "name": "IsSensitive", "type": "bool", "versions": "5+",
          "about": "True if this configuration is sensitive." }
      ]}
    ]}
  ]
}


Admin Client API changes

A new method will be added to CreateTopicsResult to obtain configs. All other methods in CreateTopicsResult will be retained with the same signature.

  • public KafkaFuture<TopicConfig> topicConfig(String topic)


The updated class is shown below:

Code Block
languagejava
titleCreateTopicsResult.java
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.errors.ApiException;

import java.util.Collection;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * The result of {@link Admin#createTopics(Collection)}.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class CreateTopicsResult {

    private final static int UNKNOWN = -1;

    private final Map<String, KafkaFuture<TopicMetadataAndConfig>> futures;

    CreateTopicsResult(Map<String, KafkaFuture<TopicMetadataAndConfig>> futures) {
        this.futures = futures;
    }

    /**
     * Return a map from topic names to futures, which can be used to check the status of individual
     * topic creations.
     */
    public Map<String, KafkaFuture<Void>> values() {
        return futures.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().thenApply(v -> (Void) null)));
    }

    /**TOPIC
     * Return a future which succeeds if all the topic creations succeed.
     */
    public KafkaFuture<Void> all() {
        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
    }

    /**
     * Returns a future that provides topic configs for the topic when the request completes.
     * <p>
     * If broker version doesn't support replication factor in the response, throw
     * {@link
Code Block
languagejava
titleCreateTopicsResult.java
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.errors.UnsupportedVersionException}.ApiException;

import java.util.Collection;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * The result of {@link Admin#createTopics(Collection)}.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class CreateTopicsResult {

    private final static int UNKNOWN = -1;
   * If broker returned an error for topic configs, throw appropriate exception. For example,
     * {@link org.apache.kafka.common.errors.TopicAuthorizationException} is thrown if user does not
     * have permission to describe topic configs.
     */
    public KafkaFuture<Config> static class TopicConfigconfig(String topic) {
        private final ApiException exceptionreturn futures.get(topic).thenApply(TopicMetadataAndConfig::config);
    }

    /**
 private final int numPartitions;
 * Returns a future that provides number privateof finalpartitions intin replicationFactor;
the topic when the request completes.
   private final Config* config;<p>

     * If broker TopicConfig(int numPartitions, int replicationFactor, Config config) {
  version doesn't support replication factor in the response, throw
     * {@link org.apache.kafka.common.errors.UnsupportedVersionException}.
   this.exception = null;
* If broker returned an error for topic configs, throw appropriate  thisexception.numPartitions =For numPartitions;example,
     *       this.replicationFactor = replicationFactor;
      {@link org.apache.kafka.common.errors.TopicAuthorizationException} is thrown if user does not
     * this.confighave =permission config;
to describe topic configs.
     }*/

    public KafkaFuture<Integer>   TopicConfignumPartitions(ApiExceptionString exceptiontopic) {
        return futures.get(topic).thenApply(TopicMetadataAndConfig::numPartitions);
    }

    this.exception = exception;/**
     * Returns a future that provides replication this.numPartitionsfactor =for UNKNOWN;
the topic when the request completes.
     * <p>
 this.replicationFactor = UNKNOWN;
  * If broker version  doesn't support replication factor in this.config = null;the response, throw
     *   }
{@link org.apache.kafka.common.errors.UnsupportedVersionException}.
     * If broker publicreturned intan numPartitions() {
            return numPartitions;error for topic configs, throw appropriate exception. For example,
     *   }

        public int replicationFactor() {
      {@link org.apache.kafka.common.errors.TopicAuthorizationException} is thrown if user does not
     * have permission to describe topic configs.
      return replicationFactor;*/
    public KafkaFuture<Integer> replicationFactor(String topic) }
{
        public Config config() {
return futures.get(topic).thenApply(TopicMetadataAndConfig::replicationFactor);
    }

    static class TopicMetadataAndConfig {
  return config;
     private final ApiException }exception;
    }

    private final Map<String, KafkaFuture<TopicConfig>> futures;

 int numPartitions;
        CreateTopicsResult(Map<String, KafkaFuture<TopicConfig>> futures) {
private final int replicationFactor;
        private this.futuresfinal =Config futuresconfig;

      }

  TopicMetadataAndConfig(int  /**
     * Return a map from topic names to futures, which can be used to check the status of individual
numPartitions, int replicationFactor, Config config) {
            this.exception = null;
          * topic creations.
this.numPartitions = numPartitions;
      */
    public Map<String, KafkaFuture<Void>> values() {this.replicationFactor = replicationFactor;
        return futures.entrySet().stream()
     this.config = config;
         .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().thenApply(v -> (Void) null)));
    }

    /**
     * Return a future which succeeds if all the topic creations succeed.
     */
    public KafkaFuture<Void> all() {}

        TopicMetadataAndConfig(ApiException exception) {
            this.exception = exception;
            this.numPartitions = UNKNOWN;
            this.replicationFactor = UNKNOWN;
        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
    this.config = null;
        }

    /**
    public *int Returns a future that provides topic description for the topic when the request completes.
     * If broker version doesn't support topic configs in the response, thrownumPartitions() {
            ensureSuccess();
            return numPartitions;
     *  {@link org.apache.kafka.common.errors.UnsupportedVersionException}.
}

       * Ifpublic brokerint returned an error for topic configs, throw appropriate exception. For example,
     * {@link org.apache.kafka.common.errors.TopicAuthorizationException} is thrown if user does not
     * have permission to describe topic configs.replicationFactor() {
            ensureSuccess();
            return replicationFactor;
        }

     */
    public KafkaFuture<TopicConfig>Config topicConfigconfig(String topic) {
        return futures.get(topic).thenApply(config -> {    ensureSuccess();
            ifreturn (config.exception;
 != null)
      }

        private void throw config.exception;
ensureSuccess() {
             elseif (exception != null)
                returnthrow configexception;
        });
    }
}


Security

Topic configs will be returned only if user has DescribeConfigs permissions for the topic. If user doesn't have permissions to describe configs, an error code is returned by the broker. AdminClient method to get topic config will throw TopicAuthorizationException in this case,

...

We could return the full topic metadata including replica assignment, but since the requrements we have seen so far has only been for topic configs, this KIP proposes to expose only configs through CreateTopicsResponse. This avoid avoids duplicating a lot of fields from MetadataResponse. Also, replica assignment generated by brokers is not meaningful with validateOnly=true since randomized assignment will change when the topic is subsequently created.

...