Status
Current state: "Under Discussion"
Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]
JIRA: here [Change the link from KAFKA-1 to your own ticket]
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
CreateTopics
response currently only returns success or failure status including any errors. No additional metadata is returned for the topics that are created. Topic configuration for the topics being created may be optionally specified in CreateTopics
request and for configs that are not specified, broker default is used. It will be useful to return the actual configuration of the topic that was created in CreateTopics response to avoid two round trip calls to determine these using Metadata
and DescribeConfigs
requests.
KIP-234: add support for getting topic defaults from AdminClient proposed to modify DescribeConfigs
request to obtain default broker configs used for topic creation to enable users to determine the configs that will be applied before issuing CreateTopicsRequest
. This is useful, for example, to display default configs in management tools used to create topics. This KIP provides an alternative solution that enables users to obtain this information using CreateTopics
with validateOnly=true
. When CreateTopics
is invoked with validateOnly=true
, the response will return configs that would have been used if the topic was actually created.
Goals
- Optionally return topic configs including number of partitions and replication count in CreateTopics response. This avoids separate Metadata and DescribeConfigs requests to obtain this after creating the topic. The data will only be returned if requested to avoid unnecessary overhead if user doesn’t require this information.
- Enable users to obtain default configs that will be applied to new topics using
CreateTopics
withvalidateOnly=true
. This is useful for management tools to display default values. - Manage access to topic configs using existing ACLs.
Public Interfaces
Protocol changes
CreateTopicsRequest
Version of CreateTopics will be bumped up to 5. A boolean field will be added to request additional metadata with the following values:
CreateTopics Request (Version: 3) => [topics] timeout_ms validate_only topics => name num_partitions replication_factor [assignments] [configs] name => STRING num_partitions => INT32 replication_factor => INT16 assignments => partition_index [broker_ids] partition_index => INT32 broker_ids => INT32 configs => name value name => STRING value => NULLABLE_STRING timeout_ms => INT32 validate_only => BOOLEAN
return_configs => INT8 <==== NEW
CreateTopicsResponse
CreateTopics Response (Version: 4) => throttle_time_ms [topics] throttle_time_ms => INT32 topics => name error_code error_message num_partitions replication_factor [config_entries]
name => STRING
error_code => INT16
error_message => NULLABLE_STRING
num_partitions => INT32 <==== NEW All fields from this onwards replication_factor => INT16
config_entries => config_name config_value read_only config_source is_sensitive config_name => STRING config_value => NULLABLE_STRING read_only => BOOLEAN config_source => INT8 is_sensitive => BOOLEAN
Admin Client API changes
A new method will be added to CreateTopicsResult to obtain configs. All other methods in CreateTopicsResult will be retained with the same signature.
public KafkaFuture<TopicConfig> topicConfig(String topic)
The updated class is shown below:
package org.apache.kafka.clients.admin; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.annotation.InterfaceStability; import java.util.Collection; import java.util.Map; import java.util.stream.Collectors; /** * The result of {@link Admin#createTopics(Collection)}. * * The API of this class is evolving, see {@link Admin} for details. */ @InterfaceStability.Evolving public class CreateTopicsResult { public static class TopicConfig { private final int numPartitions; private final int replicationFactor; private final Config config; TopicConfig(int numPartitions, int replicationFactor, Config config) { this.numPartitions = numPartitions; this.replicationFactor = replicationFactor; this.config = config; } public int numPartitions() { return numPartitions; } public int replicationFactor() { return replicationFactor; } public Config config() { return config; } } private final Map<String, KafkaFuture<TopicConfig>> futures; CreateTopicsResult(Map<String, KafkaFuture<TopicConfig>> futures) { this.futures = futures; } /** * Return a map from topic names to futures, which can be used to check the status of individual * topic creations. */ public Map<String, KafkaFuture<Void>> values() { return futures.entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().thenApply(v -> (Void) null))); } /** * Return a future which succeeds if all the topic creations succeed. */ public KafkaFuture<Void> all() { return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])); } /** * Returns a future that provides topic configs for the topic when the request completes. */ public KafkaFuture<TopicConfig> topicConfig(String topic) { return futures.get(topic); } }
Security
The additional metadata will be returned only if user has DescribeConfigs
permissions for the topic. If user requests configs without this permission, the request will be failed with Authorization exception without creating the topic. The error message will indicate why the the request failed.
Proposed Changes
KafkaApis.handleCreateTopics
will be updated to check DescribeConfigs
permissions if configs are requested. Configs will be populated in the response if permitted. Request is failed if configs before attempting to create topic if configs are requested without DescribeConfigs
permissions.
AdminClient
will be updated to return configs if available.
Compatibility, Deprecation, and Migration Plan
Default behaviour for CreateTopics
request will not change. Additional metadata is returned in the response only if clients explicitly request this data. The new config fields will not be populated when new clients talk to older brokers or when old clients talk to newer brokers.
AdminClient will return additional configs in CreateTopicsResult
using new methods, leaving existing methods as-is. Applications can use new options to request additional metadata in the response and use the new topicDescription()
or topicConfig()
methods to retrieve these from the results Existing applications can continue to use the existing methods to process success/failure of the requests without any change.
Since configs are authorized using DescribeConfigs
ACLs, no changes are required in authorizers.
Rejected Alternatives
KIP-234: Return default configs for topics in DescribeConfigsResponse if topic name is null
This KIP proposes to return configs for CreateTopics
requests regardless of validateOnly
flag since metadata is useful for created topics as well as before creating topics. KIP-234 was proposing to just return default topic configs. Also, since some configs like replication factor are not returned by DescribeConfigs
, but are useful for the scenario in KIP-234, it seems better to return TopicDescription
as well as Config
. Another reason for the different approach in this KIP is security. Even though KIP-234 proposes to return default configs for users about to create topics, ACLs for these need to be based on DescribeConfigs
ACL for brokers since no topic is specified. Otherwise we will need to change authorizers to authorizer non-literal topics to see if user can create ANY topic. This KIP proposes a different approach that is consistent within the current authorization model and satisfies the requirement in KIP-234.
Return full topic metadata in CreateTopicsResponse
We could return the full topic metadata including replica assignment, but since the requrements we have seen so far has only been for topic configs, this KIP proposes to expose only configs through CreateTopicsResponse
. This avoid duplicating a lot of fields from MetadataResponse
.