THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
Table of Contents |
---|
Status
Current state: Accepted
Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]
JIRA:
Jira | ||||||
---|---|---|---|---|---|---|
|
...
The version of CreateTopics will be bumped up to 5. The request format will not be changed.
...
Code Block | ||||
---|---|---|---|---|
| ||||
{
  "apiKey": 19,
  "type": "response",
  "name": "CreateTopicsResponse",
  // Version 1 adds a per-topic error message string.
  //
  // Version 2 adds the throttle time.
  //
  // Starting in version 3, on quota violation, brokers send out responses before throttling.
  //
  // Version 4 makes partitions/replicationFactor optional even when assignments are not present (KIP-464).
  //
  // Version 5 is the first flexible version.
  // Version 5 also returns topic configs in the response (KIP-525).
  "validVersions": "0-5",
  "flexibleVersions": "5+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "2+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Topics", "type": "[]CreatableTopicResult", "versions": "0+",
      "about": "Results for each topic we tried to create.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName",
        "about": "The topic name." },
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The error code, or 0 if there was no error." },
      { "name": "ErrorMessage", "type": "string", "versions": "1+", "nullableVersions": "0+", "ignorable": true,
        "about": "The error message, or null if there was no error." },
      // ************* New fields added by KIP-525 *************//
      { "name": "TopicConfigErrorCode", "type": "int16", "versions": "5+", "tag": 0, "taggedVersions": "5+",
        "about": "Optional topic config error returned if configs are not returned in the response." },
      { "name": "NumPartitions", "type": "int32", "versions": "5+", "default": "-1",
        "about": "Number of partitions of the topic." },
      { "name": "ReplicationFactor", "type": "int16", "versions": "5+", "default": "-1",
        "about": "Replication factor of the topic." },
      { "name": "Configs", "type": "[]CreatableTopicConfigs", "versions": "5+", "nullableVersions": "5+",
        "about": "Configuration of the topic.", "fields": [
        { "name": "Name", "type": "string", "versions": "5+",
          "about": "The configuration name." },
        { "name": "Value", "type": "string", "versions": "5+", "nullableVersions": "5+",
          "about": "The configuration value." },
        { "name": "ReadOnly", "type": "bool", "versions": "5+",
          "about": "True if the configuration is read-only." },
        { "name": "ConfigSource", "type": "int8", "versions": "5+", "default": "-1", "ignorable": true,
          "about": "The configuration source." },
        { "name": "IsSensitive", "type": "bool", "versions": "5+",
          "about": "True if this configuration is sensitive." }
      ]}
    ]}
  ]
}
...
Code Block | ||||
---|---|---|---|---|
| ||||
package org.apache.kafka.clients.admin; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.errors.ApiException; import java.util.Collection; import java.util.Map; import java.util.stream.Collectors; /** * The result of {@link Admin#createTopics(Collection)}. * * The API of this class is evolving, see {@link Admin} for details. */ @InterfaceStability.Evolving public class CreateTopicsResult { private final static int UNKNOWN = -1; private final Map<String, KafkaFuture<CreateResult>>KafkaFuture<TopicMetadataAndConfig>> futures; CreateTopicsResult(Map<String, KafkaFuture<CreateResult>>KafkaFuture<TopicMetadataAndConfig>> futures) { this.futures = futures; } /** * Return a map from topic names to futures, which can be used to check the status of individual * topic creations. */ public Map<String, KafkaFuture<Void>> values() { return futures.entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().thenApply(v -> (Void) null))); } /**TOPIC * Return a future which succeeds if all the topic creations succeed. */ public KafkaFuture<Void> all() { return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])); } /** * Returns a future that provides topic configs for the topic when the request completes. * <p> * If broker version doesn't support replication factor in the response, throw * {@link org.apache.kafka.common.errors.UnsupportedVersionException}. * If broker returned an error for topic configs, throw appropriate exception. For example, * {@link org.apache.kafka.common.errors.TopicAuthorizationException} is thrown if user does not * have permission to describe topic configs. */ public KafkaFuture<Config> config(String topic) { return futures.get(topic).thenApply(CreateResultTopicMetadataAndConfig::config); } /** * Returns a future that provides number of partitions in the topic when the request completes. 
* <p> * If broker version doesn't support replication factor in the response, throw * {@link org.apache.kafka.common.errors.UnsupportedVersionException}. * If broker returned an error for topic configs, throw appropriate exception. For example, * {@link org.apache.kafka.common.errors.TopicAuthorizationException} is thrown if user does not * have permission to describe topic configs. */ public KafkaFuture<Integer> numPartitions(String topic) { return futures.get(topic).thenApply(CreateResultTopicMetadataAndConfig::numPartitions); } /** * Returns a future that provides replication factor for the topic when the request completes. * <p> * If broker version doesn't support replication factor in the response, throw * {@link org.apache.kafka.common.errors.UnsupportedVersionException}. * If broker returned an error for topic configs, throw appropriate exception. For example, * {@link org.apache.kafka.common.errors.TopicAuthorizationException} is thrown if user does not * have permission to describe topic configs. 
*/ public KafkaFuture<Integer> replicationFactor(String topic) { return futures.get(topic).thenApply(CreateResultTopicMetadataAndConfig::replicationFactor); } static class CreateResultTopicMetadataAndConfig { private final ApiException exception; private final int numPartitions; private final int replicationFactor; private final Config config; CreateResultTopicMetadataAndConfig(int numPartitions, int replicationFactor, Config config) { this.exception = null; this.numPartitions = numPartitions; this.replicationFactor = replicationFactor; this.config = config; } CreateResultTopicMetadataAndConfig(ApiException exception) { this.exception = exception; this.numPartitions = UNKNOWN; this.replicationFactor = UNKNOWN; this.config = null; } public int numPartitions() { ensureSuccess(); return numPartitions; } public int replicationFactor() { ensureSuccess(); return replicationFactor; } public Config config() { ensureSuccess(); return config; } private void ensureSuccess() { if (exception != null) throw exception; } } } |
...