Comment: Expand on Configurable and AutoCloseable.

...

A user can define a policy manager, similar to the pluggable Authorizer, by setting create.topics.policy.class.name in server.properties and implementing the CreateTopicPolicy interface. The interface will live in the clients jar under the org.apache.kafka.server.policy package. It extends Configurable so that implementations can act on broker configurations, and AutoCloseable so that resources can be released on shutdown.

Code Block
languagejava
package org.apache.kafka.server.policy;
 
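import java.util.List;
import java.util.Map;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.errors.InvalidRequestException;

public interface CreateTopicPolicy extends Configurable, AutoCloseable {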
  void validate(TopicDetails topicDetails) throws InvalidRequestException;
}

public class TopicDetails {
    private final String topic;
    private final int numPartitions;
    private final short replicationFactor;
    private final Map<Integer, List<Integer>> replicasAssignments;
    private final Map<String, String> configs;

    /** replicasAssignments is a map from partition id to the ids of the brokers hosting its replicas */
    public TopicDetails(String topic, int numPartitions, short replicationFactor, Map<Integer, List<Integer>> replicasAssignments, Map<String, String> configs) {
        this.topic = topic;
        this.numPartitions = numPartitions;
        this.replicationFactor = replicationFactor;
        this.replicasAssignments = replicasAssignments;
        this.configs = configs;
    }
}
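
To make the interface concrete, here is a minimal sketch of what an implementation might look like. It is an illustration only: MinReplicationPolicy, the example.policy.min.replication.factor key and the accessor methods on TopicDetails are hypothetical names that do not appear in this proposal, and the small stand-in types exist only so the sketch compiles without the Kafka jars.

```java
import java.util.Map;

// Stand-in for org.apache.kafka.common.Configurable so the sketch compiles on its own.
interface Configurable {
    void configure(Map<String, ?> configs);
}

// Stand-in for the InvalidRequestException thrown by validate.
class InvalidRequestException extends RuntimeException {
    InvalidRequestException(String message) { super(message); }
}

// Reduced stand-in for TopicDetails; the accessors are an assumption of this sketch.
class TopicDetails {
    private final String topic;
    private final int numPartitions;
    private final short replicationFactor;

    TopicDetails(String topic, int numPartitions, short replicationFactor) {
        this.topic = topic;
        this.numPartitions = numPartitions;
        this.replicationFactor = replicationFactor;
    }

    String topic() { return topic; }
    int numPartitions() { return numPartitions; }
    short replicationFactor() { return replicationFactor; }
}

interface CreateTopicPolicy extends Configurable, AutoCloseable {
    void validate(TopicDetails topicDetails) throws InvalidRequestException;
}

// Hypothetical policy: rejects topics whose replication factor is below a configured minimum.
class MinReplicationPolicy implements CreateTopicPolicy {
    // Hypothetical config key, not an actual broker config.
    static final String MIN_RF_CONFIG = "example.policy.min.replication.factor";

    private short minReplicationFactor = 1;

    @Override
    public void configure(Map<String, ?> configs) {
        // Read the minimum from the broker configs if present, otherwise keep the default.
        Object value = configs.get(MIN_RF_CONFIG);
        if (value != null) {
            minReplicationFactor = Short.parseShort(value.toString());
        }
    }

    @Override
    public void validate(TopicDetails topicDetails) throws InvalidRequestException {
        if (topicDetails.replicationFactor() < minReplicationFactor) {
            throw new InvalidRequestException("Topic " + topicDetails.topic()
                + " has replication factor " + topicDetails.replicationFactor()
                + ", below the required minimum of " + minReplicationFactor);
        }
    }

    @Override
    public void close() {
        // Nothing to release in this sketch; a real policy might close clients or files here.
    }
}
```

A policy like this keeps its state in fields populated by configure, so validate stays cheap enough to call once per topic.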

...

Code Block
languagetext
CreateTopics Request (Version: 1) => [create_topic_requests] timeout validateOnly (new)
  create_topic_requests => topic num_partitions replication_factor [replica_assignment] [configs] 
    topic => STRING
    num_partitions => INT32
    replication_factor => INT16
    replica_assignment => partition_id [replicas] 
      partition_id => INT32
      replicas => INT32
    configs => config_key config_value 
      config_key => STRING
      config_value => STRING
  timeout => INT32
  validateOnly => BOOLEAN (new)
 
CreateTopics Response (Version: 1) => [topic_errors] 
  topic_errors => topic error_code error_message (new)
    topic => STRING
    error_code => INT16
    error_message => NULLABLE_STRING (new)

Proposed Changes

During broker start-up, AdminManager will create a CreateTopicPolicy instance if create.topics.policy.class.name is defined. It will then pass the broker configs to the configure method.
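
The start-up step could be sketched roughly as follows. This is not AdminManager's actual code: PolicyLoader and NoOpPolicy are hypothetical names, reflective instantiation through a no-argument constructor is an assumption, and the interfaces are reduced stand-ins so the example compiles without the Kafka jars.

```java
import java.util.Map;

// Stand-in for org.apache.kafka.common.Configurable.
interface Configurable {
    void configure(Map<String, ?> configs);
}

// Reduced stand-in for the policy interface; validate is omitted for brevity.
interface CreateTopicPolicy extends Configurable, AutoCloseable {
}

// Hypothetical policy that only records the configs it was given.
class NoOpPolicy implements CreateTopicPolicy {
    Map<String, ?> seenConfigs;

    @Override
    public void configure(Map<String, ?> configs) { seenConfigs = configs; }

    @Override
    public void close() { }
}

// Hypothetical helper illustrating the start-up step.
class PolicyLoader {
    static final String POLICY_CLASS_CONFIG = "create.topics.policy.class.name";

    /** Returns a configured policy, or null if no policy class is defined. */
    static CreateTopicPolicy load(Map<String, ?> brokerConfigs) {
        Object className = brokerConfigs.get(POLICY_CLASS_CONFIG);
        if (className == null) {
            return null; // no policy configured, so topic creation is unrestricted
        }
        try {
            // Assumption: the policy class exposes a no-argument constructor.
            CreateTopicPolicy policy = Class.forName(className.toString())
                .asSubclass(CreateTopicPolicy.class)
                .getDeclaredConstructor()
                .newInstance();
            policy.configure(brokerConfigs); // hand the broker configs to the policy
            return policy;
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Could not instantiate policy " + className, e);
        }
    }
}
```

Failing fast on a class that cannot be loaded is a design choice of this sketch; it surfaces a misconfigured policy at start-up rather than on the first create topics request.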

When a create topics request is received, each topic will be processed in sequence. For each topic:

...

Note that a validation failure only affects the relevant topic; other topics in the request will still be processed. Also, validateOnly doesn't guarantee that topic creation will succeed: errors could still occur during the actual creation process.
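
The per-topic isolation described above can be illustrated with a small sketch. The exact per-topic steps are elided in this section, so the loop below is an assumption rather than the broker's logic: CreateTopicsSketch is a hypothetical name, topics are reduced to plain names, the policy is modelled as a Consumer<String>, and "creation" just appends to a list.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Stand-in for the InvalidRequestException thrown by a policy.
class InvalidRequestException extends RuntimeException {
    InvalidRequestException(String message) { super(message); }
}

// Hypothetical sketch of processing the topics of one request in sequence.
class CreateTopicsSketch {
    /**
     * Returns a map from topic name to error message, where null means no error.
     * Topics that pass validation are appended to created unless validateOnly is set.
     */
    static Map<String, String> process(List<String> topics,
                                       Consumer<String> policy,
                                       boolean validateOnly,
                                       List<String> created) {
        Map<String, String> errors = new LinkedHashMap<>();
        for (String topic : topics) {
            try {
                policy.accept(topic);      // may throw InvalidRequestException
                if (!validateOnly) {
                    created.add(topic);    // placeholder for the actual creation step
                }
                errors.put(topic, null);
            } catch (InvalidRequestException e) {
                // Record the failure for this topic only and carry on with the rest.
                errors.put(topic, e.getMessage());
            }
        }
        return errors;
    }
}
```

The returned map mirrors the shape of topic_errors in the response, with the exception message playing the role of error_message.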

During broker shutdown, CreateTopicPolicy.close will be invoked.

As described in the previous section, we are proposing one policy config/interface per supported request type. The main advantage is that we can add further configs in a compatible manner, but it also allows for modular policy implementations. Implementors also have the option of using a single implementation class if they wish, but multiple configs would still have to be set.

...