Status

Current state: Accepted

Discussion thread: here

JIRA:

Github Pull Request: https://github.com/apache/kafka/pull/132

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Machines in a data center are sometimes grouped in racks. Racks provide isolation, because each rack may be in a different physical location and has its own power source. When resources are properly replicated across racks, the system becomes fault tolerant: if a rack goes down, the remaining racks can continue to serve traffic.

In Kafka, if a partition has more than one replica, its replicas should ideally be placed in as many different racks as possible so that the partition can continue to function if a rack goes down. This also makes maintenance of a Kafka cluster easier, because an entire rack can be taken down at once.

In AWS, racks usually map to the concept of availability zones.

Public Interfaces

Changes to Broker property

An optional rack field will be added to the Broker class:

case class Broker(id: Int, endPoints: Map[SecurityProtocol, EndPoint], rack: Option[String])

rack will be an optional field in version 3 of the broker registration JSON schema. ZkUtils.registerBrokerInZk will be updated to write this new JSON version.

For example:

 

{"version":3,
  "host":"localhost",
  "port":9092,
  "jmx_port":9999,
  "timestamp":"2233345666",
  "endpoints": ["PLAINTEXT://host1:9092",
                "SSL://host1:9093"],
  "rack": "dc1"
}

If no rack information is specified, the field will not be included in JSON.
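To illustrate the "omit when absent" rule, here is a minimal sketch that builds the version 3 registration JSON by hand. The object name BrokerRegistrationJson and its toJson parameters are hypothetical (this is not the real ZkUtils code), and the string escaping is deliberately minimal. The key point is that rack is an Option and contributes a field only when it is defined.

```scala
// Hypothetical sketch: builds a version 3 broker registration JSON string.
// Not the actual ZkUtils.registerBrokerInZk implementation.
object BrokerRegistrationJson {
  // Minimal escaping of backslashes and double quotes (sketch only).
  private def quote(s: String): String =
    "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\""

  def toJson(host: String,
             port: Int,
             jmxPort: Int,
             timestamp: Long,
             endpoints: Seq[String],
             rack: Option[String]): String = {
    val fields = Seq(
      "\"version\":3",
      "\"host\":" + quote(host),
      "\"port\":" + port,
      "\"jmx_port\":" + jmxPort,
      "\"timestamp\":" + quote(timestamp.toString),
      "\"endpoints\":[" + endpoints.map(quote).mkString(",") + "]"
    ) ++ rack.map(r => "\"rack\":" + quote(r)) // no "rack" field at all when None
    fields.mkString("{", ",", "}")
  }
}
```

Modeling rack as Option[String] rather than an empty-string sentinel keeps the "not configured" case explicit all the way from the config to the JSON.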

The rack can be specified in the broker properties file as:

broker.rack=<rack ID as string>

For example, 

broker.rack=dc1

broker.rack=us-east-1c
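A minimal sketch of how a broker could turn this optional setting into the Option[String] carried by Broker, using java.util.Properties. The object name BrokerRackConfig is hypothetical, and treating a blank value as "no rack" is an assumption of this sketch rather than something the proposal specifies.

```scala
import java.util.Properties

// Hypothetical helper: reads the optional broker.rack setting.
object BrokerRackConfig {
  val RackProp = "broker.rack"

  // Some(rack) when broker.rack is set to a non-blank value; None otherwise.
  // (Treating blank as unset is an assumption of this sketch.)
  def rackFrom(props: Properties): Option[String] =
    Option(props.getProperty(RackProp)).map(_.trim).filter(_.nonEmpty)
}
```

Brokers without broker.rack then yield None, which matches the rule that the rack field is simply omitted.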

Consequently, Broker.writeTo will append the rack at the end of the ByteBuffer, and Broker.readFrom will read it back. The serialized format is:

Broker => ID NumberOfEndPoints [EndPoint] Rack
ID => int32
NumberOfEndPoints => int32
EndPoint => Port Host ProtocolID
Port => int32
Host => string
ProtocolID => int16
Rack => string
 

The rack is serialized the same way as the host: the length of the string is written first as a short (int16), followed by the string content. If no rack is available, only the length -1 is written, with no string content. When reading from the ByteBuffer, a length of -1 is interpreted as null (no rack).
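The Broker layout and the nullable-string rule can be sketched with java.nio.ByteBuffer. EndPoint and Broker below are simplified stand-ins for the real Kafka classes (EndPoint uses a raw protocol ID instead of SecurityProtocol), BrokerSerde and its helpers are hypothetical names, and UTF-8 encoding for strings is an assumption of this sketch.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Simplified stand-ins for the real Kafka types (hypothetical sketch).
final case class EndPoint(host: String, port: Int, protocolId: Short)
final case class Broker(id: Int, endPoints: Seq[EndPoint], rack: Option[String])

object BrokerSerde {
  // int16 length followed by UTF-8 bytes; a length of -1 encodes null.
  def writeNullableString(buf: ByteBuffer, s: String): Unit =
    if (s == null) buf.putShort((-1).toShort)
    else {
      val bytes = s.getBytes(StandardCharsets.UTF_8)
      buf.putShort(bytes.length.toShort)
      buf.put(bytes)
    }

  def readNullableString(buf: ByteBuffer): String = {
    val len = buf.getShort()
    if (len == -1) null
    else {
      val bytes = new Array[Byte](len)
      buf.get(bytes)
      new String(bytes, StandardCharsets.UTF_8)
    }
  }

  private def stringSize(s: String): Int =
    2 + (if (s == null) 0 else s.getBytes(StandardCharsets.UTF_8).length)

  // ID + NumberOfEndPoints + each (Port + Host + ProtocolID) + Rack
  def sizeInBytes(b: Broker): Int =
    4 + 4 + b.endPoints.map(e => 4 + stringSize(e.host) + 2).sum + stringSize(b.rack.orNull)

  // Broker => ID NumberOfEndPoints [EndPoint] Rack
  def writeTo(b: Broker, buf: ByteBuffer): Unit = {
    buf.putInt(b.id)
    buf.putInt(b.endPoints.size)
    b.endPoints.foreach { e =>
      buf.putInt(e.port)
      writeNullableString(buf, e.host)
      buf.putShort(e.protocolId)
    }
    writeNullableString(buf, b.rack.orNull) // rack is appended last; -1 when absent
  }

  def readFrom(buf: ByteBuffer): Broker = {
    val id = buf.getInt()
    val count = buf.getInt()
    val endPoints = (0 until count).map { _ =>
      val port = buf.getInt()
      val host = readNullableString(buf)
      val protocolId = buf.getShort()
      EndPoint(host, port, protocolId)
    }
    Broker(id, endPoints, Option(readNullableString(buf))) // null becomes None
  }
}
```

Because the rack goes last, a reader only needs one extra nullable-string read at the end of each broker entry.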

Changes to UpdateMetadataRequest

The UpdateMetadataRequest version will be incremented from 1 to 2 to include the rack. For version 0, only the broker ID, host and port are serialized. For versions 1 and 2, the complete Broker object is serialized by calling Broker.writeTo and Broker.readFrom. As a result, the rack information is handled automatically for version 2, and its serialization format is the same as above.
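A minimal sketch of version-dependent serialization of the Brokers section. It assumes the rack should be written only when the version is at least 2, so that a version 1 receiver sees the layout it expects. For version 0 it writes the first endpoint's host and port, which is a simplification of this sketch. The types and the writeBrokers name are hypothetical stand-ins, not Kafka's classes.

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}
import java.nio.charset.StandardCharsets

// Simplified stand-ins for the real Kafka types (hypothetical sketch).
final case class EndPoint(host: String, port: Int, protocolId: Short)
final case class Broker(id: Int, endPoints: Seq[EndPoint], rack: Option[String])

object UpdateMetadataBrokers {
  // int16 length + UTF-8 bytes; -1 for null.
  private def writeString(out: DataOutputStream, s: String): Unit =
    if (s == null) out.writeShort(-1)
    else {
      val bytes = s.getBytes(StandardCharsets.UTF_8)
      out.writeShort(bytes.length)
      out.write(bytes)
    }

  // Brokers => NumberOfBroker [Broker], with the per-broker layout chosen by version.
  def writeBrokers(version: Short, brokers: Seq[Broker]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeInt(brokers.size)
    brokers.foreach { b =>
      out.writeInt(b.id)
      if (version == 0) {
        // Version 0: broker ID, host and port only (first endpoint, a simplification).
        val ep = b.endPoints.head
        writeString(out, ep.host)
        out.writeInt(ep.port)
      } else {
        // Versions 1 and 2: the full endpoint list.
        out.writeInt(b.endPoints.size)
        b.endPoints.foreach { e =>
          out.writeInt(e.port)
          writeString(out, e.host)
          out.writeShort(e.protocolId)
        }
        // Assumption: the rack is only written for version 2 and later.
        if (version >= 2) writeString(out, b.rack.orNull)
      }
    }
    out.flush()
    bytes.toByteArray
  }
}
```

For a broker with rack "dc1", the version 2 encoding is 5 bytes longer than version 1 (a 2-byte length plus 3 bytes of content), and only 2 bytes longer when no rack is set.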

The complete new format of UpdateMetadataRequest is the following:

VersionID CorrelationID ClientID Controller_ID Controller_Epoch Partition_States Brokers
Brokers => NumberOfBroker [Broker]
NumberOfBroker => int32
Broker => ID NumberOfEndPoints [EndPoint] Rack // Rack is added in version 2
ID => int32
NumberOfEndPoints => int32
EndPoint => Port Host ProtocolID
Port => int32
Host => string
ProtocolID => int16
Rack => string
 

Changes to TopicMetadataRequest and TopicMetadataResponse

The TopicMetadataRequest version will be incremented from 0 to 1. When a broker receives a version 1 TopicMetadataRequest, it will include the rack in each BrokerEndPoint of the TopicMetadataResponse. The new TopicMetadataResponse format is:

CorrelationID BrokerEndPoints TopicsMetaData
BrokerEndPoints => NumberOfBrokers [BrokerEndPoint]
BrokerEndPoint => BrokerID Host Port Rack // Rack is added in version 1
BrokerID => int32
Host => string
Port => int32
Rack => string
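A minimal sketch of the response-side BrokerEndPoint encoding, in which the rack is written and read only for version 1. BrokerEndPoint here is a simplified stand-in, and BrokerEndPointCodec is a hypothetical name. Strings use the same nullable int16-length encoding as the rest of the protocol, with UTF-8 assumed.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Simplified stand-in for the response-side broker description (hypothetical sketch).
final case class BrokerEndPoint(id: Int, host: String, port: Int, rack: Option[String])

object BrokerEndPointCodec {
  private def writeString(buf: ByteBuffer, s: String): Unit =
    if (s == null) buf.putShort((-1).toShort)
    else {
      val bytes = s.getBytes(StandardCharsets.UTF_8)
      buf.putShort(bytes.length.toShort)
      buf.put(bytes)
    }

  private def readString(buf: ByteBuffer): String = {
    val len = buf.getShort()
    if (len < 0) null
    else {
      val bytes = new Array[Byte](len)
      buf.get(bytes)
      new String(bytes, StandardCharsets.UTF_8)
    }
  }

  // BrokerEndPoint => BrokerID Host Port [Rack, version 1 only]
  def writeTo(ep: BrokerEndPoint, buf: ByteBuffer, version: Short): Unit = {
    buf.putInt(ep.id)
    writeString(buf, ep.host)
    buf.putInt(ep.port)
    if (version >= 1) writeString(buf, ep.rack.orNull)
  }

  def readFrom(buf: ByteBuffer, version: Short): BrokerEndPoint = {
    val id = buf.getInt()
    val host = readString(buf)
    val port = buf.getInt()
    // A version 0 response carries no rack, so it always decodes as None.
    val rack = if (version >= 1) Option(readString(buf)) else None
    BrokerEndPoint(id, host, port, rack)
  }
}
```

Gating on the request version lets older clients keep receiving the version 0 layout unchanged.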

 

Proposed Changes

Compatibility, Deprecation, and Migration Plan