Status

Current state: Under Discussion

Discussion thread: here

Github Pull Request: https://github.com/apache/kafka/pull/132

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Machines in a data center are sometimes grouped in racks. Racks provide isolation because each rack may be in a different physical location and have its own power source. When resources are properly replicated across racks, the system is fault tolerant: if a rack goes down, the remaining racks can continue to serve traffic.

In Kafka, if there is more than one replica for a partition, the replicas should be placed in as many different racks as possible so that the partition can continue to function if a rack goes down. This also makes maintenance of a Kafka cluster easier, because an entire rack can be taken down at once.

In AWS, racks are usually mapped to the concept of availability zones.

Public Interfaces

One optional parameter rackInfo will be passed to AdminUtils.assignReplicasToBrokers:

/**
 * @param rackInfo Map from broker ID to its rack (zone). If empty, no rack aware
 *                 assignment will be used.
 */
def assignReplicasToBrokers(brokerList: Seq[Int],
                            nPartitions: Int,
                            replicationFactor: Int,
                            fixedStartIndex: Int = -1,
                            startPartitionId: Int = -1,
                            rackInfo: Map[Int, String] = Map()) 

 

The RackLocator interface will be used to return rack information for brokers at runtime and for command line tools. It is defined as:

 

/**
 * Interface that defines API to get broker to rack mapping. This is used by
 * rack aware replica assignment. The implementation class can be passed as
 * rack.locator.class property at server start up or -rack-locator-class command
 * line argument for TopicCommand.
 *
 * @param zkClient ZkClient for the cluster that can be used to retrieve information
 *                 from ZooKeeper
 * @param props Properties used by implementation.
 */
abstract class RackLocator(val zkClient: ZkClient, val props: Properties) {
  def getRackInfo(): Map[Int, String]
}

zkClient is passed into the interface in case the implementation stores the rack information under the same ZooKeeper path as the Kafka cluster or needs to access cluster metadata from ZooKeeper. An implementation can use the additional properties passed into the interface to initialize and access the storage where the rack information is kept. The single method getRackInfo() returns a map from broker ID to rack ID, where the rack ID is represented as a string.
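As an illustration only, here is a minimal sketch of what an implementation might look like. PropertiesRackLocator and the broker.<id>=<rack> property convention are hypothetical and not part of this proposal, and the ZkClient class below is a local stand-in for the real ZooKeeper client so that the sketch compiles on its own.

```scala
import java.util.Properties

// Stand-in for the real ZkClient type, used only to keep this sketch self-contained.
class ZkClient

// Same shape as the RackLocator definition above.
abstract class RackLocator(val zkClient: ZkClient, val props: Properties) {
  def getRackInfo(): Map[Int, String]
}

// Hypothetical implementation: reads entries of the form broker.<id>=<rack>
// from the properties and ignores every other key. ZooKeeper is not used here.
class PropertiesRackLocator(zk: ZkClient, properties: Properties)
    extends RackLocator(zk, properties) {

  private val Prefix = "broker."

  override def getRackInfo(): Map[Int, String] = {
    val names = props.stringPropertyNames().iterator()
    var mapping = Map.empty[Int, String]
    while (names.hasNext) {
      val key = names.next()
      if (key.startsWith(Prefix)) {
        // toInt throws NumberFormatException if the suffix is not a broker ID.
        val brokerId = key.stripPrefix(Prefix).toInt
        mapping = mapping.updated(brokerId, props.getProperty(key))
      }
    }
    mapping
  }
}
```

Because getRackInfo() rebuilds the map on every call, a caller that invokes it for each topic creation would see changes to the underlying properties. A real implementation would more likely read from ZooKeeper or an external inventory service.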

The following optional properties are added to broker configuration:

  • rack.locator.class: The implementation class name
  • rack.locator.properties: Comma-delimited key=value pairs to pass into the RackLocator as properties, e.g. "username=foo,password=bar"
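The comma-delimited format could be turned into a Properties object along the following lines. This is a sketch under stated assumptions: RackLocatorProps.parse is a hypothetical helper, and splitting each pair on its first '=' and trimming whitespace are choices made here, not behavior specified by the proposal.

```scala
import java.util.Properties

object RackLocatorProps {
  // Hypothetical helper: parses "k1=v1,k2=v2" into java.util.Properties.
  // Each pair is split on its first '=', so values may contain '=' but not ','.
  def parse(spec: String): Properties = {
    val props = new Properties()
    spec.split(",").map(_.trim).filter(_.nonEmpty).foreach { pair =>
      val idx = pair.indexOf('=')
      require(idx > 0, s"Expected key=value but got '$pair'")
      props.setProperty(pair.substring(0, idx).trim, pair.substring(idx + 1).trim)
    }
    props
  }
}
```

One consequence of this format is that a property value cannot itself contain a comma, so implementations that need list-valued settings would have to pick a different separator inside the value.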

The following optional command line arguments are added to TopicCommand and ReassignPartitionsCommand. They are equivalent to the broker properties above:

  • rack-locator-class
  • rack-locator-properties

Proposed Changes

  • AdminUtils.assignReplicasToBrokers is updated to create a rack aware assignment based on the broker to rack mapping passed into it. If the map is empty, no rack information is considered available and the method produces the same assignment as the current implementation. When making the rack aware assignment, it tries to maintain the following properties:
    • Even distribution of replicas among brokers
    • Even distribution of partition leadership among brokers
    • Assign to as many racks as possible. That is, if the number of racks is greater than or equal to the number of replicas, each rack will have at most one replica. If the number of racks is less than the number of replicas (which should happen very infrequently), each rack will have at least one replica, and no other guarantee is made about how the replicas are distributed among racks. For example, with 2 racks and 4 replicas, one rack can have 3, 2, or 1 replicas. This keeps the algorithm simple while preserving the other replica distribution properties and the fault tolerance provided by racks.
  • Implementation detail of the rack aware assignment (see more in the pull request https://github.com/apache/kafka/pull/132):
    • Before doing the rack aware assignment, sort the broker list so that brokers are interlaced by rack. In other words, adjacent brokers in the sorted list should not be in the same rack if possible. For example, assuming 6 brokers mapped to 3 racks: 0 -> "rack1", 1 -> "rack1", 2 -> "rack2", 3 -> "rack2", 4 -> "rack3", 5 -> "rack3", the sorted broker list could be (0, 2, 4, 1, 3, 5)
    • Apply the same assignment algorithm to assign replicas, with the addition of skipping a broker if its rack is already used for the same partition
  • KafkaApis will initialize the RackLocator if configured and call the new AdminUtils.assignReplicasToBrokers API with the broker to rack mapping obtained from RackLocator. RackLocator.getRackInfo() will be called every time a topic needs to be created (when auto topic creation is enabled) to ensure the latest broker-rack mapping is used.
  • TopicCommand and ReassignPartitionsCommand will initialize the RackLocator (if enabled from the command line) and call the new AdminUtils.assignReplicasToBrokers API with the broker to rack mapping obtained from RackLocator. Rack aware assignment will be used for topic creation, adding partitions and partition reassignment.
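The interlacing step and the rack-skipping rule described above can be sketched as follows. This is a simplified illustration, not the code in the pull request: RackAwareSketch, interlaceByRack and assign are hypothetical names, the leader of partition p is simply the broker at position p in the interlaced list, and the replica shift used by the existing assignment algorithm is left out.

```scala
object RackAwareSketch {

  // Orders brokers so that adjacent entries are on different racks where possible,
  // by taking one broker from each rack in turn (racks visited in sorted order).
  def interlaceByRack(rackInfo: Map[Int, String]): Seq[Int] = {
    val brokersPerRack: Seq[Seq[Int]] =
      rackInfo.toSeq
        .groupBy { case (_, rack) => rack }
        .toSeq
        .sortBy { case (rack, _) => rack }
        .map { case (_, entries) => entries.map(_._1).sorted }
    val longest = if (brokersPerRack.isEmpty) 0 else brokersPerRack.map(_.size).max
    for {
      round <- 0 until longest
      brokers <- brokersPerRack
      if round < brokers.size
    } yield brokers(round)
  }

  // Simplified assignment: walk the interlaced list starting at the partition's
  // leader position and skip any broker whose rack already holds a replica of
  // this partition. Once every rack is used, racks may be reused.
  def assign(rackInfo: Map[Int, String],
             nPartitions: Int,
             replicationFactor: Int): Map[Int, Seq[Int]] = {
    val brokers = interlaceByRack(rackInfo)
    require(replicationFactor >= 1 && replicationFactor <= brokers.size,
      "replication factor must be between 1 and the number of brokers")
    val numRacks = rackInfo.values.toSet.size
    (0 until nPartitions).map { partition =>
      var replicas = Vector.empty[Int]
      var usedRacks = Set.empty[String]
      var idx = partition % brokers.size
      while (replicas.size < replicationFactor) {
        val broker = brokers(idx)
        val rack = rackInfo(broker)
        val rackAllowed = !usedRacks.contains(rack) || usedRacks.size == numRacks
        if (!replicas.contains(broker) && rackAllowed) {
          replicas = replicas :+ broker
          usedRacks = usedRacks + rack
        }
        idx = (idx + 1) % brokers.size
      }
      partition -> replicas
    }.toMap
  }
}
```

With the six-broker example above, interlaceByRack returns (0, 2, 4, 1, 3, 5), and assign with 6 partitions and a replication factor of 3 places every partition on three distinct racks while each broker leads exactly one partition. With fewer racks than replicas, the sketch first covers every rack and then fills the remaining replicas from any unused broker, which matches the "at least one replica per rack" property without promising anything further.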

Compatibility, Deprecation, and Migration Plan

  • The changes are 100% API compatible with the current version
  • All new properties and added arguments to APIs are optional
  • To enable the feature, users can store the broker to rack mapping in whatever way suits them, implement the RackLocator interface and pass in the additional configuration for the broker or command line tools.