Status
Current state: Discarded
Discussion thread: here
...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
When creating topics or partitions, the Kafka controller has to pick brokers to host the new partitions. The current placement logic is based on a round robin algorithm and supports rack awareness. While this works relatively well in many scenarios, in a few cases the placements it generates are not optimal because the logic is not aware of the state of the cluster. Many cluster administrators then rely on tools like Cruise Control to move partitions to better brokers. This process is expensive as data often has to be copied between brokers.
It would be desirable to allow custom logic for the placer to enable administrators to build assignment rules for their clusters and minimize the number of partition reassignments necessary. It would enable administrators to build assignment goals (similar to Cruise Control goals) for their clusters.
Some scenarios that could benefit greatly from this feature:
- When adding brokers to a cluster, Kafka currently does not necessarily place new partitions on new brokers
- When administrators want to remove brokers from a cluster, there is no way to prevent Kafka from placing partitions on them
- When some brokers are near their storage/throughput limit, Kafka could avoid putting new partitions on them
- When we want to place partitions differently for different users
Public Interfaces
1) New public interface: ReplicaPlacer
Code Block
package org.apache.kafka.server.placer;

import java.io.Closeable;
import java.util.Iterator;
import java.util.List;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.errors.ReplicaPlacementException;

/**
 * The interface which a Kafka replica placement policy must implement.
 */
@InterfaceStability.Evolving
public interface ReplicaPlacer extends Configurable, Closeable {

    /**
     * Create a new replica placement.
     *
     * @param startPartition  The partition ID to start with.
     * @param numPartitions   The number of partitions to create placements for.
     * @param numReplicas     The number of replicas to create for each partition.
     * @param iterator        An iterator that yields all the usable brokers.
     *
     * @return A list of replica lists.
     *
     * @throws ReplicaPlacementException If a new replica placement can't be created
     */
    List<List<Integer>> place(int startPartition,
                              int numPartitions,
                              short numReplicas,
                              Iterator<UsableBroker> iterator)
        throws ReplicaPlacementException;
}
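For illustration only, a minimal custom placer that spreads replicas round robin over unfenced brokers could look like the sketch below. The class name, the com.example package and the simplistic algorithm are not part of this KIP, and a real implementation would also need to honour rack awareness.

Code Block
package com.example;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.errors.ReplicaPlacementException;
import org.apache.kafka.server.placer.ReplicaPlacer;
import org.apache.kafka.server.placer.UsableBroker;

/**
 * Illustrative sketch: assigns replicas round robin over unfenced brokers.
 */
public class SimpleRoundRobinPlacer implements ReplicaPlacer {

    @Override
    public void configure(Map<String, ?> configs) {
        // No custom configuration in this sketch.
    }

    @Override
    public List<List<Integer>> place(int startPartition,
                                     int numPartitions,
                                     short numReplicas,
                                     Iterator<UsableBroker> iterator)
            throws ReplicaPlacementException {
        // Collect the brokers that can currently host replicas.
        List<Integer> usable = new ArrayList<>();
        while (iterator.hasNext()) {
            UsableBroker broker = iterator.next();
            if (!broker.fenced()) {
                usable.add(broker.id());
            }
        }
        if (usable.size() < numReplicas) {
            throw new ReplicaPlacementException("Only " + usable.size()
                + " usable brokers for replication factor " + numReplicas);
        }
        // Spread each partition's replicas round robin over the usable brokers.
        List<List<Integer>> placements = new ArrayList<>(numPartitions);
        for (int p = 0; p < numPartitions; p++) {
            List<Integer> replicas = new ArrayList<>(numReplicas);
            for (int r = 0; r < numReplicas; r++) {
                replicas.add(usable.get((startPartition + p + r) % usable.size()));
            }
            placements.add(replicas);
        }
        return placements;
    }

    @Override
    public void close() {
        // Nothing to clean up in this sketch.
    }
}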
2) New public class: UsableBroker
Code Block
package org.apache.kafka.server.placer;

import java.util.Optional;

/**
 * A broker where a replica can be placed.
 */
public class UsableBroker {

    private final int id;
    private final Optional<String> rack;
    private final boolean fenced;

    public UsableBroker(int id, Optional<String> rack, boolean fenced) {
        this.id = id;
        this.rack = rack;
        this.fenced = fenced;
    }

    public int id() {
        return id;
    }

    public Optional<String> rack() {
        return rack;
    }

    public boolean fenced() {
        return fenced;
    }
}
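As a usage sketch (again assuming the hypothetical SimpleRoundRobinPlacer above, in the same com.example package), a placer can be exercised directly by handing it a fixed set of UsableBroker instances, which is convenient for unit testing:

Code Block
package com.example;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;

import org.apache.kafka.server.placer.UsableBroker;

public class PlacerExample {
    public static void main(String[] args) throws Exception {
        // Reuses the illustrative SimpleRoundRobinPlacer sketched above.
        try (SimpleRoundRobinPlacer placer = new SimpleRoundRobinPlacer()) {
            List<UsableBroker> brokers = Arrays.asList(
                new UsableBroker(0, Optional.of("rack-a"), false),
                new UsableBroker(1, Optional.of("rack-b"), false),
                new UsableBroker(2, Optional.of("rack-a"), true)); // fenced
            // Place 3 partitions with 2 replicas each, starting at partition 0.
            System.out.println(placer.place(0, 3, (short) 2, brokers.iterator()));
        }
    }
}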
3) New broker configuration:
Name: replica.placer.class.name
Type: class
Doc: The fully qualified class name that implements ReplicaPlacer. This is used by the broker to determine replicas when topics or partitions are created. This defaults to StripedReplicaPlacer.
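For example, assuming a custom implementation packaged as com.example.SimpleRoundRobinPlacer (the hypothetical class sketched above), it would be enabled by setting replica.placer.class.name=com.example.SimpleRoundRobinPlacer in the broker properties file.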
4) New exception and error code
A new exception, org.apache.kafka.common.errors.ReplicaPlacementException, will be defined. It will be non-retriable.
When ReplicaPlacer implementations throw this exception, it will be mapped to a new error code:
REPLICA_PLACEMENT_FAILED: The replica placer could not compute a placement for the topic or partition.
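For reference, a minimal sketch of what the new exception could look like, assuming it follows the pattern of other non-retriable Kafka exceptions (which extend ApiException rather than RetriableException); the exact definition is up to the implementation:

Code Block
package org.apache.kafka.common.errors;

/**
 * Sketch only: thrown by ReplicaPlacer implementations when no valid
 * placement can be computed. Extending ApiException (and not
 * RetriableException) is what makes it non-retriable, as stated above.
 */
public class ReplicaPlacementException extends ApiException {
    public ReplicaPlacementException(String message) {
        super(message);
    }
}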
Proposed Changes
The proposal is to expose the ReplicaPlacer interface. It will move from the org.apache.kafka.controller package in the metadata project to the org.apache.kafka.server.placer package in the clients project. Similarly, the existing UsableBroker class will move from the org.apache.kafka.metadata package in the metadata project to the org.apache.kafka.server.placer package in the clients project. StripedReplicaPlacer will stay the default implementation. Sanity checks about the replication factor that are currently performed in StripedReplicaPlacer will be performed in ClusterControlManager as they are common to all ReplicaPlacer implementations.
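To make the shared checks concrete, here is a hypothetical sketch of what such validation could look like before any placer is invoked; the helper class, method name and messages are assumptions, not part of this KIP:

Code Block
import org.apache.kafka.common.errors.InvalidReplicationFactorException;

// Hypothetical helper illustrating the sanity checks that would run in
// ClusterControlManager, so individual placers don't have to repeat them.
final class PlacementChecks {
    static void validateReplicationFactor(short numReplicas, int usableBrokerCount) {
        if (numReplicas <= 0) {
            throw new InvalidReplicationFactorException(
                "The replication factor must be positive, but was " + numReplicas + ".");
        }
        if (numReplicas > usableBrokerCount) {
            throw new InvalidReplicationFactorException(
                "The replication factor " + numReplicas
                + " is larger than the number of usable brokers " + usableBrokerCount + ".");
        }
    }
}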
To address the use cases identified in the motivation section, some knowledge about the current state of the cluster is necessary. Details such as whether a broker has just been added or is being decommissioned are not part of the cluster metadata. Therefore such knowledge has to be provided to the ReplicaPlacer by external means, for example via its configuration. Apart from configure(), this KIP does not provide a mechanism for defining the behavior of ReplicaPlacer implementations, and cluster operators wanting to use this feature will have to build such a mechanism for their specific environments.
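For example, decommissioning knowledge could be passed through the placer's own configuration. The sketch below shows a fragment of how a custom placer's configure() could read such knowledge, assuming the broker passes its configuration properties to configure() as it does for other pluggable interfaces; the property name placer.excluded.brokers is purely hypothetical:

Code Block
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch of how a custom placer could receive operator-supplied knowledge,
// e.g. placer.excluded.brokers=4,5 in the broker properties file.
public class DecommissionAwareConfigSketch {
    private Set<Integer> excludedBrokers = Collections.emptySet();

    public void configure(Map<String, ?> configs) {
        Object value = configs.get("placer.excluded.brokers");
        if (value == null) {
            return;
        }
        Set<Integer> excluded = new HashSet<>();
        for (String id : value.toString().split(",")) {
            excluded.add(Integer.parseInt(id.trim()));
        }
        // Brokers listed here would never receive new replicas in place().
        excludedBrokers = excluded;
    }
}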
The logic for assigning replicas to partitions differs so much between ZooKeeper and KRaft mode that I propose making this feature available only in KRaft mode.
Compatibility, Deprecation, and Migration Plan
...
The current behaviour stays the same. This is just an additional feature administrators can opt in to.
Older clients that don't have the new error code will interpret it as UNKNOWN_SERVER_ERROR, but they will receive the generated error message indicating the reason for the failure.
Rejected Alternatives
- Computing replica placement for the whole create topics/partitions request: Instead of computing placements for each topic in the CreateTopics/CreatePartitions request one at a time, I looked at computing placements for all of them in a single call. This approach was rejected for the following reasons:
- All current logic works on a single topic at a time. Grouping the replica placement computation resulted in very complicated logic
- It's not clear if having all topics at once would significantly improve the computed placements. This is especially true for the four scenarios listed in the Motivation section
- Providing more details about the cluster to the placer: Instead of only passing usable brokers, I considered passing a data structure with more details about the cluster, such as Cluster. While this could allow some additional advanced use cases, it would potentially not scale well if we expect Kafka to support a very large number of topics with KRaft.