...

In AWS, racks are usually mapped to the concept of availability zones.

Public Interfaces

An optional rack broker property will be added, and an optional parameter rackInfo will be passed to AdminUtils.assignReplicasToBrokers:

Code Block
languagescala
/**
 * @param rackInfo Map from broker ID to its rack (zone). If empty, no rack aware
 *                 assignment will be used.
 */
def assignReplicasToBrokers(brokerList: Seq[Int],
                            nPartitions: Int,
                            replicationFactor: Int,
                            fixedStartIndex: Int = -1,
                            startPartitionId: Int = -1,
                            rackInfo: Map[Int, String] = Map()) 

 

The RackLocator interface will be used to return rack information for brokers at runtime and for command line tools. It is defined as:

 

case class Broker(id: Int, endPoints: Map[SecurityProtocol, EndPoint], rack: Option[String] = None)

rack will be an optional field in version 2 of the broker JSON schema, e.g.:

Code Block
{"version":2,
  "host":"localhost",
  "port":9092,
  "jmx_port":9999,
  "timestamp":"2233345666",
  "endpoints": ["PLAINTEXT://host1:9092",
 
Code Block
languagescala
/**
 * Interface that defines API to get broker to rack mapping. This is used by
 * rack aware replica assignment. The implementation class can be passed as
 * rack.locator.class property at server start up or -rack-locator-class command
 * line argument for TopicCommand.
 *
 * @param zkClient ZkClient for the cluster that can be used to retrieve information
 *                 from ZooKeeper
 * @param props Properties used by implementation.
 */
abstract class RackLocator(val zkClient: ZkClient, val props: Properties) {
  def getRackInfo(): Map[Int, String]
}

zkClient is passed into the interface in case the implementation stores the rack information under the same ZooKeeper path as the Kafka cluster or needs to access cluster metadata from ZooKeeper. The implementation can use the additional properties passed into the interface to initialize and access the storage that holds the rack information. The single method getRackInfo() returns a map from broker ID to rack ID, where the rack ID is represented as a string.

The following optional properties are added to broker configuration:

  • rack.locator.class: The implementation class name
  • rack.locator.properties: Comma-delimited key=value pairs passed as properties to the RackLocator, e.g., "username=foo,password=bar"
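
As an illustration of the rack.locator.properties format, the following is a minimal sketch of how the comma-delimited key=value string could be turned into a java.util.Properties object. The object name RackLocatorProps and its parse method are hypothetical and not part of the proposal; the sketch also assumes that values never contain commas.

```scala
import java.util.Properties

// Hypothetical helper, not part of the proposal: parses the
// rack.locator.properties value into java.util.Properties.
object RackLocatorProps {
  // Assumes the format "k1=v1,k2=v2" and that values contain no commas.
  def parse(raw: String): Properties = {
    val props = new Properties()
    raw.split(",").map(_.trim).filter(_.nonEmpty).foreach { pair =>
      // Split on the first '=' only, so values may themselves contain '='.
      val idx = pair.indexOf('=')
      require(idx > 0, s"Expected key=value but got '$pair'")
      props.setProperty(pair.substring(0, idx).trim, pair.substring(idx + 1).trim)
    }
    props
  }
}
```

For example, RackLocatorProps.parse("username=foo,password=bar") yields properties in which username is foo and password is bar.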

The following optional command line arguments are added to TopicCommand and ReassignPartitionsCommand; they are equivalent to the above properties used by the broker at runtime:

...

"SSL://host1:9093"],
  "rack": "dc1"
}

If no rack information is specified, the field will not be included in JSON.
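
The optional rack field can be illustrated with a small sketch that renders the registration fields as a JSON string and appends "rack" only when a rack is set. The object BrokerJsonSketch and its method are hypothetical names used for illustration, the field set simply mirrors the example above, and no string escaping is attempted.

```scala
// Hypothetical illustration, not the actual Kafka serialization code.
object BrokerJsonSketch {
  private def quote(s: String): String = "\"" + s + "\""

  // Renders the fields from the example; "rack" is emitted only if defined.
  // Assumes the inputs need no JSON escaping.
  def registrationJson(host: String,
                       port: Int,
                       jmxPort: Int,
                       timestamp: String,
                       endpoints: Seq[String],
                       rack: Option[String]): String = {
    val required: Seq[(String, String)] = Seq(
      "version"   -> "2",
      "host"      -> quote(host),
      "port"      -> port.toString,
      "jmx_port"  -> jmxPort.toString,
      "timestamp" -> quote(timestamp),
      "endpoints" -> endpoints.map(quote).mkString("[", ",", "]")
    )
    // An empty Option contributes no field at all.
    val optional: Seq[(String, String)] = rack.map(r => "rack" -> quote(r)).toSeq
    (required ++ optional)
      .map { case (k, v) => quote(k) + ":" + v }
      .mkString("{", ",", "}")
  }
}
```

Calling it with rack = None produces JSON without a "rack" key, while rack = Some("dc1") appends "rack":"dc1" as the last field.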

Rack can be specified in the broker property file as

Code Block
rack=<rack ID as string>

For example, 

rack=dc1

rack=us-east-1c

The rack property will be part of the metadata response. If a broker has no rack, the rack property will not be written to the wire.

...

Proposed Changes

  • AdminUtils.assignReplicasToBrokers will be updated to create rack aware assignment based on the broker to rack mapping passed into it, which is obtained from ZooKeeper data before doing replica assignment. If the map is empty, it will treat it as no rack information available. If none of the brokers has a rack, it will produce the same assignment as the current implementation without using rack. If some brokers have a rack and some do not, the API will throw AdminOperationException. This is to prevent non rack aware assignments from being created during a version upgrade or through human error where the rack information is missing. When making the rack aware assignment, it tries to keep the following properties:
    • Even distribution of replicas among brokers
    • Even distribution of partition leadership among brokers
    • Assign to as many racks as possible. That means if the number of racks is greater than or equal to the number of replicas, each rack will have at most one replica. On the other hand, if the number of racks is less than the number of replicas (which should happen very infrequently), each rack should have at least one replica and no other guarantees are made on how the replicas will be distributed among racks. For example, if there are 2 racks and 4 replicas, one rack can have 3 replicas, 2 replicas or 1 replica. This is to keep the algorithm simple while still keeping other replica distribution properties and fault tolerance from the racks.
  • Implementation detail of the rack aware assignment (see more in the pull request https://github.com/apache/kafka/pull/132):
    • Before doing the rack aware assignment, sort the broker list such that brokers are interlaced according to rack. In other words, adjacent brokers in the sorted list should not be in the same rack if possible. For example, assuming 6 brokers mapping to 3 racks: 0 -> "rack1", 1 -> "rack1", 2 -> "rack2", 3 -> "rack2", 4 -> "rack3", 5 -> "rack3", the sorted broker list could be (0, 2, 4, 1, 3, 5).
    • Apply the same assignment algorithm to assign replicas, with the addition of skipping a broker if its rack is already used for the same partition
  • KafkaApis will initialize the RackLocator if configured and call the new AdminUtils.assignReplicasToBrokers API with the broker to rack mapping obtained from RackLocator. RackLocator.getRackInfo() will be called every time a topic needs to be created (when auto topic creation is enabled) to ensure the latest broker-rack mapping is used.
  • TopicCommand and ReassignPartitionsCommand will initialize the RackLocator (if enabled from command line) and call the new AdminUtils.assignReplicasToBrokers API with the broker to rack mapping obtained from RackLocator. Rack aware assignment will be used for topic creation, adding partitions and partitions reassignment.
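
The interlacing and rack-skipping steps described above can be sketched as follows. This is a simplified illustration and not the code from the pull request: the object RackAwareSketch and its method names are hypothetical, the start position simply rotates with the partition ID, and the replica shifting used by the existing assignment algorithm is left out.

```scala
// Simplified, hypothetical sketch of rack aware assignment.
object RackAwareSketch {
  // Interlace brokers so adjacent entries come from different racks where possible.
  def interlaceByRack(rackInfo: Map[Int, String]): Seq[Int] = {
    // Group broker IDs by rack; sort racks and brokers for a deterministic order.
    val byRack: Seq[Seq[Int]] = rackInfo.toSeq
      .groupBy { case (_, rack) => rack }
      .toSeq
      .sortBy { case (rack, _) => rack }
      .map { case (_, brokers) => brokers.map(_._1).sorted }
    val longest = if (byRack.isEmpty) 0 else byRack.map(_.size).max
    // Round-robin: take the i-th broker of every rack in turn.
    for {
      i <- 0 until longest
      brokers <- byRack
      if i < brokers.size
    } yield brokers(i)
  }

  // Walk the interlaced list, skipping a broker whose rack already holds a
  // replica of the partition as long as some rack is still unused.
  def assign(rackInfo: Map[Int, String],
             nPartitions: Int,
             replicationFactor: Int): Map[Int, Seq[Int]] = {
    val brokers = interlaceByRack(rackInfo)
    require(replicationFactor >= 1 && replicationFactor <= brokers.size,
      "replication factor must be between 1 and the number of brokers")
    val numRacks = rackInfo.values.toSet.size
    (0 until nPartitions).map { partition =>
      var replicas = Vector.empty[Int]
      var usedRacks = Set.empty[String]
      var i = partition // rotating start spreads leadership across brokers
      while (replicas.size < replicationFactor) {
        val broker = brokers(i % brokers.size)
        val rack = rackInfo(broker)
        // Once every rack has a replica, rack reuse is allowed.
        val rackAllowed = !usedRacks(rack) || usedRacks.size == numRacks
        if (!replicas.contains(broker) && rackAllowed) {
          replicas :+= broker
          usedRacks += rack
        }
        i += 1
      }
      partition -> replicas
    }.toMap
  }
}
```

With the six-broker example above, interlaceByRack returns (0, 2, 4, 1, 3, 5), and assign with a replication factor of 3 places each partition's replicas on three different racks.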

Compatibility, Deprecation, and Migration Plan

  • The changes are 100% API compatible with the current version
  • All new properties and added arguments to APIs are optional 
  • To enable the feature, users can store the broker to rack mapping information in their own ways, implement the RackLocator interface and pass in the additional configuration for broker or command line tools.
  • The rack property will be included in version 2 of the broker JSON schema and version 1 of UpdateMetaDataRequest.