
zkClient is passed into the interface in case the implementation stores the rack information under the same ZooKeeper path as the Kafka cluster or needs to access cluster metadata from ZooKeeper. The implementation can use the additional properties passed into the interface to initialize and access the storage where the rack information is stored. The single method getRackInfo() returns a map from broker ID to rack ID, where the rack ID is represented as a string.
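
To make the contract concrete, here is a minimal sketch of what the RackLocator could look like in Scala. Only getRackInfo() is specified above; the constructor arguments (zkClient and properties) follow the description, but the exact signatures, the ZkClient type, and the PropertiesRackLocator example class are assumptions for illustration.

    import java.util.Properties
    import org.I0Itec.zkclient.ZkClient

    trait RackLocator {
      // Returns a map from broker ID to the rack ID of that broker.
      def getRackInfo(): Map[Int, String]
    }

    // Hypothetical implementation that reads a static broker-to-rack mapping
    // from a property passed in, e.g. "mapping=0:rack1;1:rack1;2:rack2".
    class PropertiesRackLocator(zkClient: ZkClient, props: Properties) extends RackLocator {
      override def getRackInfo(): Map[Int, String] = {
        Option(props.getProperty("mapping")).getOrElse("")
          .split(";").filter(_.nonEmpty)
          .map { entry =>
            val Array(brokerId, rack) = entry.split(":")
            brokerId.trim.toInt -> rack.trim
          }.toMap
      }
    }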

The following optional properties are added to the broker configuration:

  • rack.locator.class: The implementation class name
  • rack.locator.properties: Comma delimited key=value pairs as properties to pass into the RackLocator, e.g., "username=foo,password=bar"
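
For example, a broker's server.properties could carry entries like the following; the property names are the ones listed above, while the class name and the key=value pairs are placeholders:

    rack.locator.class=com.example.PropertiesRackLocator
    rack.locator.properties=username=foo,password=bar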

The following optional command line arguments are added to TopicCommand and ReassignPartitionsCommand, equivalent to the above properties used by the broker at runtime:

  • rack-locator-class
  • rack-locator-properties
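
For example, a rack aware topic could be created roughly as follows; the two rack-locator options are the ones proposed above, while the class name and properties values are placeholders:

    bin/kafka-topics.sh --create --zookeeper localhost:2181 \
        --topic my-topic --partitions 6 --replication-factor 3 \
        --rack-locator-class com.example.PropertiesRackLocator \
        --rack-locator-properties "username=foo,password=bar"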

Proposed Changes

  • AdminUtils.assignReplicasToBrokers is updated to create a rack aware assignment based on the broker to rack mapping passed into it. If the map is empty, it is treated as having no rack information available, and the method produces the same assignment as the current implementation. When making the rack aware assignment, it tries to maintain the following properties:
    • Even distribution of replicas among brokers
    • Even distribution of partition leadership among brokers
    • Assign to as many racks as possible. That means if the number of racks is more than or equal to the number of replicas, each rack will have at most one replica. On the other hand, if the number of racks is less than the number of replicas (which should happen rarely), each rack will have at least one replica and no other guarantees are made on how the replicas are distributed among racks. For example, if there are 2 racks and 4 replicas, one rack can have 3 replicas, 2 replicas or 1 replica. This keeps the algorithm simple while still preserving the other replica distribution properties and the fault tolerance provided by the racks.
  • Here are the implementation details of the rack aware assignment (a sketch of these two steps follows this list):
    • Before doing the rack aware assignment, sort the broker list such that the brokers are interlaced according to their racks. In other words, adjacent brokers in the sorted list should not be in the same rack if possible. For example, assuming 6 brokers mapping to 3 racks: 0 -> "rack1", 1 -> "rack1", 2 -> "rack2", 3 -> "rack2", 4 -> "rack3", 5 -> "rack3", the sorted broker list could be (0, 2, 4, 1, 3, 5)
    • Apply the same assignment algorithm to assign replicas, with the addition of skipping a broker if its rack is already used for the same partition
  • KafkaApis will initialize the RackLocator if configured and call the new AdminUtils.assignReplicasToBrokers API with the broker to rack mapping obtained from RackLocator. This will ensure a rack aware assignment when auto topic creation is enabled.
  • TopicCommand and ReassignPartitionsCommand will initialize the RackLocator (if enabled from command line) and call the new AdminUtils.assignReplicasToBrokers API with the broker to rack mapping obtained from RackLocator.
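
The interlacing and rack-skipping steps above can be sketched as follows. This is an illustrative simplification, not the actual AdminUtils code; the helper names and signatures are made up, and details such as tie breaking and leader distribution are omitted.

    // Interlace the broker list by rack: take one broker from each rack in turn.
    def interlaceBrokersByRack(brokerToRack: Map[Int, String]): Seq[Int] = {
      val brokersPerRack = brokerToRack.toSeq.groupBy(_._2).values
        .map(_.map(_._1).sorted).toSeq
      val maxBrokersInRack = brokersPerRack.map(_.size).max
      (0 until maxBrokersInRack).flatMap(i => brokersPerRack.flatMap(_.lift(i)))
    }

    // Assign one partition's replicas by walking the interlaced broker list,
    // skipping a broker if its rack already holds a replica of this partition,
    // unless every rack has already been used (fewer racks than replicas).
    def assignPartition(sortedBrokers: Seq[Int],
                        brokerToRack: Map[Int, String],
                        startIndex: Int,
                        replicationFactor: Int): Seq[Int] = {
      val numRacks = brokerToRack.values.toSet.size
      val assigned = scala.collection.mutable.ArrayBuffer[Int]()
      val usedRacks = scala.collection.mutable.Set[String]()
      var i = 0
      while (assigned.size < replicationFactor && i < 2 * sortedBrokers.size) {
        val broker = sortedBrokers((startIndex + i) % sortedBrokers.size)
        val rack = brokerToRack(broker)
        if (!assigned.contains(broker) &&
            (!usedRacks.contains(rack) || usedRacks.size == numRacks)) {
          assigned += broker
          usedRacks += rack
        }
        i += 1
      }
      assigned.toSeq
    }

For the 6 broker, 3 rack example above, interlaceBrokersByRack could return Seq(0, 2, 4, 1, 3, 5), and assigning a partition with replication factor 3 starting at index 0 would pick brokers 0, 2 and 4, one per rack.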

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.