Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Fix naming convention for validate_only.

...

Code Block
languagejava
package org.apache.kafka.server.policy;
 
/**
 * An interface for enforcing a policy on create topics requests.
 *
 * Common use cases are requiring that the replication factor, min.insync.replicas and/or retention settings for a
 * topic are within an allowable range.
 *
 * If <code>create.topic.policy.class.name</code> is defined, Kafka will create an instance of the specified class
 * using the default constructor and will then pass the broker configs to its <code>configure()</code> method. During
 * broker shutdown, the <code>close()</code> method will be invoked so that resources can be released (if necessary).
 */
public interface CreateTopicPolicy extends Configurable, AutoCloseable {

    /**
     * Class containing the create request parameters.
     */
    class RequestMetadata {
        private final String topic;
        private final Integer numPartitions;
        private final Short replicationFactor;
        private final Map<Integer, List<Integer>> replicasAssignments;
        private final Map<String, String> configs;

        /**
         * Create an instance of this class with the provided parameters.
         *
         * This constructor is public to make testing of <code>CreateTopicPolicy</code> implementations easier.
         *
         * @param topic the name of the topic to created.
         * @param numPartitions the number of partitions to create or null if replicasAssignments is set.
         * @param replicationFactor the replication factor for the topic or null if replicaAssignments is set.
         * @param replicasAssignments replica assignments or null if numPartitions and replicationFactor is set. The
         *                            assignment is a map from partition id to replica (broker) ids.
         * @param configs topic configs for the topic to be created, not including broker defaults. Broker configs are
         *                passed via the {@code configure()} method of the policy implementation.
         */
        public RequestMetadata(String topic, Integer numPartitions, Short replicationFactor,
                        Map<Integer, List<Integer>> replicasAssignments, Map<String, String> configs) {
            this.topic = topic;
            this.numPartitions = numPartitions;
            this.replicationFactor = replicationFactor;
            this.replicasAssignments = replicasAssignments == null ? null : Collections.unmodifiableMap(replicasAssignments);
            this.configs = Collections.unmodifiableMap(configs);
        }

        /**
         * Return the name of the topic to create.
         */
        public String topic() {
            return topic;
        }

        /**
         * Return the number of partitions to create or null if replicaAssignments is not null.
         */
        public Integer numPartitions() {
            return numPartitions;
        }

        /**
         * Return the number of replicas to create or null if replicaAssignments is not null.
         */
        public Short replicationFactor() {
            return replicationFactor;
        }

        /**
         * Return a map from partition id to replica (broker) ids or null if numPartitions and replicationFactor are
         * set instead.
         */
        public Map<Integer, List<Integer>> replicasAssignments() {
            return replicasAssignments;
        }

        /**
         * Return topic configs in the request, not including broker defaults. Broker configs are passed via
         * the {@code configure()} method of the policy implementation.
         */
        public Map<String, String> configs() {
            return configs;
        }

        @Override
        public String toString() {
            return "RequestMetadata(topic=" + topic +
                    ", numPartitions=" + numPartitions +
                    ", replicationFactor=" + replicationFactor +
                    ", replicasAssignments=" + replicasAssignments +
                    ", configs=" + configs + ")";
        }
    }

    /**
     * Validate the request parameters and throw a <code>PolicyViolationException</code> with a suitable error
     * message if the create request parameters for the provided topic do not satisfy this policy.
     *
     * Clients will receive the POLICY_VIOLATION error code along with the exception's message. Note that validation
     * failure only affects the relevant topic, other topics in the request will still be processed.
     *
     * @param requestMetadata the create request parameters for the provided topic.
     * @throws PolicyViolationException if the request parameters do not satisfy this policy.
     */
    void validate(RequestMetadata requestMetadata) throws PolicyViolationException;
}

...

  1. We need to be able to send error messages back to the client, so we introduce an error_message string field in the topic_errors array of  the CreateTopics response.
  2. We need a new error code for PolicyViolationException, so we assign error code 44 to POLICY_VIOLATION.
  3. For tools that allow users to create topics, a validation/dry-run mode where validation errors are reported but no creation is attempted is useful. A precedent for this exists in the Connect REST API. We introduce a validateOnly validate_only boolean field in the CreateTopics request to enable this mode.

...

Code Block
languagetext
CreateTopics Request (Version: 1) => [create_topic_requests] timeout validateOnlyvalidate_only (new)
  create_topic_requests => topic num_partitions replication_factor [replica_assignment] [configs] 
    topic => STRING
    num_partitions => INT32
    replication_factor => INT16
    replica_assignment => partition_id [replicas] 
      partition_id => INT32
      replicas => INT32
    configs => config_key config_value 
      config_key => STRING
      config_value => STRING
  timeout => INT32
  validateOnlyvalidate_only => BOOLEAN (new)
 
CreateTopics Response (Version: 1) => [topic_errors] 
  topic_errors => topic error_code error_message (new)
    topic => STRING
    error_code => INT16
    error_message => NULLABLE_STRING (new)

...