Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleReplicaAssignor
package org.apache.kafka.server;

public interface ReplicaAssignor {

    /**
     * Assigns the specified partitions to brokers.
     * If an assignment can't be computed, for example if the state of the cluster does not statify a requirement,
     * implementations can throw ana exceptionReplicaAssignorException to prevent the topic/partition creation.
     * @param topicName The name of the topic
     * @param partitions The list of partitionIds that need an assignment
     * @param replicationFactor The replication factor of the topic
     * @param cluster The cluster metadata
     * @param principal The principal of the user initiating the request
     * @return A map of partitionId to list of assigned brokers
     */
    public Map<Integer, List<Integer>> assignReplicasToBrokers(
            String topicName,
            List<Integer> partitions,
            int replicationFactor,
            Cluster cluster,
            KafkaPrincipal principal) throws ReplicaAssignorException;

}


2) New broker configuration:

...

Doc: The fully qualified class name that implements ReplicaAssignor. This is used by the broker to determine replicas when topics or partiions are created. This defaults to DefaultReplicaAssignor.


3) New exception and error code

A new exception, org.apache.kafka.common.errors.ReplicaAssignorException, will be defined. It will be non retriable.

When ReplicaAssignor implementations throw this exception, it will be mapped to a new error code:

REPLICA_ASSIGNOR_FAILED: The replica assignor could not compute an assignment for the topic or partition.

Proposed Changes

The existing assignment logic will be extracted into a class, DefaultReplicaAssignment, that implements the ReplicaAssignor interface. It will stay the default implementation and a private class. Instead of throwing AdminOperationException, it will be updated to throw ReplicaAssignorException so users get a better error.

AdminManager will create an instance of the specified ReplicaAssignor implementation or if none were set of DefaultReplicaAssignor. When creating topics or partitions, for each topic, it will call assignReplicasToBrokers(). If multiple topics are present in the request, AdminManager will update the Cluster object so the ReplicaAssignor class has access to the up to date cluster metadata.

...