...

  • Data is natively replicated into all data centers by Kafka topic replication.
  • No data is lost when one DC is lost, and no configuration change is required: the design relies implicitly on native Kafka replication.
  • From an operational point of view, it is much easier to configure and operate such a topology than a replication scenario via MM2.
  • Currently, Kafka has a single partition assignment strategy, and users who want to override it can only do so by manually assigning replicas to brokers at topic creation. This could be done via 

Multi-level rack awareness

...

We will introduce a new broker config called broker.replica.placer that would make it possible for users to specify their own implementation of a replica placer. Its default value will be the current rack aware replica assignment strategy, which we will make compatible with the new interface detailed below. We will also allow plugins to fall back to the original algorithm if needed, or to return an error if they can't come up with a suitable solution.
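As an illustration of how a broker might resolve broker.replica.placer, the sketch below instantiates the configured class by reflection and keeps the current algorithm when the config is unset. PlacerPlugin, DefaultPlacerPlugin and PlacerLoader are hypothetical stand-ins with a simplified signature, not the proposed Kafka classes, and a real implementation would go through Kafka's own config and plugin loading machinery.

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the proposed ReplicaPlacer plugin interface (simplified signature).
interface PlacerPlugin {
    List<List<Integer>> place(int numPartitions, short replicationFactor, List<Integer> brokerIds);
}

// Hypothetical stand-in for the current rack aware algorithm, used when the config is unset.
class DefaultPlacerPlugin implements PlacerPlugin {
    @Override
    public List<List<Integer>> place(int numPartitions, short replicationFactor, List<Integer> brokerIds) {
        throw new UnsupportedOperationException("placeholder for the existing rack aware algorithm");
    }
}

final class PlacerLoader {
    static final String CONFIG_KEY = "broker.replica.placer";

    /** Instantiates the configured placer class, falling back to the default when the config is unset. */
    static PlacerPlugin load(Map<String, String> brokerConfig) {
        String className = brokerConfig.get(CONFIG_KEY);
        if (className == null || className.isBlank()) {
            return new DefaultPlacerPlugin();
        }
        try {
            Class<?> cls = Class.forName(className);
            if (!PlacerPlugin.class.isAssignableFrom(cls)) {
                throw new IllegalArgumentException(className + " does not implement PlacerPlugin");
            }
            // Assumes the plugin has a no-arg constructor accessible to the loader.
            return (PlacerPlugin) cls.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate " + className, e);
        }
    }
}
```

Failing fast on a class that does not implement the interface surfaces misconfiguration at startup rather than at the first topic creation.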

Interfaces

Equivalents of these proposed interfaces currently exist in the metadata component under the org.apache.kafka.metadata.placement package. We would like to improve these interfaces slightly, with some naming improvements and added or changed fields, so that they better fit a general algorithm. We would also like to provide the new multi-level rack aware implementation discussed in this KIP. The current algorithm would stay as it is, only slightly refactored to fit the new interface.

ReplicaPlacer

```java
import java.util.List;

/**
 * This interface provides an API for implementing replica placement strategies. Both rack aware and rack unaware
 * strategies are supported by this interface. If needed, an implementation can defer placement to Kafka's default
 * rack unaware replica assignment strategy.
 */
public interface ReplicaPlacer {

    /**
     * Creates a replica placement for a topic given the cluster view, the number of partitions and the replication factor.
     * @param placement the placement parameters, such as the replication factor and the number of partitions.
     * @param clusterView the information about the cluster (such as the current assignment and rack information)
     *                    that is needed to decide the replica placement.
     * @param brokerIds an optional list of broker IDs used to generate the placement. If null is specified,
     *                  all brokers are used.
     * @return
     * <ul>
     *     <li><code>null</code> to defer the placement to Kafka, which then uses its rack unaware assignment strategy.</li>
     *     <li>the placement as a zero based list of lists, where the outer list index is the partition index and
     *     the inner list contains the assigned broker IDs.</li>
     * </ul>
     * @throws org.apache.kafka.common.errors.InvalidReplicaAssignmentException if the algorithm can't place replicas
     * based on the specified parameters and can't defer to Kafka's default rack unaware assignment strategy.
     * @throws org.apache.kafka.common.errors.InvalidReplicationFactorException if the number of replicas is greater
     * than what the placement strategy accepts as a maximum value.
     */
    List<List<Integer>> place(PlacementSpec placement, ClusterView clusterView, List<Integer> brokerIds);
}
```
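To make the return shape of place concrete (outer index is the partition, inner list holds broker IDs, with a cap on the replication factor), here is a minimal rack unaware round-robin sketch. SimplePlacementSpec and RoundRobinPlacerSketch are hypothetical simplifications rather than the proposed classes, IllegalArgumentException stands in for InvalidReplicationFactorException, and the algorithm is deliberately simpler than Kafka's existing one.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for the proposed PlacementSpec (not Kafka's class).
final class SimplePlacementSpec {
    final int numPartitions;
    final short replicationFactor;

    SimplePlacementSpec(int numPartitions, short replicationFactor) {
        this.numPartitions = numPartitions;
        this.replicationFactor = replicationFactor;
    }
}

final class RoundRobinPlacerSketch {
    /**
     * Returns a zero based list of lists: index p is partition p and the inner list holds its broker IDs.
     * The first broker of each inner list acts as the preferred leader.
     */
    static List<List<Integer>> place(SimplePlacementSpec spec, List<Integer> brokerIds) {
        int n = brokerIds.size();
        if (spec.replicationFactor > n) {
            // The proposed interface would throw InvalidReplicationFactorException here.
            throw new IllegalArgumentException(
                "replication factor " + spec.replicationFactor + " exceeds " + n + " brokers");
        }
        List<List<Integer>> placement = new ArrayList<>(spec.numPartitions);
        for (int p = 0; p < spec.numPartitions; p++) {
            List<Integer> replicas = new ArrayList<>(spec.replicationFactor);
            for (int r = 0; r < spec.replicationFactor; r++) {
                // Shift the starting broker per partition so leaders are spread round-robin;
                // replicas stay distinct because r < replicationFactor <= n.
                replicas.add(brokerIds.get((p + r) % n));
            }
            placement.add(replicas);
        }
        return placement;
    }
}
```

For example, three partitions with replication factor 2 over brokers 1, 2 and 3 yield [[1, 2], [2, 3], [3, 1]].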

The ClusterView class would be an improved version of ClusterDescriber that additionally exposes the current partition assignments.

ClusterView

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.metadata.placement.UsableBroker;

/**
 * This class defines a view of a cluster that the replica placer can see, such as the current assignment
 * or broker -> rack mappings.
 */
public class ClusterView {

    private final Map<TopicPartition, List<Integer>> partitionAssignments;
    private final Iterator<UsableBroker> usableBrokers;

    public ClusterView(Map<TopicPartition, List<Integer>> partitionAssignments, Iterator<UsableBroker> usableBrokers) {
        this.partitionAssignments = partitionAssignments;
        this.usableBrokers = usableBrokers;
    }

    public Map<TopicPartition, List<Integer>> topicAssignments() {
        return Collections.unmodifiableMap(partitionAssignments);
    }

    // As with ClusterDescriber#usableBrokers(), this is a single-pass iterator over the brokers and their racks.
    public Iterator<UsableBroker> usableBrokers() {
        return usableBrokers;
    }
}
```
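One thing a rack aware placer might derive from a ClusterView is the current replica load per rack. The sketch below computes that count under simplifying assumptions: a "topic-partition" string key stands in for TopicPartition, a plain broker ID to Optional rack map stands in for the UsableBroker iterator, and RackLoadSketch is a hypothetical name.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class RackLoadSketch {
    /**
     * Counts how many replicas currently live on each rack, given the current assignments and the
     * broker -> rack mapping. Brokers without a rack (or unknown brokers) are counted under noRackKey.
     */
    static Map<String, Integer> replicasPerRack(Map<String, List<Integer>> partitionAssignments,
                                                Map<Integer, Optional<String>> brokerRacks,
                                                String noRackKey) {
        Map<String, Integer> counts = new HashMap<>();
        for (List<Integer> replicas : partitionAssignments.values()) {
            for (int brokerId : replicas) {
                String rack = brokerRacks.getOrDefault(brokerId, Optional.empty()).orElse(noRackKey);
                counts.merge(rack, 1, Integer::sum);
            }
        }
        return counts;
    }
}
```

A placer could then prefer the least loaded racks when choosing replicas for new partitions.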

Admin API

We would like to add a new Admin API method that enables commands such as the reassignment command to generate an assignment suggested by the broker based on its configuration. This would eliminate the need to pass the corresponding placer configuration to the client, which may or may not be aware of what is configured on the brokers, and would also make the active controller the single source of replica assignments.

```java
    /**
     * Create a replica placement according to the placement strategy set in the cluster. It will use the
     * brokers and racks available to the cluster to create a placement given the number of partitions and the
     * replication factor.
     * @param numPartitions is the number of partitions of the topic to act on
     * @param replicationFactor is the replication factor of the topic in action
     * @return a result object containing the partition placement.
     */
    CreateReplicaPlacementResult createReplicaPlacement(int numPartitions, short replicationFactor);

    /**
     * Create a replica placement according to the placement strategy set in the cluster. It will use the
     * brokers and racks available to the cluster to create a placement given the number of partitions and the
     * replication factor. The extra options parameter can further control the placement.
     * @param numPartitions is the number of partitions of the topic to act on
     * @param replicationFactor is the replication factor of the topic in action
     * @param options extra options, such as forcing the simple placement or restricting the set of brokers.
     * @return a result object containing the partition placement.
     */
    CreateReplicaPlacementResult createReplicaPlacement(int numPartitions, short replicationFactor,
                                                        CreateReplicaPlacementOptions options);
```


CreateReplicaPlacementResult

```java
import java.util.List;

import org.apache.kafka.common.KafkaFuture;

public class CreateReplicaPlacementResult {

    private final KafkaFuture<List<List<Integer>>> future;

    CreateReplicaPlacementResult(KafkaFuture<List<List<Integer>>> future) {
        this.future = future;
    }

    /**
     * Returns a future for the placement: a zero based list where the outer list index is the partition index
     * and the inner list contains the assigned broker IDs.
     */
    public KafkaFuture<List<List<Integer>>> placements() {
        return future;
    }
}
```
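Because placements() yields a plain list of lists, a client such as the reassignment tool would still have to turn it into a reassignment plan. The sketch below renders a placement into the JSON plan format read by kafka-reassign-partitions.sh; ReassignmentJsonSketch is a hypothetical helper, and the topic name is assumed to be a valid Kafka topic name so that it needs no JSON escaping.

```java
import java.util.List;
import java.util.stream.Collectors;

final class ReassignmentJsonSketch {
    /**
     * Builds {"version":1,"partitions":[{"topic":...,"partition":...,"replicas":[...]}]} from a placement
     * whose outer list index is the partition number.
     */
    static String toReassignmentJson(String topic, List<List<Integer>> placement) {
        StringBuilder sb = new StringBuilder("{\"version\":1,\"partitions\":[");
        for (int p = 0; p < placement.size(); p++) {
            if (p > 0) {
                sb.append(',');
            }
            String replicas = placement.get(p).stream().map(String::valueOf).collect(Collectors.joining(","));
            // Valid Kafka topic names only contain [a-zA-Z0-9._-], so no escaping is applied here.
            sb.append("{\"topic\":\"").append(topic).append("\",\"partition\":").append(p)
              .append(",\"replicas\":[").append(replicas).append("]}");
        }
        return sb.append("]}").toString();
    }
}
```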


CreateReplicaPlacementOptions

```java
import java.util.Collections;
import java.util.List;

import org.apache.kafka.clients.admin.AbstractOptions;

public class CreateReplicaPlacementOptions extends AbstractOptions<CreateReplicaPlacementOptions> {

    private boolean forceSimplePlacement;
    // null means that all brokers are used
    private List<Integer> brokerIds;

    public boolean forceSimplePlacement() {
        return forceSimplePlacement;
    }

    public CreateReplicaPlacementOptions forceSimplePlacement(boolean forceSimplePlacement) {
        this.forceSimplePlacement = forceSimplePlacement;
        return this;
    }

    public List<Integer> brokerIds() {
        // Guard against null: Collections.unmodifiableList(null) would throw a NullPointerException.
        return brokerIds == null ? null : Collections.unmodifiableList(brokerIds);
    }

    public CreateReplicaPlacementOptions brokerIds(List<Integer> brokerIds) {
        this.brokerIds = brokerIds;
        return this;
    }
}
```

By specifying the forceSimplePlacement flag, we instruct the brokers to use Kafka's default rack unaware algorithm. This is needed to maintain compatibility with the --disable-rack-aware flag of kafka-reassign-partitions.sh: specifying that flag will set this option, which behaves the same way by instructing the broker to skip the configured replica placer and use the default rack unaware algorithm. Note that a rack aware placer might support a scenario where only some of the brokers have racks assigned.
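The decision logic described above (the force flag bypasses the configured placer, and a null result from the placer defers to the rack unaware default) can be sketched as follows. PlacementDispatchSketch and its Algorithm interface are hypothetical names used only for illustration.

```java
import java.util.List;

final class PlacementDispatchSketch {
    /** Hypothetical functional view of a placement algorithm; may return null to defer. */
    interface Algorithm {
        List<List<Integer>> place(int numPartitions, short replicationFactor, List<Integer> brokerIds);
    }

    static List<List<Integer>> place(boolean forceSimplePlacement, Algorithm configuredPlacer, Algorithm simplePlacer,
                                     int numPartitions, short replicationFactor, List<Integer> brokerIds) {
        if (forceSimplePlacement) {
            // --disable-rack-aware path: skip the configured placer entirely.
            return simplePlacer.place(numPartitions, replicationFactor, brokerIds);
        }
        List<List<Integer>> result = configuredPlacer.place(numPartitions, replicationFactor, brokerIds);
        // A null result means the plugin defers to Kafka's default rack unaware strategy.
        return result != null ? result : simplePlacer.place(numPartitions, replicationFactor, brokerIds);
    }
}
```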

Some algorithms might work with only a specific subset of brokers as input to the placement. In the Admin API, this subset can be specified with the brokerIds option.
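A broker handling the brokerIds option would first have to narrow the candidate brokers, treating a missing list as "all brokers". The sketch below shows one way to do that; BrokerSubsetSketch is hypothetical, and rejecting unknown IDs instead of silently dropping them is an assumption of the sketch, not something the proposal specifies.

```java
import java.util.ArrayList;
import java.util.List;

final class BrokerSubsetSketch {
    /** Returns the brokers the placer may use: all of them when no subset is requested. */
    static List<Integer> eligibleBrokers(List<Integer> clusterBrokerIds, List<Integer> requestedBrokerIds) {
        if (requestedBrokerIds == null) {
            return new ArrayList<>(clusterBrokerIds);
        }
        for (Integer id : requestedBrokerIds) {
            // Assumption: an ID that is not part of the cluster is an error rather than being ignored.
            if (!clusterBrokerIds.contains(id)) {
                throw new IllegalArgumentException("Unknown broker ID " + id);
            }
        }
        return new ArrayList<>(requestedBrokerIds);
    }
}
```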

Protocol

The Admin API described above also needs a new request and response pair in the protocol, defined as follows.

CreateReplicaPlacementRequest

```json
{
  "apiKey": 68,
  "type": "request",
  "listeners": ["zkBroker", "broker", "controller"],
  "name": "CreateReplicaPlacementRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "NumPartitions", "type": "int32", "versions": "0+",
      "about": "The number of partitions of the topic in the placement." },
    { "name": "ReplicationFactor", "type": "int16", "versions": "0+",
      "about": "The number of replicas of each of the topic's partitions in the placement." },
    { "name": "ForceSimplePlacement", "type": "bool", "versions": "0+", "default": "false",
      "about": "Forces Kafka's default rack unaware replica assignment algorithm." },
    { "name": "NodeId", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
      "about": "The IDs of the brokers to use for the placement." },
    { "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000",
      "about": "How long to wait in milliseconds before timing out the request." }
  ]
}
```
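Before invoking a placer, the request handler would need to reject obviously invalid requests. The sketch below mirrors the exceptions documented on ReplicaPlacer; PlacementRequestValidationSketch is hypothetical, and maxReplicationFactor is an assumed input standing for whatever maximum the placement strategy accepts (for a simple strategy, typically the number of eligible brokers).

```java
import java.util.Optional;

final class PlacementRequestValidationSketch {
    /** Returns an error message for an invalid request, or Optional.empty() if it can be served. */
    static Optional<String> validate(int numPartitions, short replicationFactor, int maxReplicationFactor) {
        if (numPartitions <= 0) {
            return Optional.of("NumPartitions must be positive, got " + numPartitions);
        }
        if (replicationFactor <= 0) {
            return Optional.of("ReplicationFactor must be positive, got " + replicationFactor);
        }
        if (replicationFactor > maxReplicationFactor) {
            // Would map to InvalidReplicationFactorException per the ReplicaPlacer contract.
            return Optional.of("ReplicationFactor " + replicationFactor
                + " exceeds the maximum of " + maxReplicationFactor);
        }
        return Optional.empty();
    }
}
```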


CreateReplicaPlacementResponse

```json
{
  "apiKey": 68,
  "type": "response",
  "name": "CreateReplicaPlacementResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ReplicaPlacement", "type": "[]ReplicaPlacement", "versions": "0+",
      "about": "A partition to list of broker ID mapping.",
      "fields": [
        { "name": "PartitionId", "type": "int32", "versions": "0+", "mapKey": true },
        { "name": "Replicas", "type": "[]int32", "versions": "0+" }
      ]
    }
  ]
}
```
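The response keys each entry by PartitionId, while the Admin API exposes a zero based list of lists, so the two shapes have to be converted into each other. The sketch below shows a round trip under stated assumptions: PlacementResponseSketch and its Entry class are hypothetical plain holders, not the classes Kafka would generate from the JSON schema.

```java
import java.util.ArrayList;
import java.util.List;

final class PlacementResponseSketch {
    /** Plain holder mirroring one ReplicaPlacement entry (PartitionId, Replicas). */
    static final class Entry {
        final int partitionId;
        final List<Integer> replicas;

        Entry(int partitionId, List<Integer> replicas) {
            this.partitionId = partitionId;
            this.replicas = replicas;
        }
    }

    static List<Entry> toEntries(List<List<Integer>> placement) {
        List<Entry> entries = new ArrayList<>(placement.size());
        for (int p = 0; p < placement.size(); p++) {
            // The outer list index becomes the PartitionId map key.
            entries.add(new Entry(p, List.copyOf(placement.get(p))));
        }
        return entries;
    }

    static List<List<Integer>> fromEntries(List<Entry> entries) {
        // Rebuild the zero based list; entries may arrive in any order.
        List<List<Integer>> placement = new ArrayList<>(entries.size());
        for (int i = 0; i < entries.size(); i++) {
            placement.add(null);
        }
        for (Entry e : entries) {
            if (e.partitionId < 0 || e.partitionId >= entries.size() || placement.get(e.partitionId) != null) {
                throw new IllegalArgumentException("Invalid or duplicate PartitionId " + e.partitionId);
            }
            placement.set(e.partitionId, e.replicas);
        }
        return placement;
    }
}
```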

...