You are viewing an old version of this page. View the current version.

Compare with Current View Page History

Version 1 Next »

Status

Current state: Under Discussion

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]

JIRA: Unable to render Jira issues macro, execution error.

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka clusters are often distributed across multiple availability zones (AZ), especially in Cloud deployments. Rack-aware replica placement support in Kafka introduced in KIP-36 can be used to configure availability zones as racks for brokers. Replicas of each partition are distributed across different availability zones to ensure that a zonal outage doesn’t affect availability of a partition. KIP-392 added support for consumers to fetch from their closest replica. This feature enables consumers to fetch data from leaders or followers within the same availability zone, when possible, to benefit from locality.

Rack-aware Kafka deployments configure broker.rack for every broker based on their rack, which may be the AZ identifier. Brokers may also configure replica.selector.class that determines the preferred replica used to read data from. The default replica selector uses the leader as the preferred replica for reads, but the built-in RackAwareReplicaSelector  can be configured to match the rack of the replica with the rack of the consumer. Consumers using this feature benefit from locality and avoid expensive cross-AZ traffic by configuring client.rack. Rack ids are propagated to brokers in fetch requests and this enables the rack-aware replica selector to choose a replica on the same rack as the consumer and provide this information to the consumer to use in subsequent fetch requests . This feature works well in scenarios where every rack contains replicas of all partitions. For example, for a Kafka deployment with three AZs and replication factor 3, rack-aware replica placement places the three replicas on the 3 AZs. So every AZ has a replica for every partition. This enables consumers in any of the 3 AZs to consume from their local replica. If the number of replicas is lower than the number of AZs or racks, perhaps because number of AZs is higher than replication factor, some partitions may not have replicas on some AZs. Consumers running on those AZs will have to fetch data from the leader in another AZ. Rack-aware partition assignment for consumers will improve locality in this case.

KIP-848 describes the next generation consumer group protocol that fixes several issues with the existing protocol. This proposal already includes rack information in the protocol, making it easy to introduce rack-aware partition assignment in both the server-side partition assignors and client-side partition assignors proposed by that KIP. Since KIP-848 is a major rewrite of the consumer implementation, the existing consumer implementation is likely to be in widespread use for quite some time. Since the protocol change to support rack-aware partition assignment is a minor change, it will be good to support this feature in the existing consumer implementation as well so that it can be adopted sooner.

Public Interfaces

This KIP proposes to include rack information in ConsumerProtocolSubscription message. Rack will be populated by each consumer from the existing client.rack configuration. The new protocol including the new RackId  field will be:

ConsumerProtocolSubscription schema
{
  "type": "data",
  "name": "ConsumerProtocolSubscription",
  // Subscription part of the Consumer Protocol.
  //
  // The current implementation assumes that future versions will not break compatibility. When
  // it encounters a newer version, it parses it using the current format. This basically means
  // that new versions cannot remove or reorder any of the existing fields.
  //
  // Version 1 adds owned partitions.
  // Version 2 adds rack id to enable rack-aware assignment.
  "validVersions": "0-2",
  "flexibleVersions": "none",
  "fields": [
    { "name": "Topics", "type": "[]string", "versions": "0+" }, 
    { "name": "UserData", "type": "bytes", "versions": "0+", "nullableVersions": "0+",
      "default": "null", "zeroCopy": true }, 
    { "name": "OwnedPartitions", "type": "[]TopicPartition", "versions": "1+", "ignorable": true,
      "fields": [
        { "name": "Topic", "type": "string", "mapKey": true, "versions": "1+", "entityType": "topicName" },
        { "name": "Partitions", "type": "[]int32", "versions": "1+"}
      ]
    },
    { "name": "RackId", "type": "string", "versions": "2+", "nullableVersions": "2+", "default": "null", "ignorable": true }
  ]
}


We also propose to add rack-aware implementations of range assignor and cooperative sticky assignor. Rack-aware assignors will match the racks of consumers and replicas on a best-effort basis and attempt to improve locality for consumer partition assignment.

Proposed Changes

For consumers which specify client.rack , the rack id be will added to ConsumerProtocolSubscription message using the protocol described above. This propagates the rack id of all members to the consumer that performs partition assignment.

Partition assignors already have access to cluster metadata when performing the assignment:

Existing partition assignor interface
GroupAssignment assign(Cluster clusterMetadata, GroupSubscription groupSubscription);


The clusterMetadata instance used by partition assignors contains replica information for every partition, where each replica's rack is included in their Node if the broker was configured with broker.rack. So a rack-aware partition assignor can match the rack id of the members with the rack id of the replicas to ensure that consumers are assigned partitions in the same rack if possible. In some cases, this may not be possible, for example, if there is a single consumer and one partition which doesn't have a replica in the same rack. In this case the partition is assigned with mismatched racks and will result in cross-rack traffic. The built-in assignor will prioritize balancing partitions over improving locality, so in some cases, partitions may be allocated to a consumer in a different rack if there aren't sufficient partitions in the same rack as the consumer. The goal will be to improve locality for cases where load is uniformly distributed across a large number of partitions.

Compatibility, Deprecation, and Migration Plan

The new optional rack field for ConsumerProtocolSubscription will be added at the end of the structure, so it will not affect consumers with older versions. Existing consumer partition assignors will not use rack information, so their partition assignment will not be affected. New rack-aware partition assignor implementations will use rack information if available. Consumers without rack ids and/or partitions with replicas without rack ids are assigned partitions using the non-rack-aware algorithm, ensuring that consumers with different versions are supported.

Test Plan

  • Unit tests will be added for the protocol change and the new assignors.

  • Existing integration tests will be used to ensure that clients using existing assignors work with and without rack ids.

  • New integration tests will be added to verify rack-aware partition assignment.

  • Existing system tests will be used to verify compatibility with older versions.

Rejected Alternatives

Use client tags similar to Kafka Streams instead of using rack configuration

Kafka Streams introduced rack-aware rack assignment in KIP-708. Flexible client tags were introduced to implement rack-awareness along with a rack-aware assignor for standby tasks. Tags are more flexible, but since we want to match existing rack configuration in the broker and consumers, it seems better to use rack id directly instead of adding prefixed tags. The next generation consumer group protocol proposed in KIP-848 also uses racks in the protocol.

Propagate rack in userData field

Kafka Streams currently propagates tags in the userData  bytes field that is populated by its task assignor. We could do the same and populate rack only in a rack-aware partition assignor in the userData  field that is managed by assignors. Since client.rack is a standard consumer configuration option and is used for follower fetching, it seems better to include this at the top level rather than in assignor-specific userData  structure. This will allow any of the consumer partition assignors to take advantage of rack-based locality in future.

Replace existing assignors with rack-aware assignor if racks are configured

Rack-awareness is not enabled by default in Kafka clients and brokers. For example, brokers use rack-aware replica selector for follower fetching, only if brokers are explicitly configured with replica.selector.class. It seems better to retain existing assignor algorithm for consumers rather than change them when existing rack configs client.rack  and broker.rack  are configured. So this KIP proposes to add new rack-aware assignors without changing the logic of existing assignors.



  • No labels