...

The group coordinator is responsible for regex-based subscriptions. We will use Google RE2/J to compile and execute the regular expressions on the server side. This means that all clients, regardless of their language, will use this common regular expression syntax. This should not be an issue for most common regular expressions, but it may require changes if language-specific features are used (RE2 does not support backreferences or lookaround assertions, for example).

...

MetadataVersion / IBP

We introduce a new feature flag named “group.new.coordinator.enable” to enable the new group coordinator and the consumer group protocol. This flag is particularly useful during the development phase because it allows us to enable the new group coordinator while keeping the old one for production deployments. It also allows operators to enable it explicitly in the future. When the feature is marked as production ready, we will replace the feature flag with the metadata.version / IBP. The exact version will be determined when the feature is ready.

Persistence

We will introduce a new set of records to persist the new consumer group type in the existing __consumer_offsets topic. The records are detailed in the public interfaces section of this document.

...

Upgrading to the new protocol or downgrading from it is possible by rolling the consumers with the correct group.protocol, assuming that the new protocol is enabled on the server side. When the first consumer using the new rebalance protocol joins the group, the group is converted from a generic group to a consumer group. When the last consumer using the new rebalance protocol leaves the group, the group is converted back to a generic group. Note that the group epoch starts at the current group generation. During the migration, all the JoinGroup, SyncGroup and Heartbeat calls from the non-upgraded consumers are translated to the new protocol. How? The idea is to reconcile each member individually through the old protocol APIs.
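The conversion rule above can be sketched as a small Java model. This is a minimal, hypothetical sketch, assuming the coordinator only needs to know which protocol each member uses; `GroupConversionSketch` and its enums are illustrative names, not the group coordinator's actual classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the generic <-> consumer group conversion.
// GroupConversionSketch, GroupType and Protocol are illustrative names only.
final class GroupConversionSketch {
    enum GroupType { GENERIC, CONSUMER }
    enum Protocol { OLD, NEW }

    private final Map<String, Protocol> members = new HashMap<>();
    private final int generation;   // generation of the generic group
    private GroupType type = GroupType.GENERIC;
    private int groupEpoch = -1;    // only meaningful for a consumer group

    GroupConversionSketch(int generation) {
        this.generation = generation;
    }

    void join(String memberId, Protocol protocol) {
        members.put(memberId, protocol);
        if (protocol == Protocol.NEW && type == GroupType.GENERIC) {
            // The first member using the new protocol converts the group,
            // and the group epoch starts at the current group generation.
            type = GroupType.CONSUMER;
            groupEpoch = generation;
        }
    }

    void leave(String memberId) {
        members.remove(memberId);
        if (type == GroupType.CONSUMER && !members.containsValue(Protocol.NEW)) {
            // The last member using the new protocol left: convert back.
            type = GroupType.GENERIC;
        }
    }

    GroupType type() { return type; }

    int groupEpoch() { return groupEpoch; }
}
```

In this model, members still on the old protocol never block the conversion back; only the departure of the last new-protocol member does.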

Before explaining how that will work, let's recapitulate how the current protocol works. First, the members join or rejoin the group with the JoinGroup API. The JoinGroup request contains the subscriptions, the owned partitions, etc. When all the members have joined, the group coordinator picks a leader for the group and sends the JoinGroup response back to all the members. The leader is responsible for computing the assignment. Second, all the members collect their assignment, computed by the leader, by using the SyncGroup API. In parallel, the members heartbeat with the Heartbeat API in order to maintain their session. The group coordinator also uses the Heartbeat API to inform the members about an ongoing rebalance. All those interactions are synchronized on the generation of the group. It is important to note that the consumer does not make any assumption about the generation id; it simply uses whatever it receives from the group coordinator. At least, this is how the Java client works.

The current rebalance protocol supports two modes: eager and cooperative. In the eager mode, the consumer revokes all its partitions before rejoining the group during a rebalance. In the cooperative mode, the consumer does not revoke any partitions before rejoining the group. Instead, it revokes the partitions that it no longer owns when it receives its new assignment, and it rejoins immediately if it had to revoke any partitions.
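The difference between the two modes comes down to which partitions the consumer revokes and when. The following Java sketch illustrates that rule under simplifying assumptions: partitions are plain strings such as "foo-0", and `RevocationSketch` is a hypothetical helper, not part of the Java client.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the client-side revocation rule in the two modes
// of the current protocol. Partitions are modelled as strings like "foo-0".
final class RevocationSketch {
    enum Mode { EAGER, COOPERATIVE }

    // Partitions revoked *before* rejoining the group during a rebalance.
    static Set<String> revokeBeforeRejoin(Mode mode, Set<String> owned) {
        return mode == Mode.EAGER ? new HashSet<>(owned) : new HashSet<>();
    }

    // Cooperative mode: on receiving the new assignment, revoke every owned
    // partition that is no longer assigned.
    static Set<String> revokeOnNewAssignment(Set<String> owned, Set<String> assigned) {
        Set<String> revoked = new HashSet<>(owned);
        revoked.removeAll(assigned);
        return revoked;
    }

    // Cooperative mode: rejoin immediately if anything had to be revoked.
    static boolean mustRejoin(Set<String> owned, Set<String> assigned) {
        return !revokeOnNewAssignment(owned, assigned).isEmpty();
    }
}
```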

The new protocol relies on the ConsumerGroupHeartbeat API to do all of the above. Concretely, the API updates the group state, provides the partitions owned by the consumer, gets back the assignment, and updates the session. We can map those responsibilities to the old protocol as follows: the JoinGroup API updates the group state and provides the owned partitions, the SyncGroup API gets back the assignment, and the Heartbeat API updates the session. The main difference is that JoinGroup and SyncGroup do not run continuously. The group coordinator has to trigger them when needed by returning the REBALANCE_IN_PROGRESS error in the heartbeat response.
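The remapping above can be written down as a small lookup table. This Java sketch is illustrative only; the enum names are hypothetical and do not correspond to Kafka classes.

```java
import java.util.EnumMap;
import java.util.Map;

// Illustrative table: each ConsumerGroupHeartbeat responsibility and the
// old-protocol API that carries it during a migration.
final class ApiMappingSketch {
    enum Responsibility { UPDATE_GROUP_STATE, PROVIDE_OWNED_PARTITIONS, GET_ASSIGNMENT, UPDATE_SESSION }
    enum OldApi { JOIN_GROUP, SYNC_GROUP, HEARTBEAT }

    static final Map<Responsibility, OldApi> MAPPING = new EnumMap<>(Responsibility.class);

    static {
        MAPPING.put(Responsibility.UPDATE_GROUP_STATE, OldApi.JOIN_GROUP);
        MAPPING.put(Responsibility.PROVIDE_OWNED_PARTITIONS, OldApi.JOIN_GROUP);
        MAPPING.put(Responsibility.GET_ASSIGNMENT, OldApi.SYNC_GROUP);
        MAPPING.put(Responsibility.UPDATE_SESSION, OldApi.HEARTBEAT);
    }
}
```

Only the HEARTBEAT entry runs continuously; the JOIN_GROUP and SYNC_GROUP entries run only after the coordinator has signalled a rebalance.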

During the migration of a group, the idea is to manage each member individually while relying on the new engine to synchronize them. Each member will use rebalance loops to update the group coordinator and collect its assignment. The group coordinator will ensure that a rebalance is triggered whenever a member needs to update its assignment. This process is detailed in the following sub-sections:

Member States

Let's start by defining a state machine for each member:

...

We intentionally use the terminology of the current protocol here to make the reasoning easier.
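Here is a minimal Java sketch of such a per-member state machine. It uses the three states named in the JoinGroup, SyncGroup and Heartbeat handling of this document (PrepareRebalance, CompletingRebalance, Stable), with transitions inferred from those sections. The initial state and the rebalance-trigger transition are assumptions, and `MemberStateSketch` is a hypothetical name.

```java
// Hypothetical per-member state machine. Transitions are inferred from the
// JoinGroup / SyncGroup handling; the initial state and the rebalance-trigger
// transition are assumptions.
final class MemberStateSketch {
    enum State { PREPARE_REBALANCE, COMPLETING_REBALANCE, STABLE }

    private State state = State.PREPARE_REBALANCE; // assumption: a new member must join first

    // A rebalance trigger: the member must go through JoinGroup again.
    void onRebalanceTriggered() { state = State.PREPARE_REBALANCE; }

    // JoinGroup handled: the member now waits for its assignment.
    void onJoinGroup() { state = State.COMPLETING_REBALANCE; }

    // SyncGroup handled: the member has received its assignment.
    void onSyncGroup() { state = State.STABLE; }

    State state() { return state; }
}
```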

JoinGroup Handling

The JoinGroup API is handled as follows:

  • The basic validations are applied. The request is rejected with the appropriate error if the validation fails.
    • The group must exist.
    • The member must exist or be created.
    • The embedded protocol must be on version >= 3.
    • The generation id must match the current member epoch. The generation id is provided in the embedded protocol.
  • The group state is updated if needed.
    • The group epoch is incremented if a new assignment is required.
    • The Topics provided in the embedded consumer protocol are used to update the SubscribedTopicNames.
    • The Protocols are converted to Assignors, where the UserData becomes the metadata and the version is set to -1.
  • The current member assignment is updated if needed:
    • If the member has revoked all its partitions or the required partitions, it can transition to its next epoch, and its target assignment becomes its current assignment. The group coordinator replies with the new member epoch as the generation id.
    • If the member has to revoke partitions, the group coordinator replies with the current member epoch as the generation id.
  • The member transitions to CompletingRebalance state.
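The epoch decision in the JoinGroup handling above can be sketched in Java as follows. This is a simplified, hypothetical model. The group and member existence checks, the group epoch bump and the Protocols-to-Assignors conversion are omitted. Partitions are plain strings, validation failures are plain exceptions rather than Kafka error codes, and all names are illustrative.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the JoinGroup epoch decision. Existence checks,
// the group epoch bump and assignor conversion are intentionally omitted.
final class JoinGroupSketch {
    enum State { PREPARE_REBALANCE, COMPLETING_REBALANCE, STABLE }

    static final class Member {
        int memberEpoch;
        Set<String> currentAssignment;
        final Set<String> targetAssignment;
        final int targetEpoch;
        List<String> subscribedTopicNames = List.of();
        State state = State.PREPARE_REBALANCE;

        Member(int memberEpoch, Set<String> current, Set<String> target, int targetEpoch) {
            this.memberEpoch = memberEpoch;
            this.currentAssignment = new HashSet<>(current);
            this.targetAssignment = new HashSet<>(target);
            this.targetEpoch = targetEpoch;
        }
    }

    // Returns the generation id sent back in the JoinGroup response.
    static int handle(Member member, int embeddedVersion, int generationId,
                      Set<String> ownedPartitions, List<String> topics) {
        if (embeddedVersion < 3) {
            throw new IllegalArgumentException("embedded protocol must be on version >= 3");
        }
        if (generationId != member.memberEpoch) {
            throw new IllegalArgumentException("generation id must match the member epoch");
        }
        // The topics from the embedded protocol become the SubscribedTopicNames.
        member.subscribedTopicNames = List.copyOf(topics);

        // Partitions still owned by the member but absent from its target.
        Set<String> pendingRevocation = new HashSet<>(ownedPartitions);
        pendingRevocation.removeAll(member.targetAssignment);

        if (pendingRevocation.isEmpty()) {
            // Required revocations are done: move to the next epoch.
            member.memberEpoch = member.targetEpoch;
            member.currentAssignment = new HashSet<>(member.targetAssignment);
        }
        // Otherwise the current member epoch is returned unchanged.
        member.state = State.COMPLETING_REBALANCE;
        return member.memberEpoch;
    }
}
```

Returning the unchanged epoch while revocations are pending keeps the member on its current generation, so its next JoinGroup still validates against the same epoch.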

SyncGroup API

The SyncGroup API is handled as follows:

  • The basic validations are applied.
    • The group must exist.
    • The member must exist.
    • The generation id must match the current member epoch. The generation id is provided in the request.
  • The group coordinator replies with the current assignment. There are two cases to consider here:
    • If the member epoch is smaller than the target epoch, the coordinator replies with the intersection of the current assignment and the target assignment. This is used to revoke the partitions that the member no longer owns. In this case, we know that the member will rejoin the group if it revokes partitions, which automatically triggers another rebalance to collect the newly assigned partitions.
    • If the member epoch is equal to the target epoch, the coordinator replies with the current assignment, excluding the partitions that other members have not revoked yet.
  • The group coordinator serializes the assignment with the embedded consumer protocol.
  • The member transitions to Stable state.
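The two cases of the SyncGroup reply can be sketched in Java as follows. This is a hypothetical, simplified model. Serialization with the embedded consumer protocol and the existence checks are left out, partitions are plain strings, and `notYetRevokedByOthers` is an assumed input standing for the partitions other members still hold.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the assignment returned by SyncGroup. Serialization
// with the embedded consumer protocol and the existence checks are omitted.
final class SyncGroupSketch {
    static Set<String> assignment(int generationId, int memberEpoch, int targetEpoch,
                                  Set<String> current, Set<String> target,
                                  Set<String> notYetRevokedByOthers) {
        if (generationId != memberEpoch) {
            throw new IllegalArgumentException("generation id must match the member epoch");
        }
        Set<String> result = new HashSet<>(current);
        if (memberEpoch < targetEpoch) {
            // Still revoking: keep only partitions that remain in the target,
            // so the consumer revokes the rest and rejoins.
            result.retainAll(target);
        } else {
            // At the target epoch: withhold partitions other members still own.
            result.removeAll(notYetRevokedByOthers);
        }
        return result;
    }
}
```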

Heartbeat API

The Heartbeat API is handled as follows:

  • The basic validations are applied.
    • The group must exist.
    • The member must exist.
    • The generation id must match the current member epoch. The generation id is provided in the request.
  • The member session is updated.
  • The group coordinator replies with REBALANCE_IN_PROGRESS if the member is in the PrepareRebalance state; otherwise, it replies with NONE.
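The Heartbeat handling can be sketched in Java as follows. This is a hypothetical, simplified model: groups are a nested map of group id to member id to member, validation failures are plain exceptions, and error codes are returned as strings rather than Kafka `Errors` values. All names are illustrative.

```java
import java.util.Map;

// Hypothetical sketch of the Heartbeat handling. Error codes are strings and
// validation failures are exceptions; all names are illustrative.
final class HeartbeatSketch {
    enum State { PREPARE_REBALANCE, COMPLETING_REBALANCE, STABLE }

    static final class Member {
        final int memberEpoch;
        State state;
        long lastHeartbeatMs;

        Member(int memberEpoch, State state) {
            this.memberEpoch = memberEpoch;
            this.state = state;
        }
    }

    // groups: group id -> (member id -> member)
    static String handle(Map<String, Map<String, Member>> groups, String groupId,
                         String memberId, int generationId, long nowMs) {
        Map<String, Member> group = groups.get(groupId);
        if (group == null) throw new IllegalArgumentException("group must exist");
        Member member = group.get(memberId);
        if (member == null) throw new IllegalArgumentException("member must exist");
        if (generationId != member.memberEpoch) {
            throw new IllegalArgumentException("generation id must match the member epoch");
        }
        member.lastHeartbeatMs = nowMs; // refresh the member session
        return member.state == State.PREPARE_REBALANCE ? "REBALANCE_IN_PROGRESS" : "NONE";
    }
}
```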

Rebalance Triggers

The group coordinator will trigger a rebalance in the following cases:

...