THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||
---|---|---|
| ||
package org.apache.kafka.server.policy; /** * An interface for enforcing a policy on create topics requests. * * Common use cases are requiring that the replication factor, min.insync.replicas and/or retention settings for a * topic are within an allowable range. * * If <code>create.topic.policy.class.name</code> is defined, Kafka will create an instance of the specified class * using the default constructor and will then pass the broker configs to its <code>configure()</code> method. During * broker shutdown, the <code>close()</code> method will be invoked so that resources can be released (if necessary). */ public interface CreateTopicPolicy extends Configurable, AutoCloseable { /** * Class containing the create request parameters. */ class RequestMetadata { private final String topic; private final Integer numPartitions; private final Short replicationFactor; private final Map<Integer, List<Integer>> replicasAssignments; private final Map<String, String> configs; /** * Create an instance of this class with the provided parameters. * * This constructor is public to make testing of <code>CreateTopicPolicy</code> implementations easier. * * @param topic the name of the topic to created. * @param numPartitions the number of partitions to create or null if replicasAssignments is set. * @param replicationFactor the replication factor for the topic or null if replicaAssignments is set. * @param replicasAssignments replica assignments or null if numPartitions and replicationFactor is set. The * assignment is a map from partition id to replica (broker) ids. * @param configs topic configs for the topic to be created, not including broker defaults. Broker configs are * passed via the {@code configure()} method of the policy implementation. */ public RequestMetadata(String topic, Integer numPartitions, Short replicationFactor, Map<Integer, List<Integer>> replicasAssignments, Map<String, String> configs) { this.topic = topic; this.numPartitions = numPartitions; this.replicationFactor = replicationFactor; this.replicasAssignments = replicasAssignments == null ? null : Collections.unmodifiableMap(replicasAssignments); this.configs = Collections.unmodifiableMap(configs); } /** * Return the name of the topic to create. */ public String topic() { return topic; } /** * Return the number of partitions to create or null if replicaAssignments is not null. */ public Integer numPartitions() { return numPartitions; } /** * Return the number of replicas to create or null if replicaAssignments is not null. */ public Short replicationFactor() { return replicationFactor; } /** * Return a map from partition id to replica (broker) ids or null if numPartitions and replicationFactor are * set instead. */ public Map<Integer, List<Integer>> replicasAssignments() { return replicasAssignments; } /** * Return topic configs in the request, not including broker defaults. Broker configs are passed via * the {@code configure()} method of the policy implementation. */ public Map<String, String> configs() { return configs; } @Override public String toString() { return "RequestMetadata(topic=" + topic + ", numPartitions=" + numPartitions + ", replicationFactor=" + replicationFactor + ", replicasAssignments=" + replicasAssignments + ", configs=" + configs + ")"; } } /** * Validate the request parameters and throw a <code>PolicyViolationException</code> with a suitable error * message if the create request parameters for the provided topic do not satisfy this policy. * * Clients will receive the POLICY_VIOLATION error code along with the exception's message. Note that validation * failure only affects the relevant topic, other topics in the request will still be processed. * * @param requestMetadata the create request parameters for the provided topic. * @throws PolicyViolationException if the request parameters do not satisfy this policy. */ void validate(RequestMetadata requestMetadata) throws PolicyViolationException; } |
...
- We need to be able to send error messages back to the client, so we introduce an
error_message
string field in thetopic_errors
array of theCreateTopics
response. - We need a new error code for
PolicyViolationException
, so we assign error code44
toPOLICY_VIOLATION
. - For tools that allow users to create topics, a validation/dry-run mode where validation errors are reported but no creation is attempted is useful. A precedent for this exists in the Connect REST API. We introduce a
validateOnly
validate_only
boolean field in theCreateTopics
request to enable this mode.
...
Code Block | ||
---|---|---|
| ||
CreateTopics Request (Version: 1) => [create_topic_requests] timeout validateOnlyvalidate_only (new) create_topic_requests => topic num_partitions replication_factor [replica_assignment] [configs] topic => STRING num_partitions => INT32 replication_factor => INT16 replica_assignment => partition_id [replicas] partition_id => INT32 replicas => INT32 configs => config_key config_value config_key => STRING config_value => STRING timeout => INT32 validateOnlyvalidate_only => BOOLEAN (new) CreateTopics Response (Version: 1) => [topic_errors] topic_errors => topic error_code error_message (new) topic => STRING error_code => INT16 error_message => NULLABLE_STRING (new) |
...