Comment: add brokerIds

...

BrokerReplicaAssignor

```java
import java.util.List;
import java.util.Map;

/**
 * This interface provides an API for implementing replica assignment strategies. Both rack-aware and rack-unaware
 * strategies are supported by this interface. If needed, an implementation can defer the assignment to Kafka's
 * default rack-unaware replica assignment strategy.
 */
public interface BrokerReplicaAssignor {

    /**
     * Creates an assignment for a topic given the racks, the number of partitions and the replication factor.
     * @param clusterView the information about the cluster (such as the current assignment and rack information)
     *                    that is needed to decide replica assignments.
     * @param numPartitions the number of partitions of the assigned topic.
     * @param replicationFactor the replication factor of the assigned topic.
     * @param brokerIds an optional list of broker IDs used to generate the assignment. If <code>null</code> is
     *                  specified, all brokers are used.
     * @return
     * <ul>
     *     <li><code>null</code> to defer the assignment to Kafka, which then uses its rack-unaware assignment strategy.</li>
     *     <li>the assignment in the form of a (partition) -> (array of broker IDs) mapping, where the size of the map
     *     equals the number of partitions and the size of each value equals the replication factor.</li>
     * </ul>
     * @throws org.apache.kafka.common.errors.InvalidReplicaAssignmentException if the algorithm can't assign replicas
     * based on the specified parameters and can't defer it to Kafka's default rack-unaware assignment strategy.
     * @throws org.apache.kafka.common.errors.InvalidReplicationFactorException if the number of replicas is greater
     * than what the assignment strategy accepts as a maximum value.
     */
    Map<Integer, Integer[]> assignReplicasToTopic(ClusterView clusterView, int numPartitions, short replicationFactor,
                                                  List<Integer> brokerIds);
}
```
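To illustrate how the optional brokerIds parameter could be honoured, here is a minimal, dependency-free sketch of a rack-unaware round-robin strategy. It is not part of this proposal: the class name RoundRobinAssignorSketch and its assign method are hypothetical, the cluster's broker list is passed in directly instead of being read from a ClusterView, and IllegalArgumentException stands in for InvalidReplicationFactorException.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical example, not part of the KIP: a rack-unaware round-robin strategy honouring brokerIds.
// IllegalArgumentException stands in for InvalidReplicationFactorException to keep the sketch dependency-free.
final class RoundRobinAssignorSketch {

    static Map<Integer, Integer[]> assign(List<Integer> clusterBrokers, List<Integer> brokerIds,
                                          int numPartitions, short replicationFactor) {
        // A null brokerIds list means "use every broker in the cluster"
        List<Integer> candidates = new ArrayList<>(brokerIds == null ? clusterBrokers : brokerIds);
        int brokerCount = candidates.size();
        if (replicationFactor > brokerCount) {
            throw new IllegalArgumentException("Replication factor " + replicationFactor
                    + " is larger than the " + brokerCount + " available brokers");
        }
        Map<Integer, Integer[]> assignment = new HashMap<>();
        for (int partition = 0; partition < numPartitions; partition++) {
            Integer[] replicas = new Integer[replicationFactor];
            for (int replica = 0; replica < replicationFactor; replica++) {
                // Take consecutive brokers starting at the partition's offset; since
                // replicationFactor <= brokerCount, no broker appears twice in one partition
                replicas[replica] = candidates.get((partition + replica) % brokerCount);
            }
            assignment.put(partition, replicas);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Cluster with brokers 1-4, but only brokers 2, 3 and 4 may receive replicas
        Map<Integer, Integer[]> assignment = assign(List.of(1, 2, 3, 4), List.of(2, 3, 4), 3, (short) 2);
        assignment.forEach((partition, replicas) ->
                System.out.println(partition + " -> " + Arrays.toString(replicas)));
    }
}
```

Replica j of partition p lands on candidate broker (p + j) mod n, so the first replica rotates across the allowed brokers. Kafka's built-in rack-unaware algorithm differs in its details; this only illustrates the contract of the interface.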

ClusterView

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;

/**
 * This class defines the view of the cluster that the replica assignor can see, such as the current assignment
 * or the broker -> rack mappings.
 */
public class ClusterView {

    // Current replica assignment of the partitions in the cluster
    private final Map<TopicPartition, List<Integer>> partitionAssignments;
    // Broker IDs together with their (optional) racks
    private final List<BrokerInfo> brokerInfos;

    public ClusterView(Map<TopicPartition, List<Integer>> partitionAssignments, List<BrokerInfo> brokerInfos) {
        this.partitionAssignments = partitionAssignments;
        this.brokerInfos = brokerInfos;
    }

    public Map<TopicPartition, List<Integer>> topicAssignments() {
        return Collections.unmodifiableMap(partitionAssignments);
    }

    public List<BrokerInfo> brokerInfos() {
        return Collections.unmodifiableList(brokerInfos);
    }
}
```
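ClusterView exposes the current assignment so that a strategy can take existing load into account. As a hedged example, an assignor might count how many replicas each broker already hosts and prefer the least loaded ones. In the sketch below, LoadAwareOrderingSketch is a hypothetical name, and partitions are keyed by strings such as "orders-0" rather than TopicPartition, purely to keep it free of Kafka dependencies.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the KIP. Topic partitions are keyed by plain strings such as "orders-0"
// instead of org.apache.kafka.common.TopicPartition so the sketch has no Kafka dependency.
final class LoadAwareOrderingSketch {

    // Counts the replicas each broker currently hosts; brokers without replicas get 0.
    static Map<Integer, Integer> replicaCounts(Map<String, List<Integer>> topicAssignments, List<Integer> brokers) {
        Map<Integer, Integer> counts = new HashMap<>();
        for (Integer broker : brokers) {
            counts.put(broker, 0);
        }
        for (List<Integer> replicas : topicAssignments.values()) {
            for (Integer broker : replicas) {
                counts.merge(broker, 1, Integer::sum);
            }
        }
        return counts;
    }

    // Orders brokers from least to most loaded, breaking ties by broker ID so the result is deterministic.
    static List<Integer> leastLoadedFirst(Map<String, List<Integer>> topicAssignments, List<Integer> brokers) {
        Map<Integer, Integer> counts = replicaCounts(topicAssignments, brokers);
        List<Integer> ordered = new ArrayList<>(brokers);
        ordered.sort((a, b) -> {
            int byLoad = Integer.compare(counts.get(a), counts.get(b));
            return byLoad != 0 ? byLoad : Integer.compare(a, b);
        });
        return ordered;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> current = Map.of(
                "orders-0", List.of(1, 2),
                "orders-1", List.of(2, 3),
                "payments-0", List.of(2, 1));
        System.out.println(leastLoadedFirst(current, List.of(1, 2, 3, 4)));
    }
}
```

With the sample assignment in main, brokers 1, 2, 3 and 4 host 2, 3, 1 and 0 replicas respectively, so the program prints [4, 3, 1, 2].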


BrokerInfo

```java
import java.util.Optional;

/**
 * Contains any information about the broker that replica assignment strategies may need to consider.
 */
public class BrokerInfo {

    private final int brokerId;
    // null if the broker has no rack assigned
    private final String rackId;

    public BrokerInfo(int brokerId, String rackId) {
        this.brokerId = brokerId;
        this.rackId = rackId;
    }

    public int getBrokerId() {
        return brokerId;
    }

    public Optional<String> getRackId() {
        // ofNullable rather than of: a broker without a rack must not cause a NullPointerException
        return Optional.ofNullable(rackId);
    }
}
```

Admin API

We would like to add a new Admin API method that lets commands such as the reassignment command generate an assignment suggested by the brokers, based on the assignor configured there. This removes the need to pass the assignor configuration to the client, which may not know what is configured on the brokers, and it makes the active controller the single source of replica assignments.


```java
    // New methods proposed for the Admin interface

    /**
     * Creates a partition assignment according to the partition assignment strategy set in the cluster. It uses the
     * brokers and racks available in the cluster to create an assignment for the given number of partitions and
     * replication factor.
     * @param numPartitions the number of partitions of the topic to act on
     * @param replicationFactor the replication factor of the topic to act on
     * @return a result object containing the partition -> brokerId[] mapping.
     */
    CreateReplicaAssignmentResult createReplicaAssignment(int numPartitions, short replicationFactor);

    /**
     * Creates a partition assignment according to the partition assignment strategy set in the cluster. It uses the
     * brokers and racks available in the cluster to create an assignment for the given number of partitions and
     * replication factor. The extra options parameter controls how the assignment is created.
     * @param numPartitions the number of partitions of the topic to act on
     * @param replicationFactor the replication factor of the topic to act on
     * @param options extra options, such as forcing Kafka's default rack-unaware assignment or restricting the
     *                assignment to a list of broker IDs.
     * @return a result object containing the partition -> brokerId[] mapping.
     */
    CreateReplicaAssignmentResult createReplicaAssignment(int numPartitions, short replicationFactor,
                                                          CreateReplicaAssignmentOptions options);
```


CreateReplicaAssignmentResult

```java
import java.util.Map;

import org.apache.kafka.common.KafkaFuture;

public class CreateReplicaAssignmentResult {

    private final KafkaFuture<Map<Integer, Integer[]>> future;

    CreateReplicaAssignmentResult(KafkaFuture<Map<Integer, Integer[]>> future) {
        this.future = future;
    }

    // Completes with the (partition) -> (array of broker IDs) mapping suggested by the brokers
    public KafkaFuture<Map<Integer, Integer[]>> assignments() {
        return future;
    }
}
```
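A caller that reads the map from assignments() might check it against the shape promised by the BrokerReplicaAssignor Javadoc before using it. The following is a hypothetical client-side helper, not part of the proposal: the size checks come from that Javadoc, while rejecting a broker listed twice for the same partition is an extra assumption made here.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical client-side check, not part of the KIP. The size checks follow the BrokerReplicaAssignor
// Javadoc; rejecting duplicate brokers within a partition is an additional assumption.
final class AssignmentValidatorSketch {

    static void validate(Map<Integer, Integer[]> assignment, int numPartitions, short replicationFactor) {
        if (assignment.size() != numPartitions) {
            throw new IllegalStateException("Expected " + numPartitions + " partitions but got " + assignment.size());
        }
        for (Map.Entry<Integer, Integer[]> entry : assignment.entrySet()) {
            Integer[] replicas = entry.getValue();
            if (replicas.length != replicationFactor) {
                throw new IllegalStateException("Partition " + entry.getKey() + " has " + replicas.length
                        + " replicas, expected " + replicationFactor);
            }
            Set<Integer> distinct = new HashSet<>(Arrays.asList(replicas));
            if (distinct.size() != replicas.length) {
                throw new IllegalStateException("Partition " + entry.getKey() + " lists a broker more than once: "
                        + Arrays.toString(replicas));
            }
        }
    }

    public static void main(String[] args) {
        // A well-formed assignment passes silently
        validate(Map.of(0, new Integer[]{1, 2}, 1, new Integer[]{2, 3}), 2, (short) 2);
        try {
            validate(Map.of(0, new Integer[]{1, 1}), 1, (short) 2);
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```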


CreateReplicaAssignmentOptions

```java
import java.util.Collections;
import java.util.List;

import org.apache.kafka.clients.admin.AbstractOptions;

public class CreateReplicaAssignmentOptions extends AbstractOptions<CreateReplicaAssignmentOptions> {

    // If true, the brokers skip the configured assignor and use Kafka's default rack-unaware algorithm
    private boolean forceSimpleAssignment;
    // Optional list of broker IDs to assign replicas to; null means all brokers
    private List<Integer> brokerIds;

    public boolean forceSimpleAssignment() {
        return forceSimpleAssignment;
    }

    public CreateReplicaAssignmentOptions forceSimpleAssignment(boolean forceSimpleAssignment) {
        this.forceSimpleAssignment = forceSimpleAssignment;
        return this;
    }

    public List<Integer> brokerIds() {
        // null means "all brokers"; wrapping it would make unmodifiableList throw a NullPointerException
        return brokerIds == null ? null : Collections.unmodifiableList(brokerIds);
    }

    public CreateReplicaAssignmentOptions brokerIds(List<Integer> brokerIds) {
        this.brokerIds = brokerIds;
        return this;
    }
}
```
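The interplay between forceSimpleAssignment, a configured assignor, and an assignor that returns null to defer can be summarized as a small decision function. This is a hedged sketch of one plausible broker-side flow, not code from the proposal or from Kafka: Assignor is a simplified, hypothetical stand-in for BrokerReplicaAssignor without the ClusterView parameter, and the rack-unaware default is passed in as just another Assignor.

```java
import java.util.List;
import java.util.Map;

// Hypothetical broker-side decision logic, not taken from the KIP or from Kafka's code base.
final class AssignmentDispatchSketch {

    // Simplified stand-in for BrokerReplicaAssignor (no ClusterView parameter)
    interface Assignor {
        Map<Integer, Integer[]> assign(List<Integer> brokerIds, int numPartitions, short replicationFactor);
    }

    static Map<Integer, Integer[]> resolve(Assignor configured, Assignor rackUnawareDefault,
                                           boolean forceSimpleAssignment, List<Integer> brokerIds,
                                           int numPartitions, short replicationFactor) {
        if (!forceSimpleAssignment && configured != null) {
            Map<Integer, Integer[]> result = configured.assign(brokerIds, numPartitions, replicationFactor);
            if (result != null) {
                return result; // the pluggable strategy produced an assignment
            }
        }
        // Forced simple assignment, no configured assignor, or the assignor returned null to defer
        return rackUnawareDefault.assign(brokerIds, numPartitions, replicationFactor);
    }

    public static void main(String[] args) {
        Assignor deferring = (ids, partitions, rf) -> null;
        Assignor fixed = (ids, partitions, rf) -> Map.of(0, new Integer[]{1, 2});
        Assignor fallback = (ids, partitions, rf) -> Map.of(0, new Integer[]{9, 8});
        System.out.println(resolve(deferring, fallback, false, null, 1, (short) 2).get(0)[0]); // 9: assignor deferred
        System.out.println(resolve(fixed, fallback, true, null, 1, (short) 2).get(0)[0]);     // 9: forced default
        System.out.println(resolve(fixed, fallback, false, null, 1, (short) 2).get(0)[0]);    // 1: plugged strategy
    }
}
```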

Setting the forceSimpleAssignment flag instructs the brokers to use Kafka's default rack-unaware algorithm. This is needed to stay compatible with the --disable-rack-aware flag of kafka-reassign-partitions.sh: specifying that flag sets this option, which instructs the broker to skip the configured replica assignor and use the default rack-unaware algorithm. Note that a rack-aware assignor might support a scenario where only some of the brokers have an assigned rack.
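For the last point, a rack-aware assignor that tolerates brokers without a rack would typically start by grouping brokers per rack and putting rack-less brokers in a bucket of their own. This is a minimal sketch under stated assumptions: Broker is a hypothetical stand-in for BrokerInfo whose rack() returns Optional.ofNullable(rack), and the "<no-rack>" bucket name is purely illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical helper for a rack-aware assignor, not part of the KIP.
final class RackGroupingSketch {

    // Minimal stand-in for BrokerInfo
    static final class Broker {
        private final int id;
        private final String rack; // null when the broker has no rack

        Broker(int id, String rack) {
            this.id = id;
            this.rack = rack;
        }

        int id() {
            return id;
        }

        Optional<String> rack() {
            return Optional.ofNullable(rack);
        }
    }

    // Placeholder key (an assumption of this sketch) for brokers that have no rack assigned
    static final String NO_RACK = "<no-rack>";

    // Groups broker IDs by rack; the TreeMap keeps racks sorted so the output is deterministic
    static Map<String, List<Integer>> brokersByRack(List<Broker> brokers) {
        Map<String, List<Integer>> byRack = new TreeMap<>();
        for (Broker broker : brokers) {
            byRack.computeIfAbsent(broker.rack().orElse(NO_RACK), rack -> new ArrayList<>()).add(broker.id());
        }
        return byRack;
    }

    public static void main(String[] args) {
        List<Broker> brokers = List.of(new Broker(1, "rack-a"), new Broker(2, "rack-b"),
                new Broker(3, null), new Broker(4, "rack-a"));
        System.out.println(brokersByRack(brokers));
    }
}
```

Running main prints {<no-rack>=[3], rack-a=[1, 4], rack-b=[2]}. How rack-less brokers are then mixed with racked ones is left to the individual strategy.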

...