...
Code Block | ||
---|---|---|
| ||
package org.apache.kafka.server.policy; import org.apache.kafka.common.Configurable; public interface CreateTopicPolicy extends Configurable, AutoCloseable { void validate(TopicDetails topicDetails)class throwsRequestMetadata InvalidRequestException; } public class TopicDetails { { private final String topic; private final int numPartitions; private final short replicationFactor; private final Map<Integer, List<Integer>> replicasAssignments; private final Map<String, String> configs; public RequestMetadata(String topic, int numPartitions, short replicationFactor, /** replicasAssignment is a map from partition id to broker ids */ public TopicDetails(String topic, int numPartitions, short replicationFactor, Map<Integer, List<Integer>> replicasAssignments, Map<String, String> configs) { ... this.topic = topic; } public String topic() { return topic; } public this.int numPartitions =() { return numPartitions; } this.replicationFactor = replicationFactor; public Map<Integer, List<Integer>> replicasAssignments() { this.replicasAssignments =return replicasAssignments; } this.configs = configs; public Map<String, String> configs() { return configs; } } void validate(RequestMetadata requestMetadata) throws PolicyViolationException; } package org.apache.kafka.common.errors; public class PolicyViolationException extends ApiException { public PolicyViolationException(String message) { super(message); } public PolicyViolationException(String message, Throwable cause) { super(message, cause); } } |
Users will have to ensure that the policy implementation code is in the broker's classpath. Implementations should throw the newly introduced PolicyViolationException
with an appropriate error message if the request does not fulfill the policy requirements. We chose a generic name for the only parameter of the validate
method in case we decide to add parameters that are not strictly related to the topic (e.g. session information) in the future. The constructor of TopicDetails RequestMetadata
is public to make testing convenient for users. Under normal circumstances, it should only be instantiated by Kafka. We chose to create separate API classes instead of reusing request classes to make it easier to evolve the latter.
The fact that topic creation can now fail due to custom policies raises a couple of new requirements at the protocol level:
- We need to be able to send error messages back to the client, so we introduce an
error_message
string field in thetopic_errors
array of theCreateTopics
response. - We need a new error code for
PolicyViolationException
, so we assign error code44
toPOLICY_VIOLATION
. - For tools that allow users to create topics, a validation/dry-run mode where validation errors are reported but no creation is attempted is useful. A precedent for this exists in the Connect REST API. We introduce a
validateOnly
boolean field in theCreateTopics
request to enable this mode.
...