...

  • Eager Mode - In the eager mode, the consumer revokes all its partitions before rejoining the group during a rebalance.
  • Cooperative Mode - In the cooperative mode, the consumer only revokes the partitions that it no longer owns before rejoining the group. Two rebalances are therefore required to move a partition from member A to member B: one to revoke the partition from A and another to assign it to B.
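To make the difference concrete, here is a minimal Python sketch of what a member still holds when it rejoins in each mode, and how many rounds it takes to move a partition from A to B. The function names and the (topic, partition) tuple representation are hypothetical and chosen only for illustration.

```python
# Hypothetical model: a partition is a (topic, partition_index) tuple.

def owned_on_rejoin(mode: str, owned: set, target: set) -> set:
    """Partitions a member still holds when it rejoins the group."""
    if mode == "eager":
        # Eager: every partition is revoked before rejoining.
        return set()
    if mode == "cooperative":
        # Cooperative: only partitions outside the target assignment are revoked.
        return owned & target
    raise ValueError(f"unknown rebalance mode: {mode}")


def rounds_to_move(mode: str) -> int:
    """Rebalance rounds needed to move one partition from member A to member B."""
    if mode == "eager":
        # A has already released the partition when the single round completes.
        return 1
    if mode == "cooperative":
        # Round 1 revokes the partition from A, round 2 assigns it to B.
        return 2
    raise ValueError(f"unknown rebalance mode: {mode}")


owned = {("orders", 0), ("orders", 1)}
target = {("orders", 0)}  # ("orders", 1) is moving to another member
print(owned_on_rejoin("eager", owned, target))        # set()
print(owned_on_rejoin("cooperative", owned, target))  # {('orders', 0)}
```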

The two modes must be treated differently. At the moment, the group coordinator does not know which mode a group uses; this information will be added to the protocol.

Eager Mode

Note that the upgrade path only works from version 3 of the consumer protocol (as described in KIP-792).

Heartbeat Handling

The Heartbeat request updates the member session and is used to trigger a rebalance. When the group coordinator receives a heartbeat request, it updates the member's session. If the generation id equals the target assignment epoch, it replies with NONE; if the generation id is smaller than the target assignment epoch, it replies with REBALANCE_IN_PROGRESS to signal that the member must rejoin the group.
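The heartbeat decision can be sketched as follows. This is a minimal model, not the coordinator's implementation: the session is represented as a last-seen timestamp in a dict, and `handle_heartbeat` and `ErrorCode` are hypothetical names. The text does not say what happens when the generation id is ahead of the target epoch, so the sketch treats that case as an error.

```python
from enum import Enum


class ErrorCode(Enum):
    NONE = "NONE"
    REBALANCE_IN_PROGRESS = "REBALANCE_IN_PROGRESS"


def handle_heartbeat(sessions: dict, member_id: str, generation_id: int,
                     target_assignment_epoch: int, now: float) -> ErrorCode:
    # Refresh the member session (modeled here as a last-seen timestamp).
    sessions[member_id] = now
    if generation_id == target_assignment_epoch:
        # The member is up to date: nothing to do.
        return ErrorCode.NONE
    if generation_id < target_assignment_epoch:
        # A newer target assignment exists: ask the member to rejoin.
        return ErrorCode.REBALANCE_IN_PROGRESS
    # Not covered by the text; this sketch rejects it.
    raise ValueError(
        f"generation {generation_id} is ahead of target {target_assignment_epoch}")
```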

JoinGroup Handling

The JoinGroup request updates the group and the member state. When a consumer joins or rejoins the consumer group, the group coordinator parses the inner consumer protocol embedded in the JoinGroup request to extract the generation id, the subscribed topics and the user data, and maps this information to the new data model: the generation id is treated as the current member epoch, the subscribed topics as the member's subscribed topics, and the user data as assignor metadata with version -1. The consumer group is then updated accordingly.
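A minimal sketch of this mapping, assuming the inner consumer protocol has already been decoded into its three fields. The class and function names (`JoinGroupSubscription`, `MemberState`, `apply_join_group`) are hypothetical and do not correspond to Kafka classes.

```python
from dataclasses import dataclass


@dataclass
class JoinGroupSubscription:
    """Fields extracted from the inner consumer protocol of a JoinGroup request."""
    generation_id: int
    topics: list
    user_data: bytes


@dataclass
class AssignorMetadata:
    version: int
    data: bytes


@dataclass
class MemberState:
    """Hypothetical per-member record in the new data model."""
    member_epoch: int
    subscribed_topics: list
    assignor_metadata: AssignorMetadata


def apply_join_group(group: dict, member_id: str,
                     sub: JoinGroupSubscription) -> MemberState:
    state = MemberState(
        # The generation id is treated as the current member epoch.
        member_epoch=sub.generation_id,
        # The subscribed topics are carried over unchanged.
        subscribed_topics=list(sub.topics),
        # The user data becomes assignor metadata with version -1.
        assignor_metadata=AssignorMetadata(version=-1, data=sub.user_data),
    )
    # Update the consumer group with the new member state.
    group[member_id] = state
    return state
```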

In the eager mode, the consumer revokes all its partitions before joining the group, so the group coordinator does not have to do anything to revoke them. Once all the non-upgraded members have rejoined the group, the coordinator knows that their partitions are revoked, so each member can advance its current assignment to its target assignment, and the group coordinator can release the SyncGroup responses once the changes are persisted. The SyncGroup response contains the new member epoch as the generation id. The consumers collect their assignments by sending the SyncGroup request.
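The eager-mode completion step can be sketched as below, assuming a simple in-memory `Member` record and a caller-supplied `persist` callback that stands in for writing the new state. All names are hypothetical. The function returns the SyncGroup responses to release (new epoch as generation id, plus assignment) or `None` while non-upgraded members are still missing.

```python
from dataclasses import dataclass


@dataclass
class Member:
    upgraded: bool      # True if the member already uses the new protocol
    epoch: int          # current member epoch
    target_epoch: int   # epoch of the member's target assignment
    current: set        # current assignment
    target: set         # target assignment


def complete_eager_rebalance(members: dict, rejoined: set, persist):
    """Return the SyncGroup responses to release, or None if still waiting."""
    legacy = {mid for mid, m in members.items() if not m.upgraded}
    # Eager members revoke everything before rejoining, so once every
    # non-upgraded member has rejoined, none of them holds a partition.
    if not legacy <= rejoined:
        return None
    for mid in legacy:
        members[mid].current = set(members[mid].target)
        members[mid].epoch = members[mid].target_epoch
    # Responses are only released after the changes are persisted.
    persist(members)
    # Each response carries the new member epoch as the generation id.
    return {mid: (members[mid].epoch, members[mid].current) for mid in legacy}
```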

Handling SyncGroup API

When a consumer sends a SyncGroup request in the eager mode, the group coordinator validates the generation id and returns the member's current assignment, serialized according to the consumer protocol.
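A minimal sketch of the SyncGroup check. It assumes that a generation id different from the member epoch is rejected (the classic protocol uses the ILLEGAL_GENERATION error for this; the exception class below is hypothetical). A sorted list of (topic, partition) pairs stands in for the consumer protocol serialization, which is not reproduced here.

```python
class IllegalGenerationError(Exception):
    """Hypothetical: raised when the generation id does not match the member epoch."""


def handle_sync_group(member_epoch: int, generation_id: int,
                      current_assignment: set) -> list:
    # Validate the generation id against the member's current epoch.
    if generation_id != member_epoch:
        raise IllegalGenerationError(
            f"generation {generation_id} != member epoch {member_epoch}")
    # Return the current assignment; sorting stands in for serialization.
    return sorted(current_assignment)
```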

Cooperative Mode

Handling Heartbeat API

When the group coordinator receives a heartbeat request, it updates the member's session. If the generation id equals the target assignment epoch, it replies with NONE; otherwise, it replies with REBALANCE_IN_PROGRESS to signal that the member must rejoin the group.

Handling JoinGroup API

When a consumer joins or rejoins the consumer group, the group coordinator parses the inner consumer protocol in the JoinGroup request to extract the generation id, the subscribed topics and the user data. The generation id is treated as the current member epoch, the subscribed topics as the member's subscribed topics, and the user data as assignor metadata with version -1.

In the cooperative mode, the consumer keeps its assigned partitions when rejoining the group, so two rebalances are required, as explained earlier. There are two cases to consider:

  • If the owned partitions match the intersection of the Current Partitions and the Target Partitions, the consumer has already revoked the partitions it had to give up. In this case, the member is ready to advance to the next epoch.
  • If the owned partitions do not match the intersection of the Current Partitions and the Target Partitions, the consumer must first be told to revoke the extra partitions. The group coordinator can reply to it immediately.
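The two cases can be sketched as a small decision function, assuming "Current Partitions & Target Partitions" denotes the set intersection. The function name and the returned action labels are hypothetical.

```python
def cooperative_join_decision(owned: set, current: set, target: set):
    """Return ("ADVANCE", set()) or ("REVOKE", partitions_to_revoke)."""
    # Partitions the member may keep across the rebalance.
    retained = current & target
    if owned == retained:
        # Case 1: revocation is done; the member can move to the next epoch.
        return "ADVANCE", set()
    # Case 2: the member still owns partitions it must give up, so the
    # coordinator replies right away and the consumer revokes them.
    return "REVOKE", owned - retained
```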

Handling SyncGroup API

When a consumer sends a SyncGroup request, the group coordinator validates the generation id and returns the member's current assignment, serialized according to the consumer protocol.

Assignors Interoperability

Member ID?

  • String vs. UUID? Let's keep a string everywhere.

Eager and Cooperative

  • Eager revokes all partitions before rejoining the group.
  • Cooperative performs two rebalance rounds: one to revoke partitions and one to assign them.

JoinGroup API

  • topics → subscribed topic ids
  • owned partitions → owned partitions (id based)
  • user data → assignor metadata
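A minimal sketch of this name-to-id translation, assuming the coordinator can look up topic ids from a `topic_ids` dict (a hypothetical stand-in for cluster metadata). How subscriptions to topics that do not exist yet are handled is not stated; this sketch simply skips them.

```python
import uuid


def to_id_based(topics: list, owned_partitions: set, user_data: bytes,
                topic_ids: dict) -> dict:
    """Translate name-based JoinGroup fields to the id-based model.

    topic_ids maps topic name -> topic id (hypothetical metadata lookup).
    """
    return {
        # Subscribed topics without a known id are skipped in this sketch.
        "subscribed_topic_ids": {topic_ids[t] for t in topics if t in topic_ids},
        # Owned partitions are re-keyed by topic id.
        "owned_partitions": {(topic_ids[t], p) for t, p in owned_partitions},
        # User data is passed through as assignor metadata.
        "assignor_metadata": user_data,
    }


orders_id = uuid.uuid4()
print(to_id_based(["orders"], {("orders", 0)}, b"", {"orders": orders_id}))
```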

SyncGroup API

  • ...

Heartbeat API

  • When do we need to trigger a rebalance?
  • PrepareRebalance: current epoch < target epoch?
  • CompleteRebalance

Assignor

...

  • The server side takes over

In the eager mode, the consumer revokes all its partitions before rejoining the group. Once all of its newly assigned partitions are free to be assigned, the group coordinator updates the current assignment of the member to its target assignment and then sends the JoinGroup response to the consumer with the generation id set to the new member epoch. The consumer then collects its assignment by calling the SyncGroup API.

In the cooperative mode, the consumer rejoins without revoking any partitions. If the member must revoke partitions, the group coordinator replies immediately with the current member epoch, and the consumer then collects its assignment by calling the SyncGroup API. If the member does not have to revoke partitions, either because it just did or because it never had to, the group coordinator waits until all the newly assigned partitions are free. It then updates the current assignment of the member to the target assignment and sends the JoinGroup response to the consumer with the generation id set to the new member epoch. The consumer then collects its assignment by calling the SyncGroup API.
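The JoinGroup handling for both modes can be sketched with one function. It assumes a partition counts as free when no other member's current assignment contains it, and that a member "must revoke" when it owns partitions outside its target; `Member`, `handle_join` and the action labels are hypothetical. Because an eager member rejoins owning nothing, the first branch only ever fires in the cooperative mode.

```python
from dataclasses import dataclass


@dataclass
class Member:
    epoch: int     # current member epoch
    current: set   # current assignment
    target: set    # target assignment


def handle_join(member_id: str, members: dict, owned: set, new_epoch: int):
    """Return ("REPLY_NOW", epoch), ("WAIT", None) or ("COMPLETE", epoch)."""
    m = members[member_id]
    # The member still owns partitions outside its target: reply at once
    # with the current epoch so it can revoke them.
    if owned - m.target:
        return "REPLY_NOW", m.epoch
    # Assumption: a partition is free when no other member currently holds it.
    held_by_others = set().union(
        *(o.current for mid, o in members.items() if mid != member_id))
    if (m.target - m.current) & held_by_others:
        return "WAIT", None
    # Every newly assigned partition is free: advance the member.
    m.current = set(m.target)
    m.epoch = new_epoch
    return "COMPLETE", m.epoch
```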

SyncGroup Handling

The SyncGroup request collects the assignment of the member. The current assignment is defined as the intersection of the Current Partitions and the Target Partitions, which removes the partitions to be revoked from the set. The group coordinator has to map the data model of the new protocol to the consumer protocol schema.
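A minimal sketch of building the SyncGroup assignment, assuming the new data model stores (topic id, partition) pairs and that `topic_names` (hypothetical) maps ids back to the topic names used by the consumer protocol. Grouping by topic name is an illustrative stand-in for the actual schema mapping.

```python
from collections import defaultdict


def sync_group_assignment(current: set, target: set, topic_names: dict) -> dict:
    """Build a name-based, per-topic assignment for the SyncGroup response."""
    # Partitions still to be revoked are absent from the target, so the
    # intersection drops them.
    assignment = current & target
    by_topic = defaultdict(list)
    for topic_id, partition in assignment:
        by_topic[topic_names[topic_id]].append(partition)
    # Sort for a deterministic layout.
    return {name: sorted(parts) for name, parts in sorted(by_topic.items())}
```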

Assignors Interoperability

When upgrading from a client-side assignor to a server-side assignor, the server-side assignor takes over immediately and the client-side assignor is no longer used.

When upgrading from a client-side assignor with the old protocol to a client-side assignor with the new protocol, the new assignor must be able to understand the metadata serialised by the old assignor. Those metadata will have their version set to -1, which indicates that the assignor must read the version from the first bytes of the metadata. The new assignor has to serialise the metadata using the same version. The group coordinator will refuse to let a member using the new protocol join the group if its client-side assignor does not support version -1, but only if the old assignor uses metadata.
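A minimal sketch of the version -1 handling and the join check. It assumes, purely for illustration, that the old assignor's metadata starts with a big-endian int16 version; the real layout is assignor-specific. The function names are hypothetical.

```python
import struct

LEGACY_VERSION = -1


def resolve_version(version: int, metadata: bytes) -> int:
    """Return the effective metadata version."""
    if version != LEGACY_VERSION:
        return version
    # Version -1: read the real version from the first bytes
    # (assumed here to be a big-endian int16).
    if len(metadata) < 2:
        raise ValueError("metadata too short to contain a version")
    (embedded,) = struct.unpack(">h", metadata[:2])
    return embedded


def serialize(version: int, payload: bytes) -> bytes:
    # Write the metadata back using the same version it was read with.
    return struct.pack(">h", version) + payload


def may_join(assignor_supports_legacy: bool, old_assignor_uses_metadata: bool) -> bool:
    # Refuse only when the old assignor actually produced metadata.
    return assignor_supports_legacy or not old_assignor_uses_metadata
```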

...

Public Interfaces

This section lists the changes impacting the public interfaces.

...