...

broker.rack=dc1

broker.rack=us-east-1c

The rack property will be part of the metadata response. If a broker has no rack, the rack property will not be written to the wire.

Changes to TopicMetadataResponse

TopicMetadataResponse will be updated to include rack information:

Code Block (scala)
case class TopicMetadataResponse(brokers: Seq[Broker], // change from Seq[BrokerEndPoint]
                                 topicsMetadata: Seq[TopicMetadata],
                                 correlationId: Int)

Changes to Broker serialization

Consequently, Broker.writeTo will append the rack to the end of the ByteBuffer and Broker.readFrom will read it back:

Code Block
Broker => ID NumberOfEndPoints [EndPoint] Rack
ID => int32
NumberOfEndPoints => int32
EndPoint => Port Host ProtocolID
Port => int32
Host => string
ProtocolID => int16
Rack => string
 

The rack is serialized the same way as the host string: the size of the string is written first as a short, followed by the actual string content. If the rack is not available, only a size of -1 is written, with no string content. When reading from the ByteBuffer, a size of -1 is interpreted as "null".
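The size-prefixed encoding described above can be sketched as follows. This is a minimal illustration, not the actual Broker.writeTo or Broker.readFrom code: the object name RackStringCodec and both method names are hypothetical, and UTF-8 is assumed as the character encoding.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical helper, for illustration only.
object RackStringCodec {

  // Writes a 16-bit size followed by the string bytes.
  // An absent rack is written as size -1 with no content.
  def writeNullableString(buffer: ByteBuffer, value: Option[String]): Unit = value match {
    case Some(s) =>
      val bytes = s.getBytes(StandardCharsets.UTF_8) // assumption: UTF-8
      require(bytes.length <= Short.MaxValue, s"string too long: ${bytes.length} bytes")
      buffer.putShort(bytes.length.toShort)
      buffer.put(bytes)
    case None =>
      buffer.putShort((-1).toShort)
  }

  // Reads the 16-bit size; a negative size means "null" (no rack).
  def readNullableString(buffer: ByteBuffer): Option[String] = {
    val size = buffer.getShort().toInt
    if (size < 0) None
    else {
      val bytes = new Array[Byte](size)
      buffer.get(bytes)
      Some(new String(bytes, StandardCharsets.UTF_8))
    }
  }
}
```

Under these assumptions, a rack of "dc1" occupies 5 bytes on the wire (2 for the size, 3 for the content), while a missing rack occupies only the 2 size bytes.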

Changes to UpdateMetadataRequest

UpdateMetadataRequest is version aware, and Broker is serialized differently for version 0 and version 1. For version 0, only the broker ID, host and port are serialized. For version 1, the complete Broker object is serialized by calling Broker.writeTo and Broker.readFrom. Therefore, for version 1 the rack information is handled automatically and the serialization format is the same as the above.
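The difference between the two versions can be sketched as follows. This is an illustration under stated assumptions, not the real UpdateMetadataRequest code: the types EndPoint and Broker here are simplified stand-ins, the field order used for version 0 and the choice of the first endpoint are assumptions, and strings are assumed to be UTF-8 with a 16-bit size prefix.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical, simplified stand-ins for the real classes.
object VersionedBrokerSketch {
  final case class EndPoint(port: Int, host: String, protocolId: Short)
  final case class Broker(id: Int, endPoints: Seq[EndPoint], rack: Option[String])

  private def putString(buf: ByteBuffer, s: Option[String]): Unit = s match {
    case Some(v) =>
      val bytes = v.getBytes(StandardCharsets.UTF_8)
      buf.putShort(bytes.length.toShort)
      buf.put(bytes)
    case None => buf.putShort((-1).toShort) // size -1 means "null"
  }

  private def getString(buf: ByteBuffer): Option[String] = {
    val size = buf.getShort().toInt
    if (size < 0) None
    else {
      val bytes = new Array[Byte](size)
      buf.get(bytes)
      Some(new String(bytes, StandardCharsets.UTF_8))
    }
  }

  def write(buf: ByteBuffer, broker: Broker, version: Int): Unit = version match {
    case 0 =>
      // Version 0: only ID, host and port (order assumed); the rack is dropped.
      val ep = broker.endPoints.head
      buf.putInt(broker.id)
      putString(buf, Some(ep.host))
      buf.putInt(ep.port)
    case 1 =>
      // Version 1: Broker => ID NumberOfEndPoints [EndPoint] Rack
      buf.putInt(broker.id)
      buf.putInt(broker.endPoints.size)
      broker.endPoints.foreach { ep =>
        buf.putInt(ep.port)
        putString(buf, Some(ep.host))
        buf.putShort(ep.protocolId)
      }
      putString(buf, broker.rack)
    case other =>
      throw new IllegalArgumentException(s"unsupported version: $other")
  }

  def read(buf: ByteBuffer, version: Int): Broker = version match {
    case 0 =>
      val id = buf.getInt()
      val host = getString(buf).getOrElse("")
      val port = buf.getInt()
      // Protocol ID 0 is a placeholder; version 0 carries no protocol or rack.
      Broker(id, Seq(EndPoint(port, host, 0.toShort)), None)
    case 1 =>
      val id = buf.getInt()
      val count = buf.getInt()
      val endPoints = (0 until count).map { _ =>
        val port = buf.getInt()
        val host = getString(buf).getOrElse("")
        val protocolId = buf.getShort()
        EndPoint(port, host, protocolId)
      }
      Broker(id, endPoints, getString(buf))
    case other =>
      throw new IllegalArgumentException(s"unsupported version: $other")
  }
}
```

In this sketch a version 1 round trip preserves the rack, while a version 0 round trip always yields a broker without one.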

Proposed Changes

  • AdminUtils.assignReplicasToBrokers will be updated to create a broker-rack mapping from ZooKeeper data before doing replica assignment. If none of the brokers have rack information, the algorithm will create the same assignment as the current implementation. If some brokers have a rack and some do not, the algorithm will throw an exception. This is to prevent incorrect assignment caused by user error.
  • When making the rack aware assignment, it tries to keep the following properties:
    • Even distribution of replicas among brokers
    • Even distribution of partition leadership among brokers
    • Assign replicas to as many racks as possible. If the number of racks is greater than or equal to the number of replicas, each rack will have at most one replica. If the number of racks is less than the number of replicas (which should happen very infrequently), each rack will have at least one replica, and no other guarantees are made about how the replicas are distributed among racks. For example, with 2 racks and 4 replicas, a rack can have 3 replicas, 2 replicas or 1 replica. This keeps the algorithm simple while preserving the other replica distribution properties and the fault tolerance provided by the racks.
  • Implementation detail of the rack aware assignment (see more in the pull request https://github.com/apache/kafka/pull/132):
    • Before doing the rack aware assignment, sort the broker list so that brokers are interlaced by rack. In other words, adjacent brokers in the sorted list should not be in the same rack if possible. For example, assuming 6 brokers mapping to 3 racks: 0 -> "rack1", 1 -> "rack1", 2 -> "rack2", 3 -> "rack2", 4 -> "rack3", 5 -> "rack3", the sorted broker list could be (0, 2, 4, 1, 3, 5).
    • Apply the same assignment algorithm to assign replicas, with the addition of skipping a broker if its rack is already used for the same partition.
  • UpdateMetadataRequest should be updated to correctly handle rack for both controller protocol version 0 and version 1.
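
The rack consistency check, the interlaced broker ordering and the rack-skipping rule listed above can be sketched as follows. This is a simplified illustration and not the code in AdminUtils.assignReplicasToBrokers or in the pull request: the object RackAwareSketch and its methods are hypothetical, and the assignment step is a plain round-robin walk rather than the existing assignment algorithm.

```scala
import scala.collection.mutable

// Hypothetical sketch, for illustration only.
object RackAwareSketch {

  // Some(mapping) if every broker has a rack, None if no broker has one.
  // A mix of both is treated as a user error.
  def rackMapping(brokerRacks: Map[Int, Option[String]]): Option[Map[Int, String]] = {
    val withRack = brokerRacks.collect { case (id, Some(rack)) => id -> rack }
    if (withRack.isEmpty) None
    else if (withRack.size == brokerRacks.size) Some(withRack)
    else throw new IllegalArgumentException("Not all brokers have rack information")
  }

  // Orders brokers so that adjacent entries are in different racks where possible:
  // take the first broker of each rack, then the second of each rack, and so on.
  def interlaceByRack(brokerToRack: Map[Int, String]): Seq[Int] = {
    val brokersPerRack: Seq[Seq[Int]] =
      brokerToRack.groupBy(_._2).toSeq.sortBy(_._1).map { case (_, m) => m.keys.toSeq.sorted }
    val longest = if (brokersPerRack.isEmpty) 0 else brokersPerRack.map(_.size).max
    for {
      i <- 0 until longest
      rackBrokers <- brokersPerRack
      if i < rackBrokers.size
    } yield rackBrokers(i)
  }

  // Simplified round-robin: partition p starts at position p of the interlaced list
  // and walks forward, skipping a broker whose rack already holds a replica of p
  // for as long as some rack is still unused for that partition.
  def assign(brokerToRack: Map[Int, String], partitions: Int, replicationFactor: Int): Map[Int, Seq[Int]] = {
    val ordered = interlaceByRack(brokerToRack)
    require(replicationFactor <= ordered.size, "replication factor exceeds broker count")
    val numRacks = brokerToRack.values.toSet.size
    (0 until partitions).map { p =>
      val replicas = mutable.ArrayBuffer.empty[Int]
      val usedRacks = mutable.Set.empty[String]
      var offset = 0
      while (replicas.size < replicationFactor) {
        val broker = ordered((p + offset) % ordered.size)
        val rack = brokerToRack(broker)
        val rackAllowed = !usedRacks(rack) || usedRacks.size == numRacks
        if (!replicas.contains(broker) && rackAllowed) {
          replicas += broker
          usedRacks += rack
        }
        offset += 1
      }
      p -> replicas.toSeq
    }.toMap
  }
}
```

With the 6-broker, 3-rack mapping from the example above, interlaceByRack yields (0, 2, 4, 1, 3, 5), and with a replication factor of 3 every partition in this sketch gets one replica per rack.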

...