
Status

Current state: Draft

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

When creating topics or partitions, the Kafka controller has to pick brokers to host the new partitions. The current assignment logic is based on a round-robin algorithm and supports rack awareness. While this works relatively well in many scenarios, it is not aware of the state of the cluster, so in some cases the assignments it generates are not optimal. Many cluster administrators then rely on tools like Cruise Control to move partitions to better brokers. This process is expensive, as data often has to be copied between brokers.

Allowing custom assignment logic would let the assignor take the state of the cluster into account and minimize the number of partition reassignments needed afterwards. It would also enable administrators to build assignment goals (similar to Cruise Control goals) for their clusters.

Some scenarios that could benefit greatly from this feature:

  • When adding brokers to a cluster, Kafka currently does not necessarily place new partitions on new brokers
  • When removing brokers from a cluster, Kafka currently keeps placing new partitions on all existing brokers, including those being removed
  • When some brokers are near their storage/throughput limit, the assignor could avoid putting new partitions on them
  • When we want to place partitions differently for different users

Public Interfaces

1) New public interface:

ReplicaAssignor
package org.apache.kafka.server;

import java.util.List;
import java.util.Map;

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.errors.ReplicaAssignorException;
import org.apache.kafka.common.security.auth.KafkaPrincipal;

public interface ReplicaAssignor {

    /**
     * Assigns the specified partitions to brokers.
     * If an assignment can't be computed, for example if the state of the cluster does not satisfy a requirement,
     * implementations can throw a ReplicaAssignorException to prevent the topic/partition creation.
     * @param topicName The name of the topic
     * @param partitions The list of partitionIds that need an assignment
     * @param replicationFactor The replication factor of the topic
     * @param configs The topic configurations
     * @param cluster The cluster metadata
     * @param principal The principal of the user initiating the request
     * @return A map of partitionId to the list of assigned broker ids
     */
    Map<Integer, List<Integer>> assignReplicasToBrokers(
            String topicName,
            List<Integer> partitions,
            int replicationFactor,
            Map<String, String> configs,
            Cluster cluster,
            KafkaPrincipal principal) throws ReplicaAssignorException;

}
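As an illustration of the kind of logic this interface enables, here is a minimal sketch of an assignor that places each replica on the broker currently hosting the fewest replicas, so newly added brokers would be filled first. To stay runnable without Kafka on the classpath, it does not implement the interface above: the hypothetical `replicasPerBroker` map (broker id to current replica count) stands in for information an implementation would derive from the `Cluster` argument, and a local `ReplicaAssignorException` stands in for the proposed one. Rack awareness is ignored.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Local stand-in for the proposed ReplicaAssignorException (sketch only).
class ReplicaAssignorException extends RuntimeException {
    ReplicaAssignorException(String message) {
        super(message);
    }
}

// Hypothetical least-loaded assignor. A real ReplicaAssignor would compute the
// replica counts from the Cluster metadata passed to assignReplicasToBrokers().
class LeastLoadedAssignor {

    Map<Integer, List<Integer>> assign(List<Integer> partitions,
                                       int replicationFactor,
                                       Map<Integer, Integer> replicasPerBroker) {
        if (replicationFactor > replicasPerBroker.size()) {
            // Refuse the request instead of producing an invalid assignment.
            throw new ReplicaAssignorException("Replication factor " + replicationFactor
                    + " is larger than the number of brokers " + replicasPerBroker.size());
        }
        // Work on a copy so the caller's map is not modified.
        Map<Integer, Integer> load = new HashMap<>(replicasPerBroker);
        Map<Integer, List<Integer>> assignment = new LinkedHashMap<>();
        for (int partition : partitions) {
            // Order brokers by current load, breaking ties by broker id for determinism.
            List<Integer> candidates = new ArrayList<>(load.keySet());
            candidates.sort(Comparator.comparingInt((Integer b) -> load.get(b))
                    .thenComparingInt(b -> b));
            List<Integer> replicas = new ArrayList<>(candidates.subList(0, replicationFactor));
            // Count the new replicas so later partitions see the updated load.
            for (int broker : replicas) {
                load.merge(broker, 1, Integer::sum);
            }
            assignment.put(partition, replicas);
        }
        return assignment;
    }
}
```

Updating `load` after each partition spreads a topic's partitions across the least-loaded brokers instead of stacking all of them on the single emptiest one.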


2) New broker configuration:

Name: replica.assignor.class.name

Type: class

Doc: The fully qualified class name that implements ReplicaAssignor. This is used by the broker to determine replicas when topics or partitions are created. This defaults to DefaultReplicaAssignor.


3) New exception and error code

A new exception, org.apache.kafka.common.errors.ReplicaAssignorException, will be defined. It will be non-retriable.

When ReplicaAssignor implementations throw this exception, it will be mapped to a new error code:

REPLICA_ASSIGNOR_FAILED: The replica assignor could not compute an assignment for the topic or partition.
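A minimal sketch of how such a failure could surface to clients. It assumes, as for other CreateTopics errors, that the error is reported for the affected topic only, so other topics in the same request can still be created. `ReplicaAssignorException` is a local stand-in extending `RuntimeException` (in Kafka it would presumably extend `ApiException` and, being non-retriable, not `RetriableException`); `TopicResult` and `createTopics` are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Local stand-in for org.apache.kafka.common.errors.ReplicaAssignorException.
class ReplicaAssignorException extends RuntimeException {
    ReplicaAssignorException(String message) {
        super(message);
    }
}

// Hypothetical per-topic outcome: an assignment, or an error code name.
class TopicResult {
    final Map<Integer, List<Integer>> assignment; // null when the assignor failed
    final String error;                            // "NONE" on success

    TopicResult(Map<Integer, List<Integer>> assignment, String error) {
        this.assignment = assignment;
        this.error = error;
    }
}

class CreateTopicsSketch {
    // Calls the assignor once per topic and maps ReplicaAssignorException
    // to REPLICA_ASSIGNOR_FAILED for that topic only.
    static Map<String, TopicResult> createTopics(
            List<String> topics,
            Function<String, Map<Integer, List<Integer>>> assignor) {
        Map<String, TopicResult> results = new LinkedHashMap<>();
        for (String topic : topics) {
            try {
                results.put(topic, new TopicResult(assignor.apply(topic), "NONE"));
            } catch (ReplicaAssignorException e) {
                results.put(topic, new TopicResult(null, "REPLICA_ASSIGNOR_FAILED"));
            }
        }
        return results;
    }
}
```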

Proposed Changes

The existing assignment logic will be extracted into a class, DefaultReplicaAssignor, that implements the ReplicaAssignor interface. It will remain the default implementation and stay internal (not part of the public API). Instead of throwing AdminOperationException, it will be updated to throw ReplicaAssignorException so users get a more descriptive error.
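For reference, the core of the existing rack-unaware round-robin placement can be sketched as follows. This is a simplified reconstruction for illustration, not the code being extracted: rack awareness is omitted, the start index and initial replica shift are passed in explicitly (the existing logic chooses them randomly unless a fixed start index is given), the class name `RoundRobinSketch` is hypothetical, and `IllegalArgumentException` stands in for the `ReplicaAssignorException` the extracted class would throw.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified round-robin assignment (no rack awareness).
class RoundRobinSketch {

    static Map<Integer, List<Integer>> assign(List<Integer> partitions,
                                              int replicationFactor,
                                              List<Integer> brokers,
                                              int startIndex,
                                              int initialReplicaShift) {
        int n = brokers.size();
        if (replicationFactor < 1 || replicationFactor > n) {
            throw new IllegalArgumentException("Replication factor " + replicationFactor
                    + " must be between 1 and the number of brokers (" + n + ")");
        }
        Map<Integer, List<Integer>> assignment = new LinkedHashMap<>();
        int replicaShift = initialReplicaShift;
        for (int partition : partitions) {
            // After each full pass over the brokers, shift the followers so that
            // partitions sharing a leader do not always share the same followers.
            if (partition > 0 && partition % n == 0) {
                replicaShift++;
            }
            // The first replica (preferred leader) rotates across brokers.
            int firstIndex = (partition + startIndex) % n;
            List<Integer> replicas = new ArrayList<>();
            replicas.add(brokers.get(firstIndex));
            for (int j = 0; j < replicationFactor - 1; j++) {
                // shift is in [1, n - 1], so followers never land on the leader's broker
                // and stay distinct from each other.
                int shift = 1 + (replicaShift + j) % (n - 1);
                replicas.add(brokers.get((firstIndex + shift) % n));
            }
            assignment.put(partition, replicas);
        }
        return assignment;
    }
}
```

With three brokers and replication factor 2, partitions 0 and 3 share leader 0 but get different followers (1 and 2), which is the point of the shift.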

AdminManager will create an instance of the ReplicaAssignor implementation set in replica.assignor.class.name, or of DefaultReplicaAssignor if none is set. When creating topics or partitions, it will call assignReplicasToBrokers() once per topic. If a request contains multiple topics, AdminManager will update the Cluster object between calls so that the ReplicaAssignor has access to up-to-date cluster metadata.
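The two steps above, instantiating the configured class with a fallback and calling the assignor once per topic against updated state, can be sketched as follows. This is a standalone approximation: `SimpleAssignor`, `DefaultSimpleAssignor` and `AssignorLoader` are hypothetical, a map of broker id to replica count stands in for the `Cluster` object, and plain reflection is used where the broker would more likely rely on Kafka's own configuration helpers.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for ReplicaAssignor: Cluster is replaced by broker id -> replica count.
interface SimpleAssignor {
    Map<Integer, List<Integer>> assign(String topic, int partitionCount, int replicationFactor,
                                       Map<Integer, Integer> replicasPerBroker);
}

// Stand-in for DefaultReplicaAssignor: naive round-robin over brokers sorted by id.
class DefaultSimpleAssignor implements SimpleAssignor {
    @Override
    public Map<Integer, List<Integer>> assign(String topic, int partitionCount, int replicationFactor,
                                              Map<Integer, Integer> replicasPerBroker) {
        List<Integer> brokers = new ArrayList<>(replicasPerBroker.keySet());
        Collections.sort(brokers);
        if (replicationFactor > brokers.size()) {
            throw new IllegalArgumentException("Not enough brokers for replication factor " + replicationFactor);
        }
        Map<Integer, List<Integer>> result = new LinkedHashMap<>();
        for (int p = 0; p < partitionCount; p++) {
            List<Integer> replicas = new ArrayList<>();
            for (int j = 0; j < replicationFactor; j++) {
                replicas.add(brokers.get((p + j) % brokers.size()));
            }
            result.put(p, replicas);
        }
        return result;
    }
}

class AssignorLoader {
    static final String CONFIG = "replica.assignor.class.name";

    // Instantiates the configured class, falling back to the default when unset.
    static SimpleAssignor load(Map<String, String> brokerConfig) {
        String className = brokerConfig.get(CONFIG);
        if (className == null || className.isEmpty()) {
            return new DefaultSimpleAssignor();
        }
        try {
            return Class.forName(className)
                    .asSubclass(SimpleAssignor.class)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate " + className, e);
        }
    }

    // Calls the assignor once per topic; after each call the new replicas are added
    // to the load map, so the next topic is assigned against up-to-date state.
    static Map<String, Map<Integer, List<Integer>>> assignAll(SimpleAssignor assignor,
                                                              Map<String, Integer> partitionCounts,
                                                              int replicationFactor,
                                                              Map<Integer, Integer> replicasPerBroker) {
        Map<Integer, Integer> load = new HashMap<>(replicasPerBroker);
        Map<String, Map<Integer, List<Integer>>> result = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> topic : partitionCounts.entrySet()) {
            Map<Integer, List<Integer>> assignment = assignor.assign(
                    topic.getKey(), topic.getValue(), replicationFactor,
                    Collections.unmodifiableMap(load));
            for (List<Integer> replicas : assignment.values()) {
                for (int broker : replicas) {
                    load.merge(broker, 1, Integer::sum);
                }
            }
            result.put(topic.getKey(), assignment);
        }
        return result;
    }
}
```

Without the update between calls, a load-aware assignor would see the same snapshot for every topic in the request and could pile all of them onto the same brokers.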

Compatibility, Deprecation, and Migration Plan

The default behaviour does not change. This is an additional feature that administrators can opt into.

Rejected Alternatives

None

