...

Current state: Under Discussion

Discussion thread: here

JIRA: KAFKA-14352

...

ConsumerProtocolSubscription schema:
{
  "type": "data",
  "name": "ConsumerProtocolSubscription",
  // Subscription part of the Consumer Protocol.
  //
  // The current implementation assumes that future versions will not break compatibility. When
  // it encounters a newer version, it parses it using the current format. This basically means
  // that new versions cannot remove or reorder any of the existing fields.
  //
  // Version 1 adds owned partitions.
  // Version 2 adds generationId (KIP-792).
  // Version 3 adds rack id to enable rack-aware assignment. <== NEW
  "validVersions": "0-2",
  "flexibleVersions": "none",
  "fields": [
    { "name": "Topics", "type": "[]string", "versions": "0+" }, 
    { "name": "UserData", "type": "bytes", "versions": "0+", "nullableVersions": "0+",
      "default": "null", "zeroCopy": true }, 
    { "name": "OwnedPartitions", "type": "[]TopicPartition", "versions": "1+", "ignorable": true,
      "fields": [
        { "name": "Topic", "type": "string", "mapKey": true, "versions": "1+", "entityType": "topicName" },
        { "name": "Partitions", "type": "[]int32", "versions": "1+"}
      ]
    },
    { "name": "GenerationId", "type": "int32", "versions": "2+", "default": "-1"},
    { "name": "RackId", "type": "string", "versions": "2+", "nullableVersions": "2+", "default": "null", "ignorable": true } <== NEW
  ]
}


Rack id will be included in ConsumerPartitionAssignor.Subscription for each member's subscription metadata in ConsumerPartitionAssignor.GroupSubscription, so that partition assignors can match the rack ids of members with the rack ids of partition replicas.

Changes to ConsumerPartitionAssignor.Subscription:
final class Subscription {
    ....
    private final Optional<String> rackId;

    public Subscription(List<String> topics, ByteBuffer userData, List<TopicPartition> ownedPartitions, int generationId, Optional<String> rackId) {
        this.topics = topics;
        this.userData = userData;
        this.ownedPartitions = ownedPartitions;
        this.generationId = generationId;
        this.rackId = rackId;
        this.groupInstanceId = Optional.empty();
    }

    public Subscription(List<String> topics, ByteBuffer userData, List<TopicPartition> ownedPartitions) {
        this(topics, userData, ownedPartitions, -1, Optional.empty()); // -1: generation unknown, matching the schema default
    }

    public Optional<String> rackId() {
        return rackId;
    }

    ....
}


We also propose to update the range assignor and the cooperative sticky assignor to use a rack-aware algorithm if client.rack is configured. The rack-aware assignors will match the racks of consumers and replicas on a best-effort basis and attempt to improve locality of consumer partition assignment.
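
As an illustrative sketch (broker address, group name, and rack value below are placeholders), a consumer opts into rack-aware assignment purely through its existing client.rack configuration and one of the built-in assignors this KIP updates:

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class RackAwareConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");         // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Existing standard option (used for follower fetching since KIP-392):
        // identifies the rack, e.g. cloud availability zone, this consumer runs in.
        // Under this KIP the built-in range and cooperative sticky assignors also
        // use it to prefer partitions with a replica in the same rack.
        props.put(ConsumerConfig.CLIENT_RACK_CONFIG, "us-east-1a");

        // One of the assignors this KIP updates; no new assignor class is needed.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                  CooperativeStickyAssignor.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Subscribe and poll as usual; assignment locality is handled transparently.
        }
    }
}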

...

The clusterMetadata instance used by partition assignors contains replica information for every partition, where each replica's rack is included in its Node if the broker was configured with broker.rack. This KIP also adds rack id to each member's Subscription instance in GroupSubscription. A rack-aware partition assignor can therefore match the rack ids of the members with the rack ids of the replicas to ensure that consumers are assigned partitions in the same rack where possible. In some cases this may not be possible, for example if there is a single consumer and one partition that doesn't have a replica in the same rack. In this case the partition is assigned despite the rack mismatch and will result in cross-rack traffic. The built-in assignors will prioritize balancing partitions over improving locality, so in some cases partitions may be allocated to a consumer in a different rack if there aren't sufficient partitions in the same rack as the consumer. The goal is to improve locality for cases where load is uniformly distributed across a large number of partitions.
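
For illustration, a minimal sketch of the matching step, assuming a hypothetical helper hasReplicaInRack that is not part of this KIP; it only checks rack co-location and would be one input to the balance-first assignment described above:

import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;

public final class RackMatching {
    // Hypothetical helper: true if the partition has a replica in the consumer's
    // rack. A rack-aware assignor could use this as one input to its balance-first
    // algorithm; it is not the complete assignment logic.
    public static boolean hasReplicaInRack(Cluster clusterMetadata,
                                           TopicPartition tp,
                                           Optional<String> consumerRack) {
        if (!consumerRack.isPresent())
            return false; // consumer did not configure client.rack
        Set<String> replicaRacks = new HashSet<>();
        for (Node replica : clusterMetadata.partition(tp).replicas()) {
            if (replica.hasRack()) // rack is present only if broker.rack was configured
                replicaRacks.add(replica.rack());
        }
        return replicaRacks.contains(consumerRack.get());
    }
}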

...

The new optional rack field in ConsumerProtocolSubscription will be added at the end of the structure, so it will not affect consumers on older versions. Assignors that do not support rack-awareness will not use rack information, so their partition assignment will not be affected. The rack-aware assignor implementations will use rack information if available.

Consumers without rack ids and/or partitions whose replicas have no rack ids are assigned partitions using the non-rack-aware algorithm by all assignors, ensuring that consumers with different versions are supported. The range assignor and cooperative sticky assignor will use the rack-aware algorithm if client.rack is provided and both the client and brokers are on version 3.4 or above. If either is on a lower version, the existing non-rack-aware algorithm is used. When the replication factor is greater than or equal to the number of racks, all racks have replicas of all partitions, and hence the assignors retain essentially the same assignment as before. If the replication factor is lower and client.rack is provided, the updated assignors will use the new rack-aware logic, but there is no other compatibility impact.
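
To make these fallback rules concrete, the following is a hedged sketch (method name and structure hypothetical, not part of this KIP) of the guard an updated assignor could evaluate before taking the rack-aware path: fall back unless every consumer propagated a rack id, every replica carries rack information, and some rack is actually missing replicas of some partition:

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;

public final class RackAwareGuard {
    // Hypothetical guard: returns false (use the existing non-rack-aware
    // algorithm) whenever rack information is missing or cannot change the outcome.
    public static boolean useRackAwareAssignment(Collection<Subscription> subscriptions,
                                                 List<PartitionInfo> partitions) {
        // 1. Every subscribing consumer must have propagated a rack id.
        Set<String> consumerRacks = new HashSet<>();
        for (Subscription subscription : subscriptions) {
            if (!subscription.rackId().isPresent())
                return false;
            consumerRacks.add(subscription.rackId().get());
        }
        // 2. Every replica must carry a rack id (broker.rack configured).
        // 3. Rack-awareness only helps if some rack is missing replicas of some
        //    partition; if replication factor >= number of racks, every rack has
        //    every partition and the existing algorithm is already rack-local.
        boolean someRackMissesAPartition = false;
        for (PartitionInfo partition : partitions) {
            Set<String> replicaRacks = new HashSet<>();
            for (Node replica : partition.replicas()) {
                if (!replica.hasRack())
                    return false;
                replicaRacks.add(replica.rack());
            }
            if (!replicaRacks.containsAll(consumerRacks))
                someRackMissesAPartition = true;
        }
        return someRackMissesAPartition;
    }
}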

Test Plan

  • Unit tests will be added for the protocol change and the new assignors.

  • Existing integration tests will be used to ensure that clients using existing assignors work with and without rack ids.

  • New integration tests will be added to verify rack-aware partition assignment.

  • Existing system tests will be used to verify compatibility with older versions.

...

Kafka Streams currently propagates tags in the userData bytes field that is populated by its task assignor. We could do the same here and populate rack id only within a rack-aware partition assignor, in the userData field that is managed by assignors. However, since client.rack is a standard consumer configuration option that is already used for follower fetching, it seems better to include the rack id at the top level rather than in an assignor-specific userData structure. This will allow any consumer partition assignor to take advantage of rack-based locality in the future.

...

Implement new assignors for rack-aware assignment instead of updating existing assignors

Rack-awareness is not enabled by default in Kafka clients and brokers. For example, brokers use the rack-aware replica selector for follower fetching only if they are explicitly configured with replica.selector.class. We could similarly retain the existing assignors in consumers and implement new assignor classes for rack-aware assignment. But that would require consumers to be explicitly configured with the new assignors to benefit from this KIP. Since consumers configure client.rack only to benefit from locality with follower fetching, it seems reasonable to update the existing assignors rather than require a configuration change. In scenarios where all racks have replicas of all partitions, the existing logic is retained, so this change has no impact in that case. In scenarios where clients have configured client.rack to benefit from locality, but racks contain a subset of replicas, it seems reasonable to make the existing assignors rack-aware to benefit from improved locality without additional configuration changes.