...
The new protocol's RPCs are specified in detail in the Public Interfaces section of this document, while the rebalance logic is described in the next chapter.
Group Coordinator
TODO: Update this section to talk about the rewrite.
We propose to extend the current group coordinator to implement the new consumer group rebalance protocol. This chapter covers the various aspects related to the group coordinator.
...
Consumer Group
Name | Type | Description |
---|---|---|
Group ID | string | The group ID as configured by the consumer. The ID uniquely identifies the group. |
Group Epoch | int32 | The current epoch of the group. The epoch is incremented by the group coordinator when a new assignment is required for the group. |
Members | []Member | The set of members in the group. |
Partitions Metadata | []PartitionMetadata | The metadata of the partitions that the group is subscribed to. This is used to trigger a rebalance when the partition metadata changes. |
Member
Name | Type | Description |
---|---|---|
Member ID | string | The unique identifier of the member. The ID is similar to an incarnation ID. It is generated by the client once and must be used during its lifetime. |
Instance ID | string | The instance ID configured by the consumer. |
Client ID | string | The client ID configured by the consumer. |
Client Host | string | The host of the consumer client. |
Subscribed Topic Names | []string | The current set of subscribed topic names configured by the consumer. |
Subscribed Topic Regex | string | The current subscription regular expression configured by the consumer. |
Server Assignor | string | The server-side assignor used by the group. |
Client Assignors | []Assignor | The list of client-side assignors supported by the member. The order of this list defines the priority. |
Assignor
Name | Type | Description |
---|---|---|
Name | string | The unique name of the assignor. |
Minimum Version | int32 | The minimum version of the metadata schema supported by this assignor. |
Maximum Version | int32 | The maximum version of the metadata schema supported by this assignor. |
Reason | int8 | The reason why the metadata was updated. |
Metadata | bytes | The metadata provided by the consumer for this assignor. |
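The tables above can be sketched as plain Python dataclasses. This is a minimal illustration, not the coordinator's actual data model: the class and method names are hypothetical, Partitions Metadata is simplified to a topic-to-partition-count map, and the rule for when to bump the group epoch (new member, changed subscription, or changed partition metadata) is our reading of the Group Epoch and Partitions Metadata descriptions.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class Assignor:
    # Client-side assignor advertised by a member (see the Assignor table).
    name: str
    minimum_version: int
    maximum_version: int
    reason: int = 0  # int8 on the wire; why the metadata was updated
    metadata: bytes = b""


@dataclass
class Member:
    member_id: str
    instance_id: Optional[str] = None
    client_id: str = ""
    client_host: str = ""
    subscribed_topic_names: List[str] = field(default_factory=list)
    subscribed_topic_regex: Optional[str] = None
    server_assignor: Optional[str] = None
    # Ordered by priority: the first entry is the preferred assignor.
    client_assignors: List[Assignor] = field(default_factory=list)


@dataclass
class ConsumerGroup:
    group_id: str
    group_epoch: int = 0
    members: Dict[str, Member] = field(default_factory=dict)
    # Hypothetical simplification: topic name -> partition count.
    partitions_metadata: Dict[str, int] = field(default_factory=dict)

    def upsert_member(self, member: Member) -> None:
        previous = self.members.get(member.member_id)
        self.members[member.member_id] = member
        # Bump the epoch only if a new assignment may be required:
        # a new member or a changed subscription.
        if (previous is None
                or previous.subscribed_topic_names != member.subscribed_topic_names
                or previous.subscribed_topic_regex != member.subscribed_topic_regex):
            self.group_epoch += 1

    def update_partitions_metadata(self, metadata: Dict[str, int]) -> None:
        # A change in partition metadata also calls for a new assignment.
        if metadata != self.partitions_metadata:
            self.partitions_metadata = dict(metadata)
            self.group_epoch += 1
```

Updating a member's client ID alone leaves the epoch untouched, while a subscription or metadata change bumps it.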
...
Target Assignment
Name | Type | Description |
---|---|---|
Group ID | string | The group ID as configured by the consumer. The ID uniquely identifies the group. |
Assignment Epoch | int32 | The epoch of the assignment. It represents the epoch of the group used to generate the assignment. It will eventually match the group epoch. |
Assignment Error | int8 | The error reported by the assignor. |
Members | []Member | The assignment for each member. |
Member
Name | Type | Description |
---|---|---|
Member ID | string | The unique identifier of the member. |
Partitions | Partitions | The set of partitions assigned to this member. |
Metadata | bytes | The metadata assigned to this member. |
...
Current Assignment
Name | Type | Description |
---|---|---|
Group ID | string | The group ID as configured by the consumer. The ID uniquely identifies the group. |
Member ID | string | The member ID of this member. |
Epoch | int32 | The current epoch of this member. The epoch is the assignment epoch of the assignment currently used by this member. This epoch is used to fence the member (e.g. for offset commits). |
Error | int8 | The error reported by the assignor. |
Partitions | Partitions | The current partitions used by the member. |
Metadata | bytes | The current metadata used by the member. |
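A minimal sketch of how the member epoch could fence an offset commit, assuming the commit request carries the member's epoch and that only the epoch of the current assignment is accepted. `CurrentAssignment`, `StaleMemberEpoch`, and `validate_offset_commit` are hypothetical names used for illustration only.

```python
from dataclasses import dataclass
from typing import FrozenSet, Tuple

TopicPartition = Tuple[str, int]


@dataclass(frozen=True)
class CurrentAssignment:
    group_id: str
    member_id: str
    epoch: int  # assignment epoch currently used by the member
    partitions: FrozenSet[TopicPartition]


class StaleMemberEpoch(Exception):
    """Hypothetical error for a request carrying an outdated member epoch."""


def validate_offset_commit(current: CurrentAssignment, request_epoch: int) -> None:
    # Fence the member: only the epoch of its current assignment is accepted.
    if request_epoch != current.epoch:
        raise StaleMemberEpoch(
            f"member {current.member_id} sent epoch {request_epoch}, "
            f"expected {current.epoch}"
        )
```

A zombie member that still uses an older epoch after its assignment moved on is rejected instead of overwriting offsets.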
...
- The group coordinator revokes the partitions that are no longer in the member's target assignment. It does so by providing Current Partitions & Target Partitions in the heartbeat response until the member acknowledges the revocation in a heartbeat request. The member has up to the rebalance timeout to complete the revocation; otherwise, the group coordinator removes it from the group.
- When the group coordinator receives the acknowledgement of the revocation, it updates the member's current assignment to its target assignment (and target epoch) and durably persists it.
- The group coordinator then assigns the new partitions to the member. It does so by providing the Target Partitions to the member, minus any partitions that other members have not revoked yet. In other words, new partitions are incrementally assigned to the member as the other members revoke them.
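The reconciliation steps above can be sketched as a single function that computes the partitions sent to a member in its next heartbeat response. This is an illustration under stated assumptions: the persistence step is left out, `revocation_acked` and `owned_by_others` are hypothetical inputs standing in for the coordinator's state, and `&` and `-` are Python set intersection and difference.

```python
from typing import FrozenSet, Set, Tuple

TopicPartition = Tuple[str, int]


def reconcile(current: FrozenSet[TopicPartition],
              target: FrozenSet[TopicPartition],
              revocation_acked: bool,
              owned_by_others: Set[TopicPartition]) -> FrozenSet[TopicPartition]:
    """Partitions to send to the member in the next heartbeat response."""
    to_revoke = current - target
    if to_revoke and not revocation_acked:
        # Step 1: ask the member to keep only Current & Target.
        return current & target
    # Step 3 (after step 2 persisted current := target): hand out the
    # target partitions, minus those other members still hold.
    return frozenset(p for p in target if p not in owned_by_others)
```

Called repeatedly on each heartbeat, the returned set grows as other members release their partitions.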
...
Every member is uniquely identified by a UUID, called the Member ID. This UUID is generated on the server side and given to the member when it joins the group. It is used in all communication with the group coordinator and must be kept for the entire lifespan of the member (e.g. the consumer). In that sense, it is similar to an incarnation ID.
...
A member joins the group by sending a heartbeat with no Member ID and a member epoch equal to 0. It leaves the group by sending a heartbeat with a member epoch equal to -1.
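A toy sketch of the join and leave semantics, assuming the coordinator generates the Member ID when a member heartbeats without one and that any membership change bumps the group epoch. The `Coordinator` class and its `heartbeat` method are hypothetical and ignore assignments, sessions, and persistence.

```python
import uuid
from typing import Dict, Optional, Tuple

JOIN_EPOCH = 0
LEAVE_EPOCH = -1


class Coordinator:
    """Toy coordinator tracking member epochs by Member ID (hypothetical)."""

    def __init__(self) -> None:
        self.group_epoch = 0
        self.member_epochs: Dict[str, int] = {}

    def heartbeat(self, member_id: Optional[str], member_epoch: int) -> Tuple[str, int]:
        if member_epoch == JOIN_EPOCH and member_id is None:
            # New member: the coordinator generates its Member ID.
            member_id = str(uuid.uuid4())
            self.member_epochs[member_id] = 0
            self.group_epoch += 1  # membership changed: new assignment needed
            return member_id, 0
        if member_id not in self.member_epochs:
            raise KeyError(f"unknown member {member_id}")
        if member_epoch == LEAVE_EPOCH:
            del self.member_epochs[member_id]
            self.group_epoch += 1
            return member_id, LEAVE_EPOCH
        return member_id, self.member_epochs[member_id]
```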
...
Each member encodes the lag of its standby tasks in its metadata. We cannot update the lag in every heartbeat request because that would constantly trigger reassignments in the group. Instead, when a) the task lag has dropped within the acceptable.recovery.lag threshold or b) the task lag has been consistently increasing for some time, the member should consider triggering a rebalance by sending its next heartbeat with the appropriate encoded reason and the updated task lags.
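The two trigger conditions can be sketched as a small predicate over recent lag samples for one standby task. The sample history, the `window` length used to define "consistently increasing", and the function name are assumptions for illustration; only the acceptable.recovery.lag threshold comes from the text.

```python
from typing import Sequence


def should_trigger_rebalance(lag_history: Sequence[int],
                             acceptable_recovery_lag: int,
                             window: int = 3) -> bool:
    """Decide whether to send updated task lags (hypothetical helper).

    lag_history holds recent lag samples for one standby task, oldest first.
    """
    if len(lag_history) < 2:
        return False
    previous, latest = lag_history[-2], lag_history[-1]
    # a) The lag just dropped within the acceptable.recovery.lag threshold.
    if previous > acceptable_recovery_lag >= latest:
        return True
    # b) The lag strictly increased over the last `window` steps.
    recent = lag_history[-(window + 1):]
    return len(recent) == window + 1 and all(b > a for a, b in zip(recent, recent[1:]))
```

A lag that is already below the threshold and keeps shrinking does not trigger anything, which is the point of not reporting every change.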
Upgrade Path
TODO
Cluster Upgrade
- Deploy the new version of the software
- Roll the cluster
- Enable the new protocol
- Roll the cluster
Group Upgrade
Upgrading/Downgrading to/from the New Rebalance Protocol
Upgrading to or downgrading from the new protocol is possible by rolling the consumers with the appropriate group.protocol setting, assuming that the new protocol is enabled on the server side. When the first consumer using the new rebalance protocol joins the group, the group is converted from a generic group to a consumer group, and all JoinGroup, SyncGroup, and Heartbeat calls from the consumers that have not been upgraded yet are translated to the new protocol.
Protocols Interoperability
The new rebalance protocol relies mainly on one API, the ConsumerGroupHeartbeat API, whereas the old protocol relies on three APIs:
- JoinGroup API - The consumer sends a JoinGroup request to join or rejoin a group. The request contains the subscriptions, the owned partitions, and the current generation id (assuming KIP-792 is implemented), serialized in the inner consumer protocol. The responses are sent back to the consumers only when all the members have (re-)joined the group. One of the members is elected as the leader of the group and is responsible for computing the assignment.
- SyncGroup API - The consumer sends a SyncGroup request when it receives its JoinGroup response, in order to get its assignment.
- Heartbeat API - The consumer sends heartbeats to keep its session alive.
The old protocol has two modes:
- Eager Mode - In the eager mode, the consumer revokes all its partitions before re-joining the group during a rebalance.
- Cooperative Mode - In the cooperative mode, the consumer only revokes the partitions that are no longer assigned to it before rejoining the group. Hence, two rebalances are required to move a partition from member A to member B: one to revoke the partition from A and another to assign it to B.
The two modes must be treated differently. At the moment, the group coordinator does not know which mode a group uses, so this information will be added to the protocol.
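The difference between the two modes can be sketched by looking at which partitions stay owned while partitions move, and how many rebalances the move takes. This is a simplified model for illustration only: assignments are maps from a member name to partition names, and `eager` and `cooperative` are hypothetical helpers.

```python
from typing import Dict, Set, Tuple

Assignment = Dict[str, Set[str]]  # member -> partitions


def eager(old: Assignment) -> Tuple[int, Assignment]:
    # One rebalance, but every member drops all of its partitions first,
    # so nothing is owned while the rebalance is in progress.
    return 1, {member: set() for member in old}


def cooperative(old: Assignment, new: Assignment) -> Tuple[int, Assignment]:
    members = set(old) | set(new)
    # Members keep the partitions that stay with them during the transition.
    kept = {m: old.get(m, set()) & new.get(m, set()) for m in members}
    # A second rebalance is only needed if some partition changes owner.
    moved = any(old.get(m, set()) - new.get(m, set()) for m in members)
    return (2 if moved else 1), kept
```

Moving `p1` from A to B costs the cooperative mode a second rebalance, but A keeps consuming `p0` throughout, whereas the eager mode stops everything once.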
Eager Mode
Handling Heartbeat API
When the group coordinator receives a heartbeat request from a member of a consumer group, it updates the member's session. If the generation id is equal to the target epoch, it replies with NONE; otherwise, it replies with REBALANCE_IN_PROGRESS to signal that the member must rejoin the group.
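A minimal sketch of the translated Heartbeat handling, assuming the session is tracked as a last-seen timestamp per member and that the generation id is compared with the member's target epoch. `NONE` and `REBALANCE_IN_PROGRESS` are the error names used in the text; the function and its parameters are hypothetical.

```python
from typing import Dict

NONE = "NONE"
REBALANCE_IN_PROGRESS = "REBALANCE_IN_PROGRESS"


def handle_legacy_heartbeat(sessions: Dict[str, float], member_id: str,
                            generation_id: int, target_epoch: int,
                            now: float) -> str:
    # Refresh the member's session on every heartbeat.
    sessions[member_id] = now
    # An outdated generation means the member must rejoin the group.
    return NONE if generation_id == target_epoch else REBALANCE_IN_PROGRESS
```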
Handling JoinGroup API
When a consumer joins or rejoins the consumer group, the group coordinator parses the inner consumer protocol in the JoinGroup request to extract the generation id, the subscribed topics, and the user data. The generation id is treated as the current member epoch, the subscribed topics as the member's subscribed topic names, and the user data as the assignor metadata with version -1. The consumer group is updated accordingly.
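The field mapping described above can be sketched as follows, assuming the inner consumer protocol subscription has already been decoded into a `LegacySubscription` object. Both dataclasses and `translate_join` are hypothetical names; the generation id field follows the KIP-792 assumption made earlier in this document.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple

LEGACY_METADATA_VERSION = -1


@dataclass
class LegacySubscription:
    """Decoded inner consumer protocol subscription (fields assumed)."""
    topics: List[str]
    user_data: Optional[bytes]
    owned_partitions: List[Tuple[str, int]] = field(default_factory=list)
    generation_id: int = -1


@dataclass
class MemberState:
    member_id: str
    member_epoch: int = 0
    subscribed_topic_names: List[str] = field(default_factory=list)
    assignor_metadata: Optional[bytes] = None
    assignor_metadata_version: int = LEGACY_METADATA_VERSION


def translate_join(members: Dict[str, MemberState], member_id: str,
                   sub: LegacySubscription) -> MemberState:
    member = members.setdefault(member_id, MemberState(member_id))
    member.member_epoch = sub.generation_id              # generation id -> member epoch
    member.subscribed_topic_names = list(sub.topics)     # topics -> subscription
    member.assignor_metadata = sub.user_data             # user data -> assignor metadata
    member.assignor_metadata_version = LEGACY_METADATA_VERSION
    return member
```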
In the eager mode, the consumer revokes all its partitions before joining the group. Therefore, the group coordinator does not have to do anything to revoke them. When all the non-upgraded members have rejoined the group, the group coordinator knows that their partitions have been revoked, so each member can advance its current assignment to its target assignment. Once these changes are persisted, the group coordinator releases the JoinGroup responses, which carry the new member epoch as the generation id. The consumers then collect their assignments by sending SyncGroup requests.
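The eager barrier can be sketched as a function that holds the pending JoinGroup requests until every non-upgraded member has rejoined, then advances each of them to its target assignment, persists the change, and returns the held responses as a map from member to generation id. `persist` is a hypothetical callback standing in for the durable write; the other names are also assumptions.

```python
from dataclasses import dataclass
from typing import Callable, Dict, FrozenSet, Set, Tuple

TopicPartition = Tuple[str, int]


@dataclass
class Member:
    epoch: int
    current: FrozenSet[TopicPartition]
    target: FrozenSet[TopicPartition]
    target_epoch: int


def complete_eager_join(members: Dict[str, Member], legacy_ids: Set[str],
                        rejoined: Set[str],
                        persist: Callable[[Dict[str, Member]], None]) -> Dict[str, int]:
    """Return held responses (member -> generation id), or {} to keep waiting."""
    if not legacy_ids <= rejoined:
        return {}  # some non-upgraded members have not rejoined yet
    for member_id in legacy_ids:
        m = members[member_id]
        # Eager members already dropped their partitions: no revocation step.
        m.current, m.epoch = m.target, m.target_epoch
    persist(members)  # responses are released only after the write
    return {mid: members[mid].epoch for mid in legacy_ids}
```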
Handling SyncGroup API
When a consumer sends a SyncGroup request in the eager mode, the group coordinator validates the generation id and returns the member's current assignment, serialized according to the consumer protocol.
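A sketch of the SyncGroup handling, assuming the generation id must equal the member's current epoch and that the assignment is written in what we believe to be the version 0 layout of the consumer protocol assignment: an int16 version, an array of topics (int16-length string plus an int32 array of partitions), and nullable user data (int32 length, -1 for null), all big-endian. The function names are hypothetical; `ILLEGAL_GENERATION` is the existing Kafka error name.

```python
import struct
from typing import Dict, List, Optional, Tuple

ILLEGAL_GENERATION = "ILLEGAL_GENERATION"


def serialize_assignment_v0(partitions: Dict[str, List[int]],
                            user_data: Optional[bytes] = None) -> bytes:
    out = bytearray(struct.pack(">h", 0))           # version
    out += struct.pack(">i", len(partitions))        # topic array length
    for topic, parts in partitions.items():
        name = topic.encode("utf-8")
        out += struct.pack(">h", len(name)) + name   # topic string
        out += struct.pack(">i", len(parts))         # partition array length
        out += b"".join(struct.pack(">i", p) for p in parts)
    if user_data is None:
        out += struct.pack(">i", -1)                 # null user data
    else:
        out += struct.pack(">i", len(user_data)) + user_data
    return bytes(out)


def handle_sync_group(generation_id: int, member_epoch: int,
                      assignment: Dict[str, List[int]],
                      user_data: Optional[bytes] = None) -> Tuple[str, bytes]:
    # Reject requests from a generation other than the member's current epoch.
    if generation_id != member_epoch:
        return ILLEGAL_GENERATION, b""
    return "NONE", serialize_assignment_v0(assignment, user_data)
```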
Cooperative Mode
Handling Heartbeat API
When the group coordinator receives a heartbeat request from a member of a consumer group, it updates the member's session. If the generation id is equal to the target epoch, it replies with NONE; otherwise, it replies with REBALANCE_IN_PROGRESS to signal that the member must rejoin the group.
Handling JoinGroup API
When a consumer joins or rejoins the consumer group, the group coordinator parses the inner consumer protocol in the JoinGroup request to extract the generation id, the subscribed topics, and the user data. The generation id is treated as the current member epoch, the subscribed topics as the member's subscribed topic names, and the user data as the assignor metadata with version -1.
In the cooperative mode, the consumer keeps its assigned partitions when joining the group. Therefore, two rebalances are required, as explained earlier. There are two cases to consider:
- If the owned partitions match Current Partitions & Target Partitions (the intersection of the two sets), the consumer has already revoked the partitions it must give up. In this case, the member is ready to advance to the next epoch.
- If the owned partitions do not match Current Partitions & Target Partitions, the consumer must first be told to revoke the partitions it must give up. The group coordinator can reply to it directly.
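The two cases above can be sketched as a classification of a rejoining cooperative member. The `"ADVANCE"` and `"REVOKE"` return values and the function name are hypothetical, and we assume the partitions to revoke are the owned partitions that are not in the target assignment.

```python
from typing import FrozenSet, Tuple

TopicPartition = Tuple[str, int]


def handle_cooperative_join(owned: FrozenSet[TopicPartition],
                            current: FrozenSet[TopicPartition],
                            target: FrozenSet[TopicPartition]
                            ) -> Tuple[str, FrozenSet[TopicPartition]]:
    """Classify a rejoining cooperative member (hypothetical return values)."""
    kept = current & target
    if owned == kept:
        # Case 1: revocation is done; the member can move to the next epoch.
        return "ADVANCE", frozenset()
    # Case 2: reply right away with what the member must still revoke.
    return "REVOKE", owned - target
```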
When all the non-upgraded members have rejoined the group with their partitions revoked, each member can advance its current assignment to its target assignment. Once these changes are persisted, the group coordinator releases the JoinGroup responses, which carry the new member epoch as the generation id. The consumers then collect their assignments by sending SyncGroup requests.
Handling SyncGroup API
When a consumer sends a SyncGroup request in the cooperative mode, the group coordinator validates the generation id and returns the member's current assignment, serialized according to the consumer protocol.
Assignors Interoperability
Member ID?
- String vs UUID? Let's keep a string everywhere.
- Deploy the new software
- Roll the members
- Enable the new protocol
- Roll the members
Eager and Cooperative
- Eager revokes all partitions before re-joining the group.
- Cooperative does two rebalance rounds: one to revoke partitions and one to assign them.
...
- When do we need to trigger a rebalance?
- PrepareRebalance: current epoch < target epoch?
- CompleteRebalance
Assignor
- From client side to server side
- The server side takes over
- From client side to client side
- The new assignor will get old metadata with -1 version
- The new assignor must support old metadata
Public Interfaces
This section lists the changes impacting the public interfaces.
...
Compatibility, Deprecation, and Migration Plan
Cluster Upgrade
- Deploy the new version of the software
- Roll the cluster
- Enable the new protocol
- Roll the cluster
Group Upgrade
- Deploy the new software
- Roll the members
- Enable the new protocol
- Roll the members
- Kafka 4.0 opt-in, consumer support.
- Kafka 4.x opt-in, streams support.
- Kafka 5.0, new protocol by default.
...