...

Code Block (Java): ReplicaAssignor
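package org.apache.kafka.server;

import java.util.List;
import java.util.Map;

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
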
public interface ReplicaAssignor {

    /**
     * Computes replica assignments for the specified partitions.
     *
     * If an assignment can't be computed, for example if the state of the cluster does not satisfy a requirement,
     * implementations can throw a ReplicaAssignorException to prevent the topic/partition creation.
     * @param partitions The partitions being created
     * @param cluster The cluster metadata
     * @param principal The principal of the user initiating the request
     * @return The computed replica assignments
     * @throws ReplicaAssignorException if an assignment cannot be computed
     */
    public ReplicaAssignment computeAssignment(
            NewPartitions partitions,
            Cluster cluster,
            KafkaPrincipal principal) throws ReplicaAssignorException;

    /**
     * Computed replica assignments for the specified partitions
     */
    public class ReplicaAssignment {

        private final Map<Integer, List<Integer>> assignment;

        public ReplicaAssignment(Map<Integer, List<Integer>> assignment) {
            this.assignment = assignment;
        }

        /**
         * @return a Map with the list of replicas for each partition
         */
        public Map<Integer, List<Integer>> assignment() {
            return assignment;
        }
    }

    /**
     * Partitions which require an assignment to be computed
     */
    public interface NewPartitions {

        /**
         * The name of the topic for these partitions
         */
        String topicName();

        /**
         * The list of partition ids
         */
        List<Integer> partitionIds();

        /**
         * The replication factor of the topic
         */
        short replicationFactor();

        /**
         * The configuration of the topic
         */
        Map<String, String> configs();
    }

}
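
To illustrate the kind of logic a `computeAssignment` implementation might delegate to, here is a minimal, self-contained sketch of a round-robin strategy. It is not part of the proposal: the class name `RoundRobinAssignmentSketch`, the `assign` helper and the explicit `brokerIds` list are hypothetical, and a real implementation would presumably derive the broker ids from the `Cluster` metadata and throw `ReplicaAssignorException` rather than `IllegalStateException`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: none of these names come from the proposal.
public class RoundRobinAssignmentSketch {

    /**
     * Builds a partition id -> replica list map, the same shape that
     * ReplicaAssignment wraps. Replicas for a partition are taken from
     * consecutive brokers, starting at an offset based on the partition id.
     */
    public static Map<Integer, List<Integer>> assign(
            List<Integer> partitionIds,
            short replicationFactor,
            List<Integer> brokerIds) {
        // Assumption: a real ReplicaAssignor would throw ReplicaAssignorException here.
        if (replicationFactor < 1 || replicationFactor > brokerIds.size()) {
            throw new IllegalStateException(
                    "Replication factor " + replicationFactor
                            + " cannot be satisfied with " + brokerIds.size() + " brokers");
        }
        Map<Integer, List<Integer>> assignment = new LinkedHashMap<>();
        for (int partitionId : partitionIds) {
            List<Integer> replicas = new ArrayList<>();
            for (int i = 0; i < replicationFactor; i++) {
                // Wrap around the broker list so every replica lands on a distinct broker.
                int index = Math.floorMod(partitionId + i, brokerIds.size());
                replicas.add(brokerIds.get(index));
            }
            assignment.put(partitionId, replicas);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Three partitions, replication factor 2, three brokers.
        System.out.println(assign(List.of(0, 1, 2), (short) 2, List.of(101, 102, 103)));
        // prints {0=[101, 102], 1=[102, 103], 2=[103, 101]}
    }
}
```

The returned map could be passed straight to the `ReplicaAssignment` constructor shown above. Failing fast when the replication factor exceeds the number of brokers mirrors the interface's documented option of rejecting a topic/partition creation when a requirement is not satisfied.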


2) New broker configuration:

...

The current behaviour stays the same. This is an additional feature that administrators can opt in to.

Rejected Alternatives

...

  • Computing assignments for the whole batch: Instead of computing the assignment for each topic in a CreateTopics/CreatePartitions request one at a time, we considered computing the assignments for all of them in a single call. We rejected this approach for the following reasons:
    • All logic (validation, policies, creation in ZK) in AdminManager works on a single topic at a time. Grouping the replica assignment computation made the logic very complicated.
    • It's not clear that having all topics at once would significantly improve the computed assignments. This is especially true for the 4 scenarios listed in the Motivation section.