Comment: Update the KIP to match the current implementation.

...

Code Block (java)
package org.apache.kafka.server.policy;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.errors.PolicyViolationException;

import java.util.List;
import java.util.Map;

public interface CreateTopicPolicy extends Configurable, AutoCloseable {

    class RequestMetadata {
        private final String topic;
        private final int numPartitions;
        private final short replicationFactor;
        private final Map<Integer, List<Integer>> replicasAssignments;
        private final Map<String, String> configs;

        /** replicasAssignments is a map from partition id to broker ids */
        public RequestMetadata(String topic, int numPartitions, short replicationFactor,
                        Map<Integer, List<Integer>> replicasAssignments, Map<String, String> configs) {
            this.topic = topic;
            this.numPartitions = numPartitions;
            this.replicationFactor = replicationFactor;
            this.replicasAssignments = replicasAssignments;
            this.configs = configs;
        }

        public String topic() {
            return topic;
        }

        public int numPartitions() {
            return numPartitions;
        }

        public Map<Integer, List<Integer>> replicasAssignments() {
            return replicasAssignments;
        }

        public Map<String, String> configs() {
            return configs;
        }

    }

    void validate(RequestMetadata requestMetadata) throws PolicyViolationException;
}
 
package org.apache.kafka.common.errors;

public class PolicyViolationException extends ApiException {

    public PolicyViolationException(String message) {
        super(message);
    }

    public PolicyViolationException(String message, Throwable cause) {
        super(message, cause);
    }
}

Users will have to ensure that the policy implementation code is in the broker's classpath. Implementations should throw the newly introduced PolicyViolationException with an appropriate error message if the request does not fulfill the policy requirements. We chose a generic name for the only parameter of the validate method in case we decide to add parameters that are not strictly related to the topic (e.g. session information) in the future. The constructor of RequestMetadata is public to make testing convenient for users. Under normal circumstances, it should only be instantiated by Kafka. We chose to create separate API classes instead of reusing request classes to make it easier to evolve the latter.
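
As an illustration of the contract described above, here is a minimal sketch of a policy that caps the number of partitions per topic. It is not taken from the Kafka code base. To keep it compilable without Kafka on the classpath, it declares small stand-ins for PolicyViolationException and RequestMetadata; a real implementation would import the Kafka types and implement CreateTopicPolicy instead. The class name MaxPartitionsPolicy, the config key example.policy.max.partitions, and the default of 50 are assumptions made for this example.

```java
import java.util.Map;

// Stand-in for org.apache.kafka.common.errors.PolicyViolationException (assumption for this sketch).
class PolicyViolationException extends RuntimeException {
    PolicyViolationException(String message) {
        super(message);
    }
}

// Stand-in for CreateTopicPolicy.RequestMetadata, reduced to the two fields the sketch uses.
class RequestMetadata {
    private final String topic;
    private final int numPartitions;

    RequestMetadata(String topic, int numPartitions) {
        this.topic = topic;
        this.numPartitions = numPartitions;
    }

    String topic() {
        return topic;
    }

    int numPartitions() {
        return numPartitions;
    }
}

// Hypothetical policy: reject topics with more partitions than a configured maximum.
class MaxPartitionsPolicy implements AutoCloseable {
    // Hypothetical config key and default, not part of Kafka.
    static final String MAX_PARTITIONS_CONFIG = "example.policy.max.partitions";
    private int maxPartitions = 50;

    // Same shape as Configurable#configure: receives the configuration as a map.
    public void configure(Map<String, ?> configs) {
        Object value = configs.get(MAX_PARTITIONS_CONFIG);
        if (value != null) {
            maxPartitions = Integer.parseInt(value.toString());
        }
    }

    // Throws PolicyViolationException with a descriptive message when the request breaks the policy.
    public void validate(RequestMetadata requestMetadata) throws PolicyViolationException {
        if (requestMetadata.numPartitions() > maxPartitions) {
            throw new PolicyViolationException("Topic " + requestMetadata.topic()
                + " requests " + requestMetadata.numPartitions()
                + " partitions, but the maximum allowed is " + maxPartitions);
        }
    }

    @Override
    public void close() {
        // Nothing to release in this sketch.
    }
}
```

Because the RequestMetadata constructor is public, a unit test can build a request directly and assert that validate either returns normally or throws with the expected message.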

The fact that topic creation can now fail due to custom policies raises a couple of new requirements at the protocol level:

  1. We need to be able to send error messages back to the client, so we introduce an error_message string field in the topic_errors array of the CreateTopics response.
  2. We need a new error code for PolicyViolationException, so we assign error code 44 to POLICY_VIOLATION.
  3. For tools that allow users to create topics, a validation/dry-run mode where validation errors are reported but no creation is attempted is useful. A precedent for this exists in the Connect REST API. We introduce a validateOnly boolean field in the CreateTopics request to enable this mode.
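
The three requirements above can be sketched together as a single per-topic handling step. This is only an illustration under stated assumptions, not the broker's actual code: TopicError, CreateTopicsSketch and handle are hypothetical names, the policy is modelled as a plain Consumer of the topic name, and PolicyViolationException is a local stand-in. The error code values are 0 for no error and 44 for POLICY_VIOLATION as assigned above.

```java
import java.util.function.Consumer;

// Stand-in for org.apache.kafka.common.errors.PolicyViolationException (assumption for this sketch).
class PolicyViolationException extends RuntimeException {
    PolicyViolationException(String message) {
        super(message);
    }
}

// Hypothetical model of one entry in the topic_errors array of the CreateTopics response.
class TopicError {
    final String topic;
    final short errorCode;
    final String errorMessage; // null when there is no error

    TopicError(String topic, short errorCode, String errorMessage) {
        this.topic = topic;
        this.errorCode = errorCode;
        this.errorMessage = errorMessage;
    }
}

class CreateTopicsSketch {
    static final short NONE = 0;
    static final short POLICY_VIOLATION = 44;

    // Validates the topic against the policy, then creates it unless validateOnly is set.
    static TopicError handle(String topic, boolean validateOnly,
                             Consumer<String> policy, Runnable createTopic) {
        try {
            policy.accept(topic);
        } catch (PolicyViolationException e) {
            // Requirements 1 and 2: report the error code together with the policy's message.
            return new TopicError(topic, POLICY_VIOLATION, e.getMessage());
        }
        // Requirement 3: in validate-only mode, report success without creating anything.
        if (!validateOnly) {
            createTopic.run();
        }
        return new TopicError(topic, NONE, null);
    }
}
```

In this sketch a rejected topic never reaches the creation step regardless of validateOnly, and an accepted topic is only created when validateOnly is false.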

...