...

The new protocol’s RPCs are specified in detail in the Public Interfaces section of this document, while the rebalance logic is described in the next chapter.

Group Coordinator

TODO: Update this section to talk about the rewrite.

We propose to extend the current group coordinator to implement the new consumer group rebalance protocol. This chapter covers the various aspects related to the group coordinator.

...

Consumer Group
Name | Type | Description
Group ID | string | The group ID as configured by the consumer. The ID uniquely identifies the group.
Group Epoch | int32 | The current epoch of the group. The epoch is incremented by the group coordinator when a new assignment is required for the group.
Members | []Member | The set of members in the group.
Partitions Metadata | []PartitionMetadata | The metadata of the partitions that the group is subscribed to. This is used to trigger a rebalance when the partition metadata changes.

Member
Name | Type | Description
Member ID | string | The unique identifier of the member. The ID is similar to an incarnation ID. It is generated by the client once and must be used during its lifetime.
Instance ID | string | The instance ID configured by the consumer.
Client ID | string | The client ID configured by the consumer.
Client Host | string | The host of the consumer client.
Subscribed Topic Names | []string | The current set of subscribed topic names configured by the consumer.
Subscribed Topic Regex | string | The current subscription regular expression configured by the consumer.
Server Assignor | string | The server-side assignor used by the group.
Client Assignors | []Assignor | The list of client-side assignors supported by the member. The order of this list defines the priority.

Assignor
Name | Type | Description
Name | string | The unique name of the assignor.
Minimum Version | int32 | The minimum version of the metadata schema supported by this assignor.
Maximum Version | int32 | The maximum version of the metadata schema supported by this assignor.
Reason | int8 | The reason why the metadata was updated.
Metadata | bytes | The metadata provided by the consumer for this assignor.

...

Target Assignment
Name | Type | Description
Group ID | string | The group ID as configured by the consumer. The ID uniquely identifies the group.
Assignment Epoch | int32 | The epoch of the assignment. It represents the epoch of the group used to generate the assignment. It will eventually match the group epoch.
Assignment Error | int8 | The error reported by the assignor.
Members | []Member | The assignment for each member.

Member
Name | Type | Description
Member ID | string | The unique identifier of the member.
Partitions | Partitions | The set of partitions assigned to this member.
Metadata | bytes | The metadata assigned to this member.

...

Current Assignment
Name | Type | Description
Group ID | string | The group ID as configured by the consumer. The ID uniquely identifies the group.
Member ID | string | The member ID of this member.
Epoch | int32 | The current epoch of this member. The epoch is the assignment epoch of the assignment currently used by this member. This epoch is the one used to fence the member (e.g. offsets commit).
Error | int8 | The error reported by the assignor.
Partitions | Partitions | The current partitions used by the member.
Metadata | bytes | The current metadata used by the member.

...

  1. The group coordinator revokes the partitions which are no longer in the target assignment of the member. It does so by providing the Current Partitions & Target Partitions (the intersection of the two sets) in the heartbeat response until the member acknowledges the revocation in a heartbeat request. The group coordinator gives the member the rebalance timeout to complete the revocation and kicks it out of the group otherwise.
  2. When the group coordinator receives the acknowledgement of the revocation, it updates the member's current assignment to its target assignment (and target epoch) and durably persists it.
  3. The group coordinator assigns the new partitions to the member. It does so by providing the Target Partitions to the member while ensuring that partitions which are not yet revoked by other members are removed from this set. In other words, new partitions are incrementally assigned to the member as they are revoked by the other members.
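
The set arithmetic behind these three steps can be sketched as follows. This is only an illustration under stated assumptions: the function names are hypothetical, partitions are modelled as (topic, partition) tuples, and the set of partitions still owned by other members is assumed to be known to the coordinator.

```python
from typing import Set, Tuple

# Assumption: a partition is modelled as a (topic name, partition index) tuple.
Partition = Tuple[str, int]


def partitions_while_revoking(current: Set[Partition],
                              target: Set[Partition]) -> Set[Partition]:
    """Step 1: Current Partitions & Target Partitions, i.e. what the member keeps."""
    return current & target


def partitions_to_revoke(current: Set[Partition],
                         target: Set[Partition]) -> Set[Partition]:
    """Partitions the member must give up: owned now but absent from the target."""
    return current - target


def partitions_assignable_now(target: Set[Partition],
                              owned_by_others: Set[Partition]) -> Set[Partition]:
    """Step 3: the target minus partitions that other members have not revoked yet."""
    return target - owned_by_others


current = {("foo", 0), ("foo", 1)}
target = {("foo", 1), ("foo", 2)}
owned_by_others = {("foo", 2)}  # not yet revoked by its previous owner

kept = partitions_while_revoking(current, target)
revoked = partitions_to_revoke(current, target)
assignable = partitions_assignable_now(target, owned_by_others)
```

In this example the member keeps foo-1, revokes foo-0, and only receives foo-2 once its previous owner has released it.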

...

Every member is uniquely identified by a UUID, called the Member ID. This UUID is generated on the server side and given to the member when it joins the group. It is used in all communication with the group coordinator and must be kept during the entire life span of the member (e.g. the consumer). In that sense, it is similar to an incarnation ID.

...

The member joins the group by sending a heartbeat with no Member ID and a member epoch equal to 0. It can leave the group by sending a heartbeat with a member epoch equal to -1.
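
As a rough illustration of these two special epoch values, a coordinator could classify an incoming heartbeat along the following lines. The constant and function names are hypothetical, and the handling of the Member ID is deliberately left out of the sketch.

```python
# Assumption: these constant names are illustrative, only the values 0 and -1
# come from the text above.
JOIN_EPOCH = 0
LEAVE_EPOCH = -1


def classify_heartbeat(member_epoch: int) -> str:
    """Classify a heartbeat request based on the member epoch it carries."""
    if member_epoch == LEAVE_EPOCH:
        return "leave"
    if member_epoch == JOIN_EPOCH:
        return "join"
    # Any other epoch is a regular heartbeat from an existing member.
    return "heartbeat"
```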

...

Each member encodes the lag of its standby tasks in its metadata. We cannot update the lag in every heartbeat request because that would constantly trigger reassignments in the group. Instead, when a) the task lag has dropped within the acceptable.recovery.lag threshold or b) the task lag has been consistently increasing for some time, the member should consider triggering a rebalance by sending its next heartbeat with the appropriate encoded reason and the updated task lags.
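
The two conditions could be checked on the member side roughly as below. This is a minimal sketch, not the proposed implementation: the function name, the list of recent lag samples, and the `window` parameter (how many consecutive increases count as "consistently increasing") are all assumptions made for the example.

```python
from typing import Sequence


def should_trigger_rebalance(lags: Sequence[int],
                             acceptable_recovery_lag: int,
                             window: int = 3) -> bool:
    """Decide whether the next heartbeat should carry updated task lags.

    lags: recent lag samples of one standby task, oldest first (assumed input).
    window: number of consecutive increases treated as "consistently increasing".
    """
    if len(lags) < 2:
        return False
    previous, latest = lags[-2], lags[-1]
    # a) the lag has just dropped to or below the acceptable recovery lag
    caught_up = previous > acceptable_recovery_lag >= latest
    # b) the lag strictly increased across the last `window` intervals
    recent = lags[-(window + 1):]
    growing = (len(recent) == window + 1
               and all(a < b for a, b in zip(recent, recent[1:])))
    return caught_up or growing
```

Keeping this decision on the member means the coordinator only sees new metadata when a reassignment is actually worth considering.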

Upgrade Path

TODO

Cluster Upgrade

  • Deploy the new version of the software
  • Roll the cluster
  • Enable the new protocol
  • Roll the cluster

Group Upgrade

Upgrading/Downgrading to/from the New Rebalance Protocol

Upgrading to the new protocol or downgrading from it is possible by rolling the consumers with the correct group.protocol, assuming that the new protocol is enabled on the server side. When the first consumer using the new rebalance protocol joins the group, the group is converted from a generic group to a consumer group and all the JoinGroup, SyncGroup and Heartbeat calls from the non-upgraded consumers are translated to the new protocol.

Protocols Interoperability

The new rebalance protocol relies mainly on one API, the ConsumerGroupHeartbeat API, whereas the old protocol relies on three APIs:

  • JoinGroup API - The consumer sends a JoinGroup request to join or re-join a group. The request contains the subscriptions, the owned partitions and the current generation id (assuming KIP-792 is implemented), serialized in the inner consumer protocol. The responses are sent back to the consumers only when all the members have (re-)joined the group. One of the members is elected as the leader of the group and is responsible for computing the assignment.
  • SyncGroup API - The consumer sends a SyncGroup request when it gets its JoinGroup response back in order to get its assignment.
  • Heartbeat API - The consumer heartbeats in order to keep its session alive.

The old protocol has two modes:

  • Eager Mode - In the eager mode, the consumer revokes all its partitions before re-joining the group during a rebalance.
  • Cooperative Mode - In the cooperative mode, the consumer only revokes the partitions that it does not own anymore before rejoining the group. Two rebalances are therefore required to move a partition from member A to member B: one to revoke the partition from A and another to assign it to B.

The two modes must be treated differently. At the moment, the group coordinator does not know which mode is used by the group. This information will be added to the protocol.

Eager Mode

Handling Heartbeat API

When a consumer group receives a heartbeat request, the group coordinator updates the session. If the generation id equals the target id, it replies with NONE; otherwise it replies with REBALANCE_IN_PROGRESS to signal that the member must rejoin the group.
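
A minimal sketch of this heartbeat handling is shown below. The class and function names are hypothetical, the session is reduced to a last-seen timestamp, and "target id" is taken as an opaque integer supplied by the caller; only the two error names come from the text.

```python
from dataclasses import dataclass

NONE = "NONE"
REBALANCE_IN_PROGRESS = "REBALANCE_IN_PROGRESS"


@dataclass
class MemberSession:
    """Assumption: the session is modelled as the time of the last heartbeat."""
    last_heartbeat_ms: int = 0


def handle_old_heartbeat(session: MemberSession,
                         generation_id: int,
                         target_id: int,
                         now_ms: int) -> str:
    # Every heartbeat refreshes the session, whatever the outcome.
    session.last_heartbeat_ms = now_ms
    if generation_id == target_id:
        return NONE
    # The member is behind and must rejoin the group.
    return REBALANCE_IN_PROGRESS
```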

Handling JoinGroup API

When a consumer joins or rejoins the consumer group, the group coordinator parses the inner consumer protocol in the JoinGroup request to extract the generation id, the subscribed topics and the user data. The generation id is considered as the current member epoch, the subscribed topics as the subscribed topics, and the user data as the assignor data with version -1. The consumer group is updated accordingly.
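
The field mapping described above can be sketched as a small translation function. The type and field names here are illustrative assumptions, not the actual Kafka classes; only the mapping itself (generation id to member epoch, subscribed topics carried over, user data stored as assignor data with version -1) comes from the text.

```python
from dataclasses import dataclass
from typing import List

# Version used for metadata coming from the old protocol, as stated in the text.
OLD_PROTOCOL_METADATA_VERSION = -1


@dataclass
class OldSubscription:
    """Fields parsed from the inner consumer protocol of a JoinGroup request."""
    generation_id: int
    subscribed_topics: List[str]
    user_data: bytes


@dataclass
class TranslatedMember:
    """Hypothetical view of the member state in the new consumer group."""
    member_epoch: int
    subscribed_topic_names: List[str]
    assignor_metadata_version: int
    assignor_metadata: bytes


def translate_join_group(sub: OldSubscription) -> TranslatedMember:
    return TranslatedMember(
        member_epoch=sub.generation_id,
        subscribed_topic_names=list(sub.subscribed_topics),
        assignor_metadata_version=OLD_PROTOCOL_METADATA_VERSION,
        assignor_metadata=sub.user_data,
    )
```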

In the eager mode, the consumer revokes all its partitions before joining the group. Therefore, we don't have to do anything to revoke them. When all the non-upgraded members have rejoined the group, we know that the partitions are revoked, so each member can advance its current assignment to its target assignment, and the group coordinator can release the SyncGroup responses once the changes are persisted. The SyncGroup response contains the new member epoch as the generation id. The consumers collect their assignment by sending the SyncGroup request.

Handling SyncGroup API

When a consumer sends a SyncGroup request in the eager mode, the group coordinator validates the generation id and returns the current assignment of the member, serialized according to the consumer protocol.

Cooperative Mode

Handling Heartbeat API

When a consumer group receives a heartbeat request, the group coordinator updates the session. If the generation id equals the target id, it replies with NONE; otherwise it replies with REBALANCE_IN_PROGRESS to signal that the member must rejoin the group.

Handling JoinGroup API

When a consumer joins or rejoins the consumer group, the group coordinator parses the inner consumer protocol in the JoinGroup request to extract the generation id, the subscribed topics and the user data. The generation id is considered as the current member epoch, the subscribed topics as the subscribed topics, and the user data as the assignor data with version -1.

In the cooperative mode, the consumer keeps its assigned partitions when joining the group. Therefore, two rebalances are required here as explained earlier. There are two cases to consider here:

  • If the owned partitions match the Current Partitions & Target Partitions, it means that the consumer has revoked the partitions. In this case, the member is ready to advance to the next epoch.
  • If the owned partitions do not match the Current Partitions & Target Partitions, the consumer must first be told to revoke them. The group coordinator can reply to it directly.
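
The two cases reduce to a single set comparison, sketched below. The function name and the (topic, partition) tuple representation are assumptions made for the example.

```python
from typing import Set, Tuple

# Assumption: a partition is modelled as a (topic name, partition index) tuple.
Partition = Tuple[str, int]


def ready_to_advance(owned: Set[Partition],
                     current: Set[Partition],
                     target: Set[Partition]) -> bool:
    """True when the owned partitions equal Current Partitions & Target Partitions."""
    return owned == (current & target)


current = {("foo", 0), ("foo", 1)}
target = {("foo", 1), ("foo", 2)}

# The consumer still owns foo-0, so it must be told to revoke it first.
still_owning = ready_to_advance({("foo", 0), ("foo", 1)}, current, target)
# The consumer has dropped foo-0, so it can advance to the next epoch.
after_revocation = ready_to_advance({("foo", 1)}, current, target)
```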


Therefore, we don't have to do anything to revoke them. When all the non-upgraded members have rejoined the group, we know that the partitions are revoked, so each member can advance its current assignment to its target assignment, and the group coordinator can release the SyncGroup responses once the changes are persisted. The SyncGroup response contains the new member epoch as the generation id. The consumers collect their assignment by sending the SyncGroup request.

Handling SyncGroup API

When a consumer sends a SyncGroup request in the cooperative mode, the group coordinator validates the generation id and returns the current assignment of the member, serialized according to the consumer protocol.

Assignors Interoperability


Member ID?

  • String vs UUID? let's keep a string everywhere.
  • Deploy the new software
  • Roll the members
  • Enable the new protocol
  • Roll the members

Eager and Cooperative

  • Eager revokes all partitions before re-joining the group.
  • Cooperative does two rebalance rounds. One to revoke partitions. One to assign partitions.

...

  • When do we need to trigger a rebalance?
  • PrepareRebalance: current epoch < target epoch?
  • CompleteRebalance

Assignor

  • From client side to server side
    • The server side takes over
  • From client side to client side
    • The new assignor will get old metadata with -1 version
    • The new assignor must support old metadata

Public Interfaces

This section lists the changes impacting the public interfaces.

...

Compatibility, Deprecation, and Migration Plan

Cluster Upgrade

  • Deploy the new version of the software
  • Roll the cluster
  • Enable the new protocol
  • Roll the cluster

Group Upgrade

  • Deploy the new software
  • Roll the members
  • Enable the new protocol
  • Roll the members




  • Kafka 4.0 opt-in, consumer support.
  • Kafka 4.x opt-in, streams support.
  • Kafka 5.0, new protocol by default.

...