A user can define a policy manager, similar to the pluggable Authorizer, by setting a broker config that names the policy class and implementing the CreateTopicPolicy interface. The interface will live in the clients jar under the org.apache.kafka.server.policy package. It extends Configurable so that implementations can act on broker configurations, and AutoCloseable so that resources can be released on shutdown.

Code Block
package org.apache.kafka.server.policy;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.errors.PolicyViolationException;

import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
 * An interface for enforcing a policy on create topics requests.
 *
 * Common use cases are requiring that the replication factor, min.insync.replicas and/or retention settings for a
 * topic are within an allowable range.
 *
 * If a policy class is configured, Kafka will create an instance of the specified class
 * using the default constructor and will then pass the broker configs to its <code>configure()</code> method. During
 * broker shutdown, the <code>close()</code> method will be invoked so that resources can be released (if necessary).
 */
public interface CreateTopicPolicy extends Configurable, AutoCloseable {

    /**
     * Class containing the create request parameters.
     */
    class RequestMetadata {
        private final String topic;
        private final Integer numPartitions;
        private final Short replicationFactor;
        private final Map<Integer, List<Integer>> replicasAssignments;
        private final Map<String, String> configs;

        /**
         * Create an instance of this class with the provided parameters.
         *
         * This constructor is public to make testing of <code>CreateTopicPolicy</code> implementations easier.
         *
         * @param topic the name of the topic to be created.
         * @param numPartitions the number of partitions to create or null if replicasAssignments is set.
         * @param replicationFactor the replication factor for the topic or null if replicasAssignments is set.
         * @param replicasAssignments replica assignments or null if numPartitions and replicationFactor are set. The
         *                            assignment is a map from partition id to replica (broker) ids.
         * @param configs topic configs for the topic to be created, not including broker defaults. Broker configs are
         *                passed via the {@code configure()} method of the policy implementation.
         */
        public RequestMetadata(String topic, Integer numPartitions, Short replicationFactor,
                        Map<Integer, List<Integer>> replicasAssignments, Map<String, String> configs) {
            this.topic = topic;
            this.numPartitions = numPartitions;
            this.replicationFactor = replicationFactor;
            this.replicasAssignments = replicasAssignments == null ? null : Collections.unmodifiableMap(replicasAssignments);
            this.configs = Collections.unmodifiableMap(configs);
        }

        /**
         * Return the name of the topic to create.
         */
        public String topic() {
            return topic;
        }

        /**
         * Return the number of partitions to create or null if replicasAssignments is not null.
         */
        public Integer numPartitions() {
            return numPartitions;
        }

        /**
         * Return the number of replicas to create or null if replicasAssignments is not null.
         */
        public Short replicationFactor() {
            return replicationFactor;
        }

        /**
         * Return a map from partition id to replica (broker) ids or null if numPartitions and replicationFactor are
         * set instead.
         */
        public Map<Integer, List<Integer>> replicasAssignments() {
            return replicasAssignments;
        }

        /**
         * Return topic configs in the request, not including broker defaults. Broker configs are passed via
         * the {@code configure()} method of the policy implementation.
         */
        public Map<String, String> configs() {
            return configs;
        }
    }

    /**
     * Validate the request parameters and throw a <code>PolicyViolationException</code> with a suitable error
     * message if the create request parameters for the provided topic do not satisfy this policy.
     *
     * Clients will receive the POLICY_VIOLATION error code along with the exception's message. Note that validation
     * failure only affects the relevant topic, other topics in the request will still be processed.
     *
     * @param requestMetadata the create request parameters for the provided topic.
     * @throws PolicyViolationException if the request parameters do not satisfy this policy.
     */
    void validate(RequestMetadata requestMetadata) throws PolicyViolationException;
}

package org.apache.kafka.common.errors;

public class PolicyViolationException extends ApiException {

    public PolicyViolationException(String message) {
        super(message);
    }

    public PolicyViolationException(String message, Throwable cause) {
        super(message, cause);
    }
}

Users will have to ensure that the policy implementation code is in the broker's classpath. Implementations should throw the newly introduced PolicyViolationException with an appropriate error message if the request does not fulfill the policy requirements. We chose a generic name for the only parameter of the validate method in case we decide to add parameters that are not strictly related to the topic (e.g. session information) in the future. The constructor of RequestMetadata is public to make testing convenient for users. Under normal circumstances, it should only be instantiated by Kafka. We chose to create separate API classes instead of reusing request classes to make it easier to evolve the latter.
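
As an illustration of the contract described above, here is a minimal sketch of a policy that rejects topics whose replication factor is too low. It is not part of the proposal. To keep the sketch compilable without the Kafka jars, it declares small stand-ins for CreateTopicPolicy, RequestMetadata and PolicyViolationException; a real implementation would import them from the packages shown earlier. The class name MinReplicationPolicy and the config key example.min.replication.factor are hypothetical.

```java
import java.util.List;
import java.util.Map;

// Stand-in for org.apache.kafka.common.errors.PolicyViolationException (assumption: unchecked).
class PolicyViolationException extends RuntimeException {
    PolicyViolationException(String message) { super(message); }
}

// Stand-in for CreateTopicPolicy.RequestMetadata, mirroring the accessors used below.
final class RequestMetadata {
    private final String topic;
    private final Integer numPartitions;
    private final Short replicationFactor;
    private final Map<Integer, List<Integer>> replicasAssignments;
    private final Map<String, String> configs;

    RequestMetadata(String topic, Integer numPartitions, Short replicationFactor,
                    Map<Integer, List<Integer>> replicasAssignments, Map<String, String> configs) {
        this.topic = topic;
        this.numPartitions = numPartitions;
        this.replicationFactor = replicationFactor;
        this.replicasAssignments = replicasAssignments;
        this.configs = configs;
    }

    String topic() { return topic; }
    Short replicationFactor() { return replicationFactor; }
    Map<Integer, List<Integer>> replicasAssignments() { return replicasAssignments; }
}

// Stand-in for the CreateTopicPolicy interface (Configurable + AutoCloseable).
interface CreateTopicPolicy extends AutoCloseable {
    void configure(Map<String, ?> configs);
    void validate(RequestMetadata requestMetadata) throws PolicyViolationException;
    @Override void close();
}

// Hypothetical policy: every partition must have at least a configured number of replicas.
final class MinReplicationPolicy implements CreateTopicPolicy {
    // Hypothetical config key, read from the broker configs passed to configure().
    static final String MIN_RF_CONFIG = "example.min.replication.factor";

    private int minReplicationFactor = 1;

    @Override
    public void configure(Map<String, ?> configs) {
        Object value = configs.get(MIN_RF_CONFIG);
        if (value != null) {
            minReplicationFactor = Integer.parseInt(value.toString());
        }
    }

    @Override
    public void validate(RequestMetadata metadata) throws PolicyViolationException {
        Short replicationFactor = metadata.replicationFactor();
        if (replicationFactor != null) {
            check(metadata.topic(), replicationFactor);
        } else if (metadata.replicasAssignments() != null) {
            // With an explicit assignment, the replica count is the size of each broker list.
            for (List<Integer> replicas : metadata.replicasAssignments().values()) {
                check(metadata.topic(), replicas.size());
            }
        }
    }

    private void check(String topic, int replicas) {
        if (replicas < minReplicationFactor) {
            throw new PolicyViolationException("Topic " + topic + " has " + replicas
                    + " replica(s), below the required minimum of " + minReplicationFactor);
        }
    }

    @Override
    public void close() {
        // Nothing to release in this sketch.
    }
}
```

Because the RequestMetadata constructor is public, a policy like this can be unit tested by building the metadata directly and asserting that validate either returns normally or throws.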


  1. We need to be able to send error messages back to the client, so we introduce an error_message string field in the topic_errors array of the CreateTopics response.
  2. We need a new error code for PolicyViolationException, so we assign error code 44 to POLICY_VIOLATION.
  3. For tools that allow users to create topics, a validation/dry-run mode where validation errors are reported but no creation is attempted is useful. A precedent for this exists in the Connect REST API. We introduce a validate_only boolean field in the CreateTopics request to enable this mode.
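
The three changes above interact as follows: each topic is validated independently, a violation yields error code 44 with a message for that topic only, and nothing is created when validate_only is set. The sketch below models this under simplifying assumptions. The class ValidateOnlySketch, its handle method, and the inline replication-factor check standing in for a real policy are all hypothetical and do not reflect the broker's actual code.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ValidateOnlySketch {
    static final short NONE = 0;               // no error
    static final short POLICY_VIOLATION = 44;  // error code assigned in this proposal

    // One entry of the topic_errors array: error_code plus nullable error_message.
    static final class TopicError {
        final short errorCode;
        final String errorMessage; // null when there is no error

        TopicError(short errorCode, String errorMessage) {
            this.errorCode = errorCode;
            this.errorMessage = errorMessage;
        }
    }

    /**
     * Validates each requested topic (topic name -> replication factor) and, unless
     * validateOnly is set, records the topics that pass in {@code created}.
     */
    static Map<String, TopicError> handle(Map<String, Short> requested, short minReplicationFactor,
                                          boolean validateOnly, List<String> created) {
        Map<String, TopicError> results = new LinkedHashMap<>();
        for (Map.Entry<String, Short> entry : requested.entrySet()) {
            String topic = entry.getKey();
            short replicationFactor = entry.getValue();
            if (replicationFactor < minReplicationFactor) {
                // A failure affects only this topic; the loop continues with the others.
                results.put(topic, new TopicError(POLICY_VIOLATION,
                        "Replication factor " + replicationFactor + " is below " + minReplicationFactor));
                continue;
            }
            if (!validateOnly) {
                created.add(topic); // stand-in for the actual topic creation
            }
            results.put(topic, new TopicError(NONE, null));
        }
        return results;
    }
}
```

A tool could call this path with validate_only set to true to report problems to the user first, then resend the same request with it set to false.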


Code Block
CreateTopics Request (Version: 1) => [create_topic_requests] timeout validate_only (new)
  create_topic_requests => topic num_partitions replication_factor [replica_assignment] [configs] 
    topic => STRING
    num_partitions => INT32
    replication_factor => INT16
    replica_assignment => partition_id [replicas] 
      partition_id => INT32
      replicas => INT32
    configs => config_key config_value 
      config_key => STRING
      config_value => STRING
  timeout => INT32
  validate_only => BOOLEAN (new)
CreateTopics Response (Version: 1) => [topic_errors] 
  topic_errors => topic error_code error_message (new)
    topic => STRING
    error_code => INT16
    error_message => NULLABLE_STRING (new)
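
To make the response schema concrete, the following sketch encodes a topic_errors array using the Kafka protocol's primitive encodings: an array is an INT32 element count followed by the elements, a STRING is an INT16 length followed by UTF-8 bytes, and a NULLABLE_STRING uses a length of -1 for null. All integers are big-endian. The class and method names are hypothetical, the fixed buffer size is a simplification, and the response header is omitted.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class TopicErrorsEncodingSketch {

    // STRING: INT16 length followed by the UTF-8 bytes.
    static void writeString(ByteBuffer buffer, String value) {
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        buffer.putShort((short) bytes.length);
        buffer.put(bytes);
    }

    // NULLABLE_STRING: like STRING, but a length of -1 encodes null.
    static void writeNullableString(ByteBuffer buffer, String value) {
        if (value == null) {
            buffer.putShort((short) -1);
        } else {
            writeString(buffer, value);
        }
    }

    /**
     * Encodes [topic_errors], where each element is topic, error_code, error_message.
     * The three arrays are parallel and must have the same length.
     */
    static ByteBuffer encode(String[] topics, short[] errorCodes, String[] errorMessages) {
        // Fixed capacity keeps the sketch short; real code would size the buffer exactly.
        ByteBuffer buffer = ByteBuffer.allocate(1024); // ByteBuffer is big-endian by default
        buffer.putInt(topics.length);                  // array length as INT32
        for (int i = 0; i < topics.length; i++) {
            writeString(buffer, topics[i]);            // topic => STRING
            buffer.putShort(errorCodes[i]);            // error_code => INT16
            writeNullableString(buffer, errorMessages[i]); // error_message => NULLABLE_STRING
        }
        buffer.flip(); // switch from writing to reading
        return buffer;
    }
}
```

Making error_message nullable lets successful topics carry no message at a cost of two bytes each.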


During broker start-up, AdminManager will create a CreateTopicPolicy instance if a policy class is configured. It will then pass the broker configs to the configure method.
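
The start-up step can be sketched as reflective instantiation through the default constructor followed by a call to configure. This is an illustration only and not AdminManager's actual code: PolicyLoaderSketch, ConfigurablePolicy and NoOpPolicy are hypothetical stand-ins, and wrapping failures in IllegalStateException is an assumption.

```java
import java.util.Map;

final class PolicyLoaderSketch {

    // Stand-in for a type that is both Configurable and AutoCloseable.
    interface ConfigurablePolicy extends AutoCloseable {
        void configure(Map<String, ?> configs);
        @Override void close();
    }

    // Trivial policy used to demonstrate loading; it remembers the configs it was given.
    static final class NoOpPolicy implements ConfigurablePolicy {
        Map<String, ?> seen;

        @Override public void configure(Map<String, ?> configs) { this.seen = configs; }
        @Override public void close() { }
    }

    /**
     * Returns a configured policy instance, or null if no class name is set.
     */
    static ConfigurablePolicy load(String className, Map<String, ?> brokerConfigs) {
        if (className == null || className.isEmpty()) {
            return null; // no policy configured, so no validation hook is installed
        }
        try {
            // Instantiate via the default (no-argument) constructor.
            Object instance = Class.forName(className).getDeclaredConstructor().newInstance();
            ConfigurablePolicy policy = (ConfigurablePolicy) instance;
            policy.configure(brokerConfigs); // hand the broker configs to the policy
            return policy;
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalStateException("Could not load policy class " + className, e);
        }
    }
}
```

The class must be on the broker's classpath for the lookup to succeed, which is why users are responsible for deploying their policy implementation alongside the broker.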
