Table of Contents |
---|
Status
Current state: "Under Discussion" Accepted
Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]
JIRA:
Jira | ||||||
---|---|---|---|---|---|---|
|
...
Version of CreateTopics will be bumped up to 65. Request format will not be changed.
...
Code Block | ||||
---|---|---|---|---|
| ||||
{ "apiKey": 19, "type": "response", "name": "CreateTopicsResponse", // Version 1 adds a per-topic error message string. // // Version 2 adds the throttle time. // // Starting in version 3, on quota violation, brokers send out responses before throttling. // // Version 4 makes partitions/replicationFactor optional even when assignments are not present (KIP-464). // // Version 5 is the first flexible version. // // Version 5 6also returns topic configs in the response (KIP-525) "validVersions": "0-65", "flexibleVersions": "5+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "2+", "ignorable": true, "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "Topics", "type": "[]CreatableTopicResult", "versions": "0+", "about": "Results for each topic we tried to create.", "fields": [ { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName", "about": "The topic name." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or 0 if there was no error." }, { "name": "ErrorMessage", "type": "string", "versions": "1+", "nullableVersions": "0+", "ignorable": true, "about": "The error message, or null if there was no error." }, // ************* New fields added by KIP-525 *************// { "name": "TopicConfigErrorCode", "type": "int16", "versions": "65+", "tag": 0, "taggedVersions": "95+", "about": "Optional topic config error returned if configs are not returned in the response." }, { "name": "NumPartitions", "type": "int32", "versions": "65+", "default": "-1", "about": "Number of partitions of the topic." }, { "name": "ReplicationFactor", "type": "int16", "versions": "65+", "about"default": "-1", "about": "Replicator factor of the topic." }, { "name": "Configs", "type": "[]CreatableTopicConfigs", "versions": "5+", "nullableVersions": "65+", "about": "Configuration of the topic.", "fields": [ { "name": "Name", "type": "string", "versions": "65+", "about": "The configuration name." }, { "name": "Value", "type": "string", "versions": "65+", "nullableVersions": "05+", "about": "The configuration value." }, { "name": "ReadOnly", "type": "bool", "versions": "65+", "about": "True if the configuration is read-only." }, { "name": "ConfigSource", "type": "int8", "versions": "65+", "default": "-1", "ignorable": true, "about": "The configuration source." }, { "name": "IsSensitive", "type": "bool", "versions": "65+", "about": "True if this configuration is sensitive." } ]} ]} ] } |
...
Code Block | ||||
---|---|---|---|---|
| ||||
package org.apache.kafka.clients.admin; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.errors.ApiException; import java.util.Collection; import java.util.Map; import java.util.stream.Collectors; /** * The result of {@link Admin#createTopics(Collection)}. * * The API of this class is evolving, see {@link Admin} for details. */ @InterfaceStability.Evolving public class CreateTopicsResult { private final static int UNKNOWN = -1; publicprivate staticfinal classMap<String, TopicConfigKafkaFuture<TopicMetadataAndConfig>> {futures; CreateTopicsResult(Map<String, KafkaFuture<TopicMetadataAndConfig>> futures) { private final ApiException exception; this.futures = futures; private final int numPartitions; } /** private* finalReturn inta replicationFactor; map from topic names to futures, which can privatebe finalused Config config; to check the status of individual TopicConfig(int numPartitions,* int replicationFactor, Config config) {topic creations. */ public Map<String, this.exception = null;KafkaFuture<Void>> values() { this.numPartitions = numPartitions;return futures.entrySet().stream() this.replicationFactor = replicationFactor; this.config = config; .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().thenApply(v -> (Void) null))); } /**TOPIC * Return a future which succeeds if all the topic creations succeed. */ public KafkaFuture<Void> all() { return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])); } /** * Returns a future that provides topic configs for the topic when the request completes. * <p> * If broker version doesn't support replication factor in the response, throw * {@link org.apache.kafka.common.errors.UnsupportedVersionException}. * If broker returned an error for topic configs, throw appropriate exception. For example, * {@link org.apache.kafka.common.errors.TopicAuthorizationException} is thrown if user does not * have permission to describe topic configs. */ public KafkaFuture<Config> config(String topic) { return futures.get(topic).thenApply(TopicMetadataAndConfig::config); } /** * Returns a future that provides number of partitions in the topic when the request completes. * <p> * If broker version doesn't support replication factor in the response, throw * {@link org.apache.kafka.common.errors.UnsupportedVersionException}. * If broker returned an error for topic configs, throw appropriate exception. For example, * {@link org.apache.kafka.common.errors.TopicAuthorizationException} is thrown if user does not * have permission to describe topic configs. */ public KafkaFuture<Integer> numPartitions(String topic) { return futures.get(topic).thenApply(TopicMetadataAndConfig::numPartitions); } /** * TopicConfig(ApiException exception) { Returns a future that provides replication factor for the topic when the request completes. this.exception =* exception;<p> * If broker version doesn't support replication this.numPartitionsfactor =in UNKNOWN; the response, throw * this.replicationFactor = UNKNOWN; this.config = null;{@link org.apache.kafka.common.errors.UnsupportedVersionException}. * If broker returned an error for topic configs, throw appropriate exception. For example, * } public int numPartitions() { {@link org.apache.kafka.common.errors.TopicAuthorizationException} is thrown if user does not * have returnpermission numPartitions; to describe topic configs. } */ public intKafkaFuture<Integer> replicationFactor(String topic) { return futures.get(topic).thenApply(TopicMetadataAndConfig::replicationFactor; } ); } publicstatic Configclass config()TopicMetadataAndConfig { private final ApiException exception; return config; private final int }numPartitions; } private final Map<String, KafkaFuture<TopicConfig>> futures; int replicationFactor; CreateTopicsResult(Map<String, KafkaFuture<TopicConfig>> futures) { private final this.futures = futures; Config config; } TopicMetadataAndConfig(int /** * Return a map from topic names to futures, which can be used to check the status of individual numPartitions, int replicationFactor, Config config) { this.exception = null; * topic creationsthis. numPartitions = numPartitions; */ public Map<String, KafkaFuture<Void>> values() { this.replicationFactor = replicationFactor; return futures.entrySet().stream() this.config = config; .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().thenApply(v -> (Void) null))); } TopicMetadataAndConfig(ApiException exception) { } /** this.exception = *exception; Return a future which succeeds if all the topic creations succeed. this.numPartitions = UNKNOWN; */ public KafkaFuture<Void> all() {this.replicationFactor = UNKNOWN; return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])); this.config = } null; /** } * Returns a future that provides topic descriptionpublic forint the topic when the request completes. numPartitions() { * ensureSuccess(); * If broker version doesn't support topic configs in the response,return thrownumPartitions; * {@link org.apache.kafka.common.errors.UnsupportedVersionException}. } * If broker returned an error for topic configs, throw corresponding exception. For example, public int replicationFactor() { ensureSuccess(); * {@link org.apache.kafka.common.errors.TopicAuthorizationException} is thrown if user doesreturn notreplicationFactor; * have permission to describe topic configs. } */ public KafkaFuture<TopicConfig>Config topicConfigconfig(String topic) { return futures.getensureSuccess(topic).thenApply(config -> {); ifreturn (config.exception; != null) } private void throw config.exception; ensureSuccess() { elseif (exception != null) returnthrow configexception; }); } } |
Security
Topic configs will be returned only if user has DescribeConfigs
permissions for the topic. If user doesn't have permissions to describe configs, an error code is returned by the broker. AdminClient method to get topic config will throw TopicAuthorizationException
in this case,
...