
Current state: Under Discussion

Discussion thread: here


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Like many distributed systems, Kafka Streams instances can be grouped into different racks. When a standby task is placed in a different rack from its corresponding active task, it provides fault tolerance and faster recovery if the active task's rack goes down.

Below we will explore how other distributed systems implement rack awareness and what kind of guarantees they aim to provide. 


Rack awareness in Elasticsearch works by assigning a list of tags/attributes, called awareness attributes, to each node in the cluster. When Elasticsearch knows the nodes' rack specification, it distributes the primary shard and its replica shards to minimize the risk of losing all shard copies in the event of a failure. Besides defining an arbitrary list of tags/attributes for each node, Elasticsearch provides a means of setting which tags/attributes it must consider when balancing the shards across the racks.


node.attr.rack_id: rack_one
node.attr.cluster_id: cluster_one
cluster.routing.allocation.awareness.attributes: rack_id,cluster_id

In addition, Elasticsearch provides a "forced awareness" configuration, a safeguard that prevents racks from being overloaded after a failure. By default, if one location fails, Elasticsearch assigns all of the missing replica shards to the remaining locations. When resources are limited, a single rack might be unable to host all of the shards. The cluster.routing.allocation.awareness.force.* settings can be used to prevent Elasticsearch from allocating replicas until nodes are available in another location.


cluster.routing.allocation.awareness.attributes: rack_id
cluster.routing.allocation.awareness.force.rack_id.values: zone1,zone2

In the example above, if we start two nodes with node.attr.rack_id set to zone1 and create an index with five shards and one replica, Elasticsearch creates the index and allocates the five primary shards but no replicas. Replicas are only allocated once nodes with node.attr.rack_id set to zone2 are available.


In Hadoop, a rack is a physical collection of nodes in the cluster, and it serves as a means of both fault tolerance and optimization. The idea is that a read/write operation within one rack is cheaper than one that spans multiple racks. With the rack information, the Namenode chooses the closest Datanode when performing a read/write operation, which reduces network traffic.

A rack can have multiple Datanodes storing file blocks and replicas. A Hadoop cluster with a replication factor of 3 will automatically write a given file block to 2 different Datanodes in the same rack, plus one Datanode in a different rack for redundancy.

Rack awareness in the Hadoop cluster has to comply with the following policies:


Rack "awareness" in Redis is called "Rack-zone awareness" and it's very similar to Kafka Broker's rack awareness. Rack-zone awareness only works in a clustered Redis deployment, and it's an enterprise feature.

Rack-zone awareness works by assigning a rack-zone ID to each node. This ID is used to map the node to a physical rack or logical zone (an AWS availability zone, for instance). When appropriate IDs are set, the cluster ensures that leader shards, their corresponding replica shards, and associated endpoints are placed on nodes in different racks/zones.

In the event of a rack failure, the remaining racks' replicas and endpoints will be promoted. This approach ensures high availability when a rack or zone fails.

Proposed Changes

This KIP proposes to implement semantics in Kafka Streams similar to those in Elasticsearch. The rack awareness semantics in Elasticsearch seem the most flexible and can cover more complex use cases, such as multi-dimensional rack awareness. To achieve this, the KIP proposes to introduce a new config prefix in StreamsConfig that will be used to retrieve the user-defined tags of a Kafka Streams instance:

/**
 * Prefix used to add arbitrary tags to a Kafka Streams instance as key-value pairs.
 * Example:
 * client.tag.cluster=cluster1
 */
public static final String CLIENT_TAG_PREFIX = "client.tag.";

We will also add a new configuration option in StreamsConfig, which will be the means of setting which tags Kafka Streams must take into account when balancing the standby tasks across the racks.

public static final String TASK_ASSIGNMENT_RACK_AWARENESS_CONFIG = "task.assignment.rack.awareness";
public static final String TASK_ASSIGNMENT_RACK_AWARENESS_DOC = "List of client tag keys used to distribute standby replicas across Kafka Streams instances."
    + " When configured, Kafka Streams will make a best effort to distribute"
    + " the standby tasks over each client tag dimension.";

When client.tag.* dimensions are configured, Kafka Streams will read this information from the configuration and encode it into SubscriptionInfoData as key-value pairs.

{
  "name": "SubscriptionInfoData",
  // version bump
  "validVersions": "1-10",
  "fields": [
    {
      "name": "clientTags",
      "versions": "10+",
      "type": "[]ClientTag"
    }
  ],
  "commonStructs": [
    {
      "name": "ClientTag",
      "versions": "1+",
      "fields": [
        {
          "name": "key",
          "versions": "1+",
          "type": "bytes"
        },
        {
          "name": "value",
          "versions": "1+",
          "type": "bytes"
        }
      ]
    }
  ]
}

The Kafka Streams task assignor will decide how to distribute standby tasks over the clients based on the clientTags received with the subscription info and the configured task.assignment.rack.awareness value.
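As an illustration of the two settings described above, the following is a minimal sketch of how client tags and the list of rack-aware tag keys might be read from a configuration. It uses plain java.util.Properties rather than StreamsConfig, and the class name, method names, and example tag values (zone, cluster) are hypothetical; only the "client.tag." prefix and the "task.assignment.rack.awareness" key come from this KIP.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical helper, not part of Kafka Streams.
public class ClientTagConfigSketch {
    static final String CLIENT_TAG_PREFIX = "client.tag.";
    static final String TASK_ASSIGNMENT_RACK_AWARENESS_CONFIG = "task.assignment.rack.awareness";

    // Collects every "client.tag.<key>" entry into a sorted map of <key> -> value.
    static Map<String, String> clientTags(Properties props) {
        Map<String, String> tags = new TreeMap<>();
        for (String name : props.stringPropertyNames()) {
            if (name.startsWith(CLIENT_TAG_PREFIX)) {
                tags.put(name.substring(CLIENT_TAG_PREFIX.length()), props.getProperty(name));
            }
        }
        return tags;
    }

    // Splits the comma-separated list of tag keys, skipping blank entries.
    static List<String> rackAwareKeys(Properties props) {
        List<String> keys = new ArrayList<>();
        String raw = props.getProperty(TASK_ASSIGNMENT_RACK_AWARENESS_CONFIG, "");
        for (String key : raw.split(",")) {
            String trimmed = key.trim();
            if (!trimmed.isEmpty()) {
                keys.add(trimmed);
            }
        }
        return keys;
    }

    // Example instance configuration with two tag dimensions (values are made up).
    static Properties exampleConfig() {
        Properties props = new Properties();
        props.setProperty("client.tag.zone", "eu-central-1a");
        props.setProperty("client.tag.cluster", "cluster1");
        props.setProperty(TASK_ASSIGNMENT_RACK_AWARENESS_CONFIG, "zone,cluster");
        return props;
    }

    public static void main(String[] args) {
        Properties props = exampleConfig();
        System.out.println(clientTags(props));
        System.out.println(rackAwareKeys(props));
    }
}
```

In this sketch each instance declares its own position in every dimension through client.tag.*, while task.assignment.rack.awareness names the dimensions the assignor should try to spread standby tasks across.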

The standby task distribution algorithm is not specified in this KIP and is left as an implementation detail. However, every distribution algorithm must gracefully handle the case where the ideal standby task distribution is not possible: Kafka Streams must not fail the assignment but should instead try to find the next-best distribution. In the ideal distribution, no client tag value is repeated, in any dimension, among the clients assigned to the active task and all of its standby tasks.
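Since the KIP leaves the algorithm open, the following is only one possible sketch of the fallback requirement, not the proposed implementation. It assumes a simple greedy strategy: for each standby slot, pick the unused client whose tags overlap least with the clients already chosen for the task, so a non-ideal choice is still made when no overlap-free client exists. All class, method, and client names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical greedy sketch, not the HighAvailabilityTaskAssignor logic.
public class StandbyPlacementSketch {

    // Counts the tag dimensions in which the candidate repeats a value
    // already used by one of the chosen clients.
    static int overlap(Map<String, String> candidate, List<Map<String, String>> chosen, List<String> keys) {
        int count = 0;
        for (String key : keys) {
            String value = candidate.get(key);
            for (Map<String, String> tags : chosen) {
                if (value != null && value.equals(tags.get(key))) {
                    count++;
                    break;
                }
            }
        }
        return count;
    }

    // Picks up to numStandbys clients for one task; `active` must be a key of `clients`.
    // Never fails: with too few clients it returns a shorter list, and with
    // unavoidable tag repetition it picks the client with the least overlap.
    static List<String> pickStandbys(String active, Map<String, Map<String, String>> clients,
                                     List<String> keys, int numStandbys) {
        List<String> result = new ArrayList<>();
        List<Map<String, String>> chosenTags = new ArrayList<>();
        chosenTags.add(clients.get(active));
        Set<String> used = new HashSet<>();
        used.add(active);
        while (result.size() < numStandbys) {
            String best = null;
            int bestOverlap = Integer.MAX_VALUE;
            for (Map.Entry<String, Map<String, String>> entry : clients.entrySet()) {
                if (used.contains(entry.getKey())) {
                    continue;
                }
                int current = overlap(entry.getValue(), chosenTags, keys);
                if (current < bestOverlap) {
                    best = entry.getKey();
                    bestOverlap = current;
                }
            }
            if (best == null) {
                break; // no unused client left
            }
            result.add(best);
            used.add(best);
            chosenTags.add(clients.get(best));
        }
        return result;
    }

    // Four example clients across two zones and two clusters (values are made up).
    static Map<String, Map<String, String>> exampleClients() {
        Map<String, Map<String, String>> clients = new TreeMap<>();
        clients.put("c1", Map.of("zone", "a", "cluster", "k1"));
        clients.put("c2", Map.of("zone", "a", "cluster", "k2"));
        clients.put("c3", Map.of("zone", "b", "cluster", "k2"));
        clients.put("c4", Map.of("zone", "b", "cluster", "k1"));
        return clients;
    }

    public static void main(String[] args) {
        System.out.println(pickStandbys("c1", exampleClients(), List.of("zone", "cluster"), 1));
    }
}
```

With the active task on c1, the only client that differs from it in both zone and cluster is c3, so the sketch chooses c3 for the single standby. A real assignor would also need to balance load across clients and tasks, which this sketch ignores.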

Changes in HighAvailabilityTaskAssignor

Implementation of this KIP must not affect HighAvailabilityTaskAssignor in a breaking way, meaning that all the existing behavior should stay unchanged (e.g., when new configurations are not specified). Once required configurations are set, the main change should happen within the code that deals with standby task allocation, specifically:

HighAvailabilityTaskAssignor#assignStandbyReplicaTasks and HighAvailabilityTaskAssignor#assignStandbyTaskMovements

Compatibility, Deprecation, and Migration Plan

The changes proposed by this KIP shouldn't affect previously set up applications. Since we only introduce new configuration options, existing ones shouldn't be affected by this change.

Rejected Alternatives