...

Each member encodes the lag of its standby tasks in its metadata. We cannot update the lag in every heartbeat request because that would constantly trigger reassignment in the group. Instead, the member should consider triggering a rebalance, by sending its next heartbeat with the appropriate encoded reason and the updated task lags, when a) the task lag has been reduced to within the acceptable.recovery.lag threshold, or b) the task lag has been consistently increasing for some time.
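
The two trigger conditions above can be sketched as a small member-side helper. This is an illustrative sketch, not the actual implementation: `StandbyLagTracker`, `observe` and `increase_window` are hypothetical names, and reading "consistently increasing for some time" as a fixed number of consecutive increases is an assumption.

```python
from dataclasses import dataclass, field


@dataclass
class StandbyLagTracker:
    """Hypothetical member-side helper that decides when to report new standby lags."""

    acceptable_recovery_lag: int
    # Assumption: "consistently increasing" means this many consecutive increases.
    increase_window: int
    reported_lag: dict = field(default_factory=dict)  # task id -> lag last encoded in metadata
    last_lag: dict = field(default_factory=dict)      # task id -> most recent lag sample
    increases: dict = field(default_factory=dict)     # task id -> consecutive increases seen

    def observe(self, task_id: str, lag: int) -> bool:
        """Record a lag sample and return True if the member should trigger a rebalance."""
        previous = self.last_lag.get(task_id)
        if previous is not None and lag > previous:
            self.increases[task_id] = self.increases.get(task_id, 0) + 1
        else:
            self.increases[task_id] = 0
        self.last_lag[task_id] = lag

        # An unknown previously reported lag is treated as not caught up.
        reported = self.reported_lag.get(task_id, float("inf"))
        # a) The lag has just dropped within the acceptable recovery lag threshold.
        caught_up = lag <= self.acceptable_recovery_lag < reported
        # b) The lag has been increasing for `increase_window` consecutive samples.
        falling_behind = self.increases[task_id] >= self.increase_window

        if caught_up or falling_behind:
            # These lags would be encoded in the next heartbeat along with the reason.
            self.reported_lag[task_id] = lag
            self.increases[task_id] = 0
            return True
        return False
```

Only crossings of the threshold and sustained growth produce a trigger; small fluctuations do not, which is what keeps the group from being constantly reassigned.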

...

Supporting Online Consumer Group Upgrade

Upgrading to the new protocol, or downgrading from it, is possible by rolling the consumers with the appropriate group.protocol, assuming that the new protocol is enabled on the server side. When the first consumer using the new rebalance protocol joins the group, the group is converted from a generic group to a consumer group, and all the JoinGroup, SyncGroup and Heartbeat calls from the non-upgraded consumers are translated to the new protocol.

Protocols Interoperability

The new rebalance protocol relies mainly on one API, the ConsumerGroupHeartbeat API, whereas the old protocol relies on three APIs:

...

How? The idea is to reconcile each member individually using the old protocol APIs. Note that the group epoch starts at the current group generation.

Before explaining how that will work, let's recapitulate how the current protocol works. First, the members join or rejoin the group with the JoinGroup API. The JoinGroup request contains the subscriptions, the owned partitions, etc. When all the members are there, the group coordinator picks a leader for the group and sends back the JoinGroup response to all the members. The leader is responsible for computing the assignment.

Second, all the members collect their assignment, computed by the leader, by using the SyncGroup API. In parallel, the members heartbeat with the Heartbeat API in order to maintain their session. The Heartbeat API is also used by the group coordinator to inform the members about an ongoing rebalance. All those interactions are synchronized on the generation of the group. It is important to note that the consumer does not make any assumption about the generation id; it simply uses what it receives from the group coordinator. At least, this is how the Java client works.

The old protocol has two modes: Eager and Cooperative. In the eager mode, the consumer revokes all its partitions before rejoining the group during a rebalance. In the cooperative mode, the consumer does not revoke any partitions before rejoining the group. However, it revokes the partitions that it no longer owns when it receives its new assignment, and rejoins immediately if it had to revoke any partitions.
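
The client-side difference between the two modes can be sketched with plain sets of partitions. This is a simplified illustration of the behavior described above, not the Java client's code; the function names and the string partition identifiers are hypothetical.

```python
def revoke_before_rejoin(mode: str, owned: set) -> set:
    """Partitions the consumer revokes before sending its JoinGroup request."""
    if mode == "eager":
        return set(owned)  # eager: give up everything before rejoining
    if mode == "cooperative":
        return set()       # cooperative: keep everything while rejoining
    raise ValueError(f"unknown rebalance mode: {mode}")


def on_new_assignment(mode: str, owned: set, assigned: set) -> tuple:
    """Return (partitions to revoke now, whether to rejoin immediately)."""
    if mode == "eager":
        # Everything was already revoked before rejoining.
        return set(), False
    if mode == "cooperative":
        # Revoke only what is no longer assigned, then rejoin if anything was revoked.
        revoked = set(owned) - set(assigned)
        return revoked, bool(revoked)
    raise ValueError(f"unknown rebalance mode: {mode}")
```

For example, a cooperative consumer owning `orders-0` and `orders-1` that is assigned `orders-1` and `orders-2` revokes only `orders-0` and rejoins right away.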

The new protocol relies on the ConsumerGroupHeartbeat API to do all the above. Concretely, the API updates the group state, provides the partitions owned by the consumer, gets back the assignment, and updates the session. We can map those to the old protocol as follows: the JoinGroup API updates the group state and provides the owned partitions, the SyncGroup API gets back the assignment, and the Heartbeat API updates the session. The main difference is that JoinGroup and SyncGroup do not run continuously. The group coordinator has to trigger them when needed by returning the REBALANCE_IN_PROGRESS error in the Heartbeat response.

The APIs will work as follows during the migration of a group.

Let's start by defining a state machine for each member:

  • PrepareRebalance - The member enters this state when a rebalance is required. The member must rejoin within the rebalance timeout or the coordinator removes it from the group. We will discuss later when a rebalance is required.
  • CompletingRebalance - The member enters this state when its JoinGroup request has been handled. The member must sync within the rebalance timeout or the coordinator removes it from the group.
  • Stable - The member enters this state when it has completed the rebalance loop.

We intentionally reuse the terminology of the current protocol here in order to facilitate the reasoning.
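
The per-member state machine above can be sketched as follows. This is a hypothetical model of what the coordinator might track, not the actual coordinator code; the class and method names are illustrative, and the initial state is an assumption because the text does not specify it.

```python
from enum import Enum
from typing import Optional


class MemberState(Enum):
    PREPARE_REBALANCE = "PrepareRebalance"
    COMPLETING_REBALANCE = "CompletingRebalance"
    STABLE = "Stable"


class MemberStateMachine:
    """Hypothetical per-member state tracked by the coordinator for old-protocol members."""

    def __init__(self, member_id: str, rebalance_timeout_ms: int):
        self.member_id = member_id
        self.rebalance_timeout_ms = rebalance_timeout_ms
        self.state = MemberState.STABLE  # assumed starting state, not specified in the text
        self.deadline_ms: Optional[int] = None

    def rebalance_required(self, now_ms: int) -> None:
        # The member must rejoin before the deadline.
        self.state = MemberState.PREPARE_REBALANCE
        self.deadline_ms = now_ms + self.rebalance_timeout_ms

    def join_group_handled(self, now_ms: int) -> None:
        # The member must sync before the deadline.
        self.state = MemberState.COMPLETING_REBALANCE
        self.deadline_ms = now_ms + self.rebalance_timeout_ms

    def sync_group_handled(self) -> None:
        # The rebalance loop is complete; no deadline applies any more.
        self.state = MemberState.STABLE
        self.deadline_ms = None

    def should_be_removed(self, now_ms: int) -> bool:
        """True if the member missed its rejoin or sync deadline."""
        return self.deadline_ms is not None and now_ms > self.deadline_ms
```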

JoinGroup Handling

The JoinGroup API is handled as follows:

  • The basic validations are applied. The request is rejected with the appropriate error if the validation fails.
    • The group must exist.
    • The member must exist or be created.
    • The embedded protocol must be on version >= 3.
    • The generation id must match the current member epoch. The generation id is provided in the embedded protocol.
  • The group state is updated if needed.
    • The group epoch is incremented if a new assignment is required.
    • The Topics provided in the embedded consumer protocol are used to update the SubscribedTopicNames.
    • The Protocols are converted to Assignors where the UserData becomes the metadata and the version is set to -1.
  • The current member assignment is updated if needed:
    • If the member has revoked all its partitions or the required partitions, the member can transition to its next epoch. The target assignment becomes the current assignment. The group coordinator replies with the new member epoch as the generation id.
    • If the member has to revoke partitions, the group coordinator replies with the current member epoch as the generation id.
  • The member transitions to CompletingRebalance state.
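
The member-level part of the JoinGroup handling can be sketched as follows. This is a simplified illustration under stated assumptions: `MemberRecord`, `JoinGroupError` and `handle_join_group` are hypothetical, the group and member existence checks and the group epoch bump are omitted, and "has revoked the required partitions" is modeled as "owns nothing outside its target assignment".

```python
from dataclasses import dataclass, field


class JoinGroupError(Exception):
    """Hypothetical stand-in for the error returned when validation fails."""


@dataclass
class MemberRecord:
    member_epoch: int
    target_epoch: int
    current: set                     # current assignment
    target: set                      # target assignment
    subscribed_topic_names: list = field(default_factory=list)
    assignors: dict = field(default_factory=dict)
    state: str = "Stable"


def handle_join_group(member, protocol_version, generation_id, topics, protocols, owned):
    """Apply a translated JoinGroup request; return the generation id to reply with."""
    # Basic validations (group/member existence checks are omitted here).
    if protocol_version < 3:
        raise JoinGroupError("embedded consumer protocol version must be >= 3")
    if generation_id != member.member_epoch:
        raise JoinGroupError("generation id does not match the member epoch")

    # Update the member's view of the group state from the embedded protocol.
    member.subscribed_topic_names = list(topics)
    member.assignors = {name: {"metadata": user_data, "version": -1}
                        for name, user_data in protocols}

    # The member may move to its next epoch once it owns nothing outside its target.
    if set(owned) <= member.target:
        member.member_epoch = member.target_epoch
        member.current = set(member.target)
    # Otherwise the member keeps its epoch and must still revoke partitions.

    member.state = "CompletingRebalance"
    return member.member_epoch
```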

SyncGroup API

The SyncGroup API is handled as follows:

  • The basic validations are applied.
    • The group must exist.
    • The member must exist.
    • The generation id must match the current member epoch. The generation id is provided in the request.
  • The group coordinator replies with the current assignment. There are two cases to consider here:
    • If the member epoch is smaller than the target epoch, the coordinator replies with the intersection between the current assignment and the target assignment. This is used to revoke the partitions that the member no longer owns. In this case, we know that the member will rejoin the group if it revokes partitions, which automatically triggers another rebalance to collect the newly assigned partitions.
    • If the member epoch is equal to the target epoch, the coordinator replies with the current assignment minus the partitions that other members have not revoked yet.
  • The group coordinator serializes the assignment with the embedded consumer protocol.
  • The member transitions to Stable state.
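
The two cases of the SyncGroup reply can be sketched with set operations. This is an illustrative sketch only: `sync_group_assignment` is a hypothetical name, and `pending_revocation` assumes that the coordinator tracks which of the member's partitions are still owned by other members.

```python
def sync_group_assignment(member_epoch, target_epoch, current, target, pending_revocation):
    """Return the partitions sent back in the SyncGroup response.

    pending_revocation: partitions of the member's current assignment that other
    members have not revoked yet (assumed to be tracked by the coordinator).
    """
    if member_epoch > target_epoch:
        raise ValueError("member epoch cannot be ahead of the target epoch")
    if member_epoch < target_epoch:
        # Drop the partitions to revoke; the member revokes them and rejoins.
        return set(current) & set(target)
    # Same epoch: hide partitions still owned by other members.
    return set(current) - set(pending_revocation)
```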

Heartbeat API

The Heartbeat API is handled as follows:

  • The basic validations are applied.
    • The group must exist.
    • The member must exist.
    • The generation id must match the current member epoch. The generation id is provided in the request.
  • The member session is updated.
  • The group coordinator replies with REBALANCE_IN_PROGRESS if the member is in the PrepareRebalance state, and with NONE otherwise.
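
A minimal sketch of the Heartbeat handling follows. The names are hypothetical, group and member existence checks are omitted, and using ILLEGAL_GENERATION for an epoch mismatch is an assumption, since the text only requires "the appropriate error".

```python
from dataclasses import dataclass
from enum import Enum


class HeartbeatError(Enum):
    NONE = "NONE"
    REBALANCE_IN_PROGRESS = "REBALANCE_IN_PROGRESS"
    # Assumption: error chosen for an epoch mismatch; the text only says "appropriate error".
    ILLEGAL_GENERATION = "ILLEGAL_GENERATION"


@dataclass
class HeartbeatMember:
    member_epoch: int
    state: str                    # "PrepareRebalance", "CompletingRebalance" or "Stable"
    session_deadline_ms: int = 0


def handle_heartbeat(member, generation_id, now_ms, session_timeout_ms):
    """Validate the heartbeat, refresh the session and report whether to rejoin."""
    if generation_id != member.member_epoch:
        return HeartbeatError.ILLEGAL_GENERATION
    member.session_deadline_ms = now_ms + session_timeout_ms
    if member.state == "PrepareRebalance":
        return HeartbeatError.REBALANCE_IN_PROGRESS
    return HeartbeatError.NONE
```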

Rebalance Triggers

The group coordinator will trigger a rebalance in the following cases:

  • A new assignment is installed. In this case, all non-upgraded members must be rebalanced.
  • A member is rebalanced when all its newly assigned partitions have been revoked by other members.
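
The two rebalance triggers can be sketched as event handlers that return the members to move to PrepareRebalance. This is a hypothetical model: `GroupMember`, its `awaiting` field, and the decision to apply the second trigger only to non-upgraded members are assumptions for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class GroupMember:
    upgraded: bool  # True if the member already uses ConsumerGroupHeartbeat
    # Newly assigned partitions that other members still own (hypothetical bookkeeping).
    awaiting: set = field(default_factory=set)


def on_new_assignment(members):
    """Trigger 1: a new assignment was installed; rebalance every non-upgraded member."""
    return {member_id for member_id, m in members.items() if not m.upgraded}


def on_partitions_released(members, released):
    """Trigger 2: rebalance members whose newly assigned partitions are now all free."""
    triggered = set()
    for member_id, m in members.items():
        if m.upgraded or not m.awaiting:
            continue  # upgraded members are reconciled through their own heartbeats
        m.awaiting -= set(released)
        if not m.awaiting:
            triggered.add(member_id)
    return triggered
```

Tracking only the partitions a member is still waiting for means that the second trigger fires once, when the last of them is released, rather than on every revocation.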

Note that the upgrade path will only work from the consumer protocol version 3 (as described in KIP-792).

Heartbeat Handling

The Heartbeat request updates the member session and is used to trigger a rebalance by returning REBALANCE_IN_PROGRESS if the generation id is smaller than the target assignment epoch.

JoinGroup Handling

The JoinGroup request updates the group and the member state. For this, the group coordinator parses the consumer embedded protocol and maps the information provided to the new data model. The generation id is treated as the current member epoch, the topics as the subscribed topics, and the user data as the assignor data with version -1.

In the eager mode, the consumer revokes all its partitions before rejoining the group. Therefore, when all the newly assigned partitions are free to be assigned, the group coordinator updates the current assignment of the member to the target assignment and sends the JoinGroup response to the consumer with the generation id set to the new member epoch. The consumer then collects its assignment by calling the SyncGroup API.

In the cooperative mode, the consumer rejoins without revoking any partitions. If the member must revoke partitions, the group coordinator replies immediately with the current member epoch, and the consumer then collects its assignment by calling the SyncGroup API. If the member does not have to revoke partitions, either because it just did or because it never had to, the group coordinator waits until all the newly assigned partitions are free. Then it updates the current assignment of the member to the target assignment and sends the JoinGroup response to the consumer with the generation id set to the new member epoch. The consumer then collects its assignment by calling the SyncGroup API.

SyncGroup Handling

The SyncGroup request collects the assignment of the member. The current assignment is defined as the intersection of the Current Partitions and the Target Partitions, which removes the partitions to be revoked from the set. The group coordinator has to map the data model of the new protocol to the consumer protocol schema.

Assignors Interoperability

When upgrading from a client-side assignor to a server-side assignor, the server-side assignor takes over immediately. The client-side assignor is no longer used.

...

Public Interfaces

This section lists the changes impacting the public interfaces.

...

Upgrading consumers can be done online. First, all the consumers must be upgraded to a software version that supports the online migration. The version is not defined yet, but the consumer will need to support the consumer embedded protocol version 3 (introduced in KIP-792). There are also other considerations if a custom client-side assignor is used.

Second, the consumers must be rolled to enable the new rebalance protocol. This is done by setting group.protocol to consumer; group.remote.assignor may need to be adjusted as well. When a consumer joins with the new protocol, the group is automatically converted from a generic group to a consumer group if the upgrade requirements are met (e.g. consumer embedded protocol version >= 3). If they are not, the consumer is rejected with an INVALID_REQUEST error.

During the upgrade process, consumers that have not been rolled yet continue to use the old rebalance APIs. The group coordinator translates the JoinGroup, SyncGroup and Heartbeat APIs to the new rebalance protocol. This translation is an implementation detail, but it is explained earlier in this document; see the Supporting Online Consumer Group Upgrade chapter. If a custom client-side assignor or regex-based subscriptions are used, please pay attention to the details provided in the following sub-chapters.
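
The conversion decision taken when a new-protocol consumer joins can be sketched as follows. Only the embedded protocol version requirement is modeled here; the other requirements (for example around custom assignors) are not. The function, the exception class and the `embedded_protocol_versions` input are hypothetical; only the `group.protocol` setting comes from the text.

```python
# Consumer setting from the text that opts a consumer into the new protocol.
NEW_PROTOCOL_CONSUMER_CONFIG = {"group.protocol": "consumer"}

MIN_EMBEDDED_PROTOCOL_VERSION = 3  # consumer embedded protocol version from KIP-792


class InvalidRequestError(Exception):
    """Stand-in for the INVALID_REQUEST error returned to the joining consumer."""


def join_with_new_protocol(group_type, embedded_protocol_versions):
    """Return the group type after a new-protocol consumer joins.

    embedded_protocol_versions: the embedded consumer protocol version used by
    each existing member of the group (hypothetical input).
    """
    if group_type == "consumer":
        return "consumer"  # already converted
    if group_type != "generic":
        raise ValueError(f"unexpected group type: {group_type}")
    if all(v >= MIN_EMBEDDED_PROTOCOL_VERSION for v in embedded_protocol_versions):
        return "consumer"  # the generic group is converted in place
    raise InvalidRequestError("upgrade requirements not met")
```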

...