Status

Current state: Draft

Discussion thread: here

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

When creating topics or partitions, the Kafka controller has to pick brokers to host the new partitions. The current placement logic is based on a round-robin algorithm and supports rack awareness. While this works relatively well in many scenarios, in a few cases the placements it generates are not optimal because it is not aware of the state of the cluster. Many cluster administrators then rely on tools like Cruise Control to move partitions to better brokers. This process is expensive, as data often has to be copied between brokers.

It would be desirable to allow custom logic for the placer to leverage the state of the cluster and minimize the number of partition reassignments necessary. It would enable administrators to build assignment goals (similar to Cruise Control goals) for their clusters.

...

  • When adding brokers to a cluster, Kafka currently does not necessarily place new partitions on new brokers
  • When removing brokers from a cluster, Kafka currently keeps placing partitions on all existing brokers
  • When some brokers are near their storage/throughput limit, the placer could avoid putting new partitions on them
  • When we want to place partitions differently for different users

Public Interfaces

1) New public interface: ReplicaPlacer

Code Block
languagejava
titleReplicaPlacer
package org.apache.kafka.server.placer;

import java.io.Closeable;
import java.util.Iterator;
import java.util.List;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.errors.ReplicaPlacementException;

/**
 * The interface which a Kafka replica placement policy must implement.
 */
@InterfaceStability.Evolving
public interface ReplicaPlacer extends Configurable, Closeable {

    /**
     * Create a new replica placement.
     *
     * @param startPartition    The partition ID to start with.
     * @param numPartitions     The number of partitions to create placements for.
     * @param numReplicas       The number of replicas to create for each partition.
     * @param iterator          An iterator that yields all the usable brokers.
     *
     * @return                  A list of replica lists.
     *
     * @throws InvalidReplicationFactorException    If too many replicas were requested.
     * @throws ReplicaPlacementException            If a new replica placement can't be created.
     */
    List<List<Integer>> place(int startPartition,
                              int numPartitions,
                              short numReplicas,
                              Iterator<UsableBroker> iterator)
        throws InvalidReplicationFactorException, ReplicaPlacementException;
}

2) New public class: UsableBroker

Code Block
languagejava
package org.apache.kafka.server.placer;

import java.util.Optional;

/**
 * A broker where a replica can be placed.
 */
public class UsableBroker {

    private final int id;
    private final Optional<String> rack;
    private final boolean fenced;

    public UsableBroker(int id, Optional<String> rack, boolean fenced) {
        this.id = id;
        this.rack = rack;
        this.fenced = fenced;
    }

    public int id() {
        return id;
    }

    public Optional<String> rack() {
        return rack;
    }

    public boolean fenced() {
        return fenced;
    }
}
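
To make the contract of place() more concrete, here is a minimal sketch of a custom placement policy. It is not part of the proposal: the class name FencedAwarePlacerSketch and the policy itself (skip fenced brokers, then stripe replicas across the rest) are assumptions made for illustration, and the nested UsableBroker and ReplicaPlacementException types are local stand-ins so the example compiles without Kafka on the classpath. A real implementation would implement ReplicaPlacer and also provide configure() and close().

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;

/** Illustrative only: a hypothetical policy written against local stand-in types. */
final class FencedAwarePlacerSketch {

    /** Stand-in mirroring the proposed UsableBroker class. */
    static final class UsableBroker {
        private final int id;
        private final Optional<String> rack;
        private final boolean fenced;

        UsableBroker(int id, Optional<String> rack, boolean fenced) {
            this.id = id;
            this.rack = rack;
            this.fenced = fenced;
        }

        int id() { return id; }
        Optional<String> rack() { return rack; }
        boolean fenced() { return fenced; }
    }

    /** Stand-in for the proposed ReplicaPlacementException. */
    static final class ReplicaPlacementException extends RuntimeException {
        ReplicaPlacementException(String message) { super(message); }
    }

    /** Same parameter list and return type as the proposed ReplicaPlacer.place(). */
    static List<List<Integer>> place(int startPartition,
                                     int numPartitions,
                                     short numReplicas,
                                     Iterator<UsableBroker> iterator) {
        // Collect the IDs of brokers that are not fenced, in iteration order.
        List<Integer> candidates = new ArrayList<>();
        while (iterator.hasNext()) {
            UsableBroker broker = iterator.next();
            if (!broker.fenced()) {
                candidates.add(broker.id());
            }
        }
        if (candidates.size() < numReplicas) {
            throw new ReplicaPlacementException("Need " + numReplicas
                + " unfenced brokers but only " + candidates.size() + " are available");
        }
        // One inner list per partition; consecutive partitions start one broker later.
        List<List<Integer>> placements = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            List<Integer> replicas = new ArrayList<>();
            for (int r = 0; r < numReplicas; r++) {
                int index = Math.floorMod(startPartition + p + r, candidates.size());
                replicas.add(candidates.get(index));
            }
            placements.add(replicas);
        }
        return placements;
    }
}
```

Each inner list of the result holds the broker IDs for one partition, starting at startPartition. Because numReplicas never exceeds the number of candidates, the replicas of a single partition always land on distinct brokers.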


3) New broker configuration:

Name: replica.placer.class.name

Type: class

Doc: The fully qualified class name that implements ReplicaPlacer. This is used by the broker to determine replicas when topics or partitions are created. This defaults to StripedReplicaPlacer.
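
As an illustration of what a class-typed configuration implies, the sketch below reads replica.placer.class.name from a Properties object and instantiates the named class through its no-argument constructor. This is an assumption about the general mechanism, not a description of Kafka's actual loading code. PlacerConfigSketch, ExamplePlacer and the empty ReplicaPlacer stand-in are hypothetical names used only here.

```java
import java.util.Properties;

/** Illustrative only: resolves a placer implementation from a class-name setting. */
final class PlacerConfigSketch {

    static final String PLACER_CLASS_CONFIG = "replica.placer.class.name";

    /** Stand-in for the proposed ReplicaPlacer interface (methods omitted). */
    interface ReplicaPlacer { }

    /** Hypothetical custom implementation with the public no-arg constructor reflection needs. */
    public static final class ExamplePlacer implements ReplicaPlacer {
        public ExamplePlacer() { }
    }

    static ReplicaPlacer load(Properties props) {
        String className = props.getProperty(PLACER_CLASS_CONFIG);
        if (className == null) {
            // A real broker would fall back to its default placer here.
            throw new IllegalArgumentException(PLACER_CLASS_CONFIG + " is not set");
        }
        try {
            // asSubclass rejects classes that do not implement the interface.
            return Class.forName(className)
                .asSubclass(ReplicaPlacer.class)
                .getDeclaredConstructor()
                .newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalStateException("Cannot instantiate " + className, e);
        }
    }
}
```

In a broker properties file, the corresponding entry would be a single line of the form replica.placer.class.name=<fully qualified class name>.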


4) New exception and error code

A new exception, org.apache.kafka.common.errors.ReplicaPlacementException, will be defined. It will be non-retriable.

When ReplicaPlacer implementations throw this exception, it will be mapped to a new error code:

REPLICA_PLACEMENT_FAILED: The replica placer could not compute a placement for the topic or partition.
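
The relationship between the exception, its error code and retriability can be sketched as follows. The types here are local stand-ins that only loosely mirror Kafka's ApiException and RetriableException hierarchy. The ErrorCode enum and its forException() helper are hypothetical, and no numeric error code is shown because the text above does not assign one.

```java
/** Illustrative only: stand-in types showing a non-retriable exception mapped to an error code. */
final class PlacementErrorSketch {

    /** Stand-in for Kafka's ApiException base class. */
    static class ApiException extends RuntimeException {
        ApiException(String message) { super(message); }
    }

    /** Stand-in for Kafka's RetriableException: only subclasses of it are safe to retry. */
    static class RetriableException extends ApiException {
        RetriableException(String message) { super(message); }
    }

    /** The proposed exception. It does not extend RetriableException, so it is non-retriable. */
    static class ReplicaPlacementException extends ApiException {
        ReplicaPlacementException(String message) { super(message); }
    }

    /** Hypothetical subset of error codes relevant to this example. */
    enum ErrorCode {
        REPLICA_PLACEMENT_FAILED,
        UNKNOWN_SERVER_ERROR;

        /** Maps an exception thrown by a placer to the error code returned to clients. */
        static ErrorCode forException(Throwable t) {
            if (t instanceof ReplicaPlacementException) {
                return REPLICA_PLACEMENT_FAILED;
            }
            return UNKNOWN_SERVER_ERROR;
        }
    }

    /** A client-side retry policy would typically check this before retrying. */
    static boolean isRetriable(Throwable t) {
        return t instanceof RetriableException;
    }
}
```

Under these assumptions, a client that receives REPLICA_PLACEMENT_FAILED should surface the failure rather than resend the same request unchanged.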

Proposed Changes

The proposal is to expose the ReplicaPlacer API, which is currently internal, as a public API. It will move from the org.apache.kafka.controller package in the metadata project to the org.apache.kafka.server.placer package in the clients project. Similarly, the existing UsableBroker class will move from the org.apache.kafka.metadata package in the metadata project to the org.apache.kafka.server.placer package in the clients project.

This feature will only be available in KRaft mode.

Compatibility, Deprecation, and Migration Plan

The new exception/error code changes some of the responses clients can get if the default placer, StripedReplicaPlacer, encounters an error. For example, if a cluster contains brokers with and without "broker.rack", the error will be REPLICA_PLACEMENT_FAILED instead of UNKNOWN_SERVER_ERROR.

Rejected Alternatives

  • Computing assignments for the whole batch: Instead of computing the assignment for each topic in the CreateTopics/CreatePartitions request one at a time, we looked at computing assignments for all of them in a single call. We rejected this approach for the following reasons:
    • All logic (validation, policies, creation in ZK) in AdminManager works on a single topic at a time. Grouping the replica assignment computation led to very complicated logic.
    • It's not clear that having all topics at once would significantly improve the computed assignments. This is especially true for the 4 scenarios listed in the Motivation section.

...