...

In the current architecture, during each rebalance the consumer group on the broker side assigns each member a new member id, a UUID randomly generated each time. This ensures that every group member has a unique identity. On client restart, the consumer sends a JoinGroupRequest with the special UNKNOWN_MEMBER id, so it is never treated as an existing member. To make the KIP work, we need to change both client-side and server-side logic so that member identity persists across restarts by checking `group.instance.id` (explained later). This lets us reduce the number of rebalances, since we can reapply the same assignment based on member identity. The idea is summarized as static membership, which, in contrast to dynamic membership (the one our system currently uses), prioritizes "state persistence" over "liveness".

...

  • Static Membership: the membership protocol where the consumer group will not trigger a rebalance unless (1) a new member joins, (2) a leader rejoins, or (3) an existing member goes offline for longer than the session timeout.
  • Group instance id: the unique identifier defined by the user to distinguish each client instance.

...

New Configurations

Consumer configs

group.instance.id

The unique identifier of the consumer instance provided by end user.

Default value: empty string.
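
As an illustration of how a user might set the new config, here is a minimal sketch that builds consumer properties with plain `java.util.Properties`. The bootstrap address, group name, instance id and the 60-second session timeout are placeholder assumptions for the example, not values recommended by this KIP.

```java
import java.util.Properties;

class StaticMemberConfigExample {
    // Builds consumer properties for one instance. The instanceId must be
    // unique per consumer instance; choosing it is the user's responsibility.
    static Properties buildConsumerProps(String instanceId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        props.setProperty("group.id", "example-group");           // placeholder group name
        props.setProperty("group.instance.id", instanceId);       // config proposed by this KIP
        props.setProperty("session.timeout.ms", "60000");         // illustrative value only
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildConsumerProps("consumer-host-1");
        System.out.println(props.getProperty("group.instance.id"));
    }
}
```

Leaving `group.instance.id` unset (or empty) keeps the consumer on dynamic membership.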

Client side changes

The new `group.instance.id` config will be added to the join group request.

...

Errors.java

MEMBER_ID_MISMATCH(78, "The join group request contains a group instance id which is already in the consumer group, however the member id does not match the record on the coordinator", MemberIdMisMatchException::new),

...

For fault tolerance, we also include the group instance id within the member metadata so that it is backed up in the offset topic.

...

  1. JoinGroupRequest V4 is supported on both client and broker
  2. `group.instance.id` is configured with a non-empty string.

...

On the client side, we add a new config called `group.instance.id` in ConsumerConfig. On consumer service init, if the `group.instance.id` config is set, we will put it in the initial join group request so that the consumer identifies itself as a static member (static membership). Note that it is the user's responsibility to assign a unique `group.instance.id` to each consumer. This could be a service discovery hostname, a unique IP address, etc. We also have logic to handle duplicate `group.instance.id` values in case the client is misconfigured.

For the effectiveness of the KIP, consumers with `group.instance.id` set will not send a leave group request when they go offline, which means we rely only on session.timeout to trigger a group rebalance. This is because the proposed rebalance protocol would otherwise trigger a rebalance on each intermittent in-and-out, which is not ideal. With static membership we delegate consumer group health management to the client application platform, such as Kubernetes (K8s). Therefore, it is also advised to make the session timeout large enough that the broker does not trigger rebalances too frequently as members come and go.
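
The client-side decisions described above can be sketched as two small predicates. This is only an illustration under assumptions: the class and method names are hypothetical and do not correspond to actual consumer internals.

```java
class StaticMemberClientSketch {
    // A consumer is a static member when a non-empty group.instance.id is configured.
    static boolean isStaticMember(String groupInstanceId) {
        return groupInstanceId != null && !groupInstanceId.isEmpty();
    }

    // Static members skip the leave group request on shutdown, so only the
    // session timeout can remove them from the group.
    static boolean shouldSendLeaveGroupOnClose(String groupInstanceId) {
        return !isStaticMember(groupInstanceId);
    }

    public static void main(String[] args) {
        System.out.println(shouldSendLeaveGroupOnClose("consumer-host-1")); // false: static member
        System.out.println(shouldSendLeaveGroupOnClose(""));                // true: dynamic member
    }
}
```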

...

On the server side, the broker will keep handling join group requests <= v3 as before. The `member.id` generation and assignment is still coordinated by the broker, and the broker will maintain an in-memory mapping of {group.instance.id → member.id} to track member uniqueness. When receiving a rejoin request from a known member (that is, one whose `group.instance.id` is known), the broker will return the cached assignment to the member without doing any rebalance.

For join group requests under static membership (with `group.instance.id` set):

  • If the `member.id` uses UNKNOWN_MEMBER_NAME, we shall always generate a new member id and replace the one within the current map. We also expect that after KIP-394, all join group requests will require a `member.id` to physically enter the consumer group, so the behavior of static members is consistent with that proposal.
  • Otherwise, we require the `member.id` (if not unknown) to match the value stored in the cache, and reply MEMBER_ID_MISMATCH if it does not. The edge case is that we could have members with the same `group.instance.id` (for example, a mis-configured instance with a valid `member.id` that picked up an already used `group.instance.id` at runtime). When `group.instance.id` has duplicates, we can refuse join requests from members with an outdated `member.id` (since we update the mapping upon each join group request). When a client receives this error in the response, it suggests that some other consumer has taken its spot. The client should immediately fail itself to inform the end user that there is a configuration bug generating duplicate consumers with the same identity. For the first version of this KIP, we just want straightforward handling that exposes the error at an early stage and makes bug cases easy to reproduce.

For join group requests under dynamic membership (without `group.instance.id` set), the handling logic remains unchanged. If the broker version is not the latest (< v4), the join group request shall be downgraded to v3 without setting the member id.
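
The join handling described above can be sketched as follows. This is a simplified illustration, not the coordinator implementation: the class, the `JoinResult` type, and the use of an empty string for the unknown member id are assumptions made for the example, and the dynamic-membership branch is reduced to a stub.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

class StaticJoinSketch {
    // Assumption: the unknown member id placeholder is modeled as an empty string.
    static final String UNKNOWN_MEMBER_ID = "";

    enum Error { NONE, MEMBER_ID_MISMATCH }

    static final class JoinResult {
        final Error error;
        final String memberId;
        JoinResult(Error error, String memberId) {
            this.error = error;
            this.memberId = memberId;
        }
    }

    // In-memory mapping of group.instance.id -> member.id.
    private final Map<String, String> staticMembers = new HashMap<>();

    // memberId is assumed to be non-null.
    JoinResult handleJoin(String groupInstanceId, String memberId) {
        if (groupInstanceId == null || groupInstanceId.isEmpty()) {
            // Dynamic membership: existing logic is unchanged (stubbed here).
            String id = UNKNOWN_MEMBER_ID.equals(memberId)
                    ? UUID.randomUUID().toString() : memberId;
            return new JoinResult(Error.NONE, id);
        }
        if (UNKNOWN_MEMBER_ID.equals(memberId)) {
            // Always generate a new member id and replace the cached one.
            String newId = UUID.randomUUID().toString();
            staticMembers.put(groupInstanceId, newId);
            return new JoinResult(Error.NONE, newId);
        }
        // A known member.id must match the cached value for this instance id.
        if (!memberId.equals(staticMembers.get(groupInstanceId))) {
            return new JoinResult(Error.MEMBER_ID_MISMATCH, memberId);
        }
        return new JoinResult(Error.NONE, memberId);
    }
}
```

In this sketch, a second consumer that joins with the same `group.instance.id` and an unknown member id replaces the mapping, so the original consumer's next join with its now outdated `member.id` receives MEMBER_ID_MISMATCH.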

...

So in summary, a member will only be removed due to session timeout. We shall remove it from both the in-memory static group instance id mapping and the member list.
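
A minimal sketch of that removal rule, assuming a hypothetical tracker that records the last heartbeat time per member. Names and structure are illustrative only; the real coordinator uses its own heartbeat machinery.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

class SessionExpirySketch {
    private final long sessionTimeoutMs;
    // Static mapping: group.instance.id -> member.id.
    private final Map<String, String> instanceToMember = new HashMap<>();
    // Member list, keyed by member.id, holding the last heartbeat timestamp.
    private final Map<String, Long> lastHeartbeatMs = new HashMap<>();

    SessionExpirySketch(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    void addMember(String instanceId, String memberId, long nowMs) {
        instanceToMember.put(instanceId, memberId);
        lastHeartbeatMs.put(memberId, nowMs);
    }

    void heartbeat(String memberId, long nowMs) {
        if (lastHeartbeatMs.containsKey(memberId)) {
            lastHeartbeatMs.put(memberId, nowMs);
        }
    }

    // Removes expired members from BOTH structures; returns how many were removed.
    int expire(long nowMs) {
        int removed = 0;
        Iterator<Map.Entry<String, String>> it = instanceToMember.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, String> entry = it.next();
            Long last = lastHeartbeatMs.get(entry.getValue());
            if (last == null || nowMs - last > sessionTimeoutMs) {
                lastHeartbeatMs.remove(entry.getValue());
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    boolean isMember(String instanceId) {
        return instanceToMember.containsKey(instanceId);
    }
}
```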

Scale down

...

To make sure we can recover from broker failure or leader transition, an in-memory `group.instance.id` map is not enough. We would reuse the `__consumer_offsets` topic to store the static member map information. When another broker takes over the leadership, the mapping is transferred along with it.
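
The recovery idea can be illustrated with a toy append-only log that is replayed to rebuild the map. This is a sketch under assumptions: the record shape, the tombstone convention, and the in-memory list standing in for the offsets topic are all hypothetical and do not reflect the actual topic schema.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class StaticMemberLogSketch {
    // Hypothetical record: a null memberId acts as a tombstone (member removed).
    static final class MappingRecord {
        final String groupInstanceId;
        final String memberId;
        MappingRecord(String groupInstanceId, String memberId) {
            this.groupInstanceId = groupInstanceId;
            this.memberId = memberId;
        }
    }

    // Stand-in for the durable log; a real broker would write to the offsets topic.
    private final List<MappingRecord> log = new ArrayList<>();

    void append(String groupInstanceId, String memberId) {
        log.add(new MappingRecord(groupInstanceId, memberId));
    }

    // A new coordinator replays the log in order to rebuild the in-memory mapping.
    Map<String, String> replay() {
        Map<String, String> mapping = new HashMap<>();
        for (MappingRecord record : log) {
            if (record.memberId == null) {
                mapping.remove(record.groupInstanceId);
            } else {
                mapping.put(record.groupInstanceId, record.memberId);
            }
        }
        return mapping;
    }
}
```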

...

  1. Upgrade your broker to include this KIP.
  2. Upgrade your client to include this KIP.
  3. Set `group.instance.id` and the session timeout to reasonable values, then rolling bounce your consumer group.

...

The downgrade process is also straightforward. The end user can simply unset `group.instance.id` and do a rolling bounce to switch back to dynamic membership. The static membership metadata stored on the broker will not take any effect when `group.instance.id` is empty. Once the consumer offset topic retention has passed, the old mapping messages will be gone completely.

...

In this pull request, we took an experimental approach of materializing the member id (the identity given by the broker, equivalent to the `group.instance.id` in this proposal) on the instance's local disk. This approach reduced rebalances as expected and is the experimental foundation of KIP-345. However, KIP-345 has a few advantages over it:

  1. It gives users more control over their `group.instance.id`, which helps with debugging.
  2. It is friendlier to cloud, k8s and similar environments: when we move an instance from one container to another, we can copy the `group.instance.id` to the config files.
  3. It does not require the consumer to be able to access another dir on the local disks (think of consumers deployed on AWS with remote disks mounted).
  4. By allowing consumers to optionally specify a `group.instance.id`, this rebalance benefit can easily be extended to Connect and Streams as well, which rely on consumers, even in a cloud environment.

...

  1. Pre-registration (proposed by Jason). The client user could provide a list of hard-coded `group.instance.id` values so that the server could respond to scaling operations more intelligently. For example, when we scale up the fleet by defining 4 new client instance ids, the server shall wait until all 4 new members have joined the group before kicking off the rebalance; the same applies to scale down.
  2. Add hot standby hosts by defining `target.group.size` (proposed by Mayuresh). We shall keep some idle consumers within the group, and when one of the active members goes offline, we shall trigger a hot swap because the current group size is smaller than `target.group.size`. With this change we might not even need to extend the session timeout, since we could easily put a spare consumer to work.

...