...

In the current architecture, during each rebalance the consumer group on the broker side assigns a new member id built from a randomly generated UUID. This guarantees a unique identity for each group member. When a client restarts, the consumer sends a JoinGroupRequest with the special UNKNOWN_MEMBER_ID, signalling that it does not intend to be treated as an existing member. To make this KIP work, we need to change both client-side and server-side logic so that member identity persists across restarts, keyed on `group.member.name` (explained later). This lets us reduce the number of rebalances, because the same assignment can be reapplied based on member identity. We call this idea static membership. In contrast to dynamic membership (the one our system currently uses), it prioritizes "state persistence" over "liveness", because for many stateful consumer/stream applications, shuffling state is more painful than a short period of partial unavailability.

We will be introducing two new terms:

...

On the client side, we add a new config called `group.member.name` in ConsumerConfig. On consumer service init, if `group.member.name` is set, we put it in the initial join group request to identify the consumer as a static member (static membership); otherwise, we still send UNKNOWN_MEMBER_ID to ask the broker to allocate a new random ID (dynamic membership). Note that it is the user's responsibility to assign a unique `group.member.name` to each consumer. This could be the hostname from service discovery, a unique IP address, etc. We also have logic to handle duplicate `group.member.name` values in case the client is misconfigured.
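A minimal sketch of this client-side choice follows. It assumes UNKNOWN_MEMBER_ID is the empty string and uses a hypothetical `JoinIdentity` class in place of the real join group request; only the `group.member.name` key comes from this KIP.

```java
import java.util.Optional;
import java.util.Properties;

// Hypothetical helper: decides what identity the first join group request carries.
final class JoinIdentity {
    // Config key introduced by this KIP.
    static final String GROUP_MEMBER_NAME_CONFIG = "group.member.name";
    // Assumed sentinel for "no member id yet" (empty string in this sketch).
    static final String UNKNOWN_MEMBER_ID = "";

    final String memberId;
    final Optional<String> memberName;

    private JoinIdentity(String memberId, Optional<String> memberName) {
        this.memberId = memberId;
        this.memberName = memberName;
    }

    boolean isStatic() {
        return memberName.isPresent();
    }

    static JoinIdentity initialJoin(Properties config) {
        String name = config.getProperty(GROUP_MEMBER_NAME_CONFIG);
        if (name == null || name.isEmpty()) {
            // Dynamic membership: the broker allocates a random member id.
            return new JoinIdentity(UNKNOWN_MEMBER_ID, Optional.empty());
        }
        // Static membership: the configured name identifies this consumer;
        // member.id is still left for the broker to assign.
        return new JoinIdentity(UNKNOWN_MEMBER_ID, Optional.of(name));
    }
}
```

Unsetting the key simply falls back to the dynamic path, which is also what makes the downgrade described later a config-only change.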

...

On the server side, the broker will keep handling join group requests <= v3 as before. If the protocol version is upgraded to v4 and `group.member.name` is set, the broker will use the member name specified in the join group request and respond with a unique "member id". The `member.id` generation and assignment is still coordinated by the broker, and the broker will maintain an in-memory mapping of {group.member.name → member.id} to track member uniqueness. When it receives a rejoin request from an existing known member (that is, one whose `group.member.name` is known), the broker will return the cached assignment to the member without doing any rebalance.
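The broker-side bookkeeping can be sketched as below. This assumes the rejoining member arrives without a valid `member.id` (for example after a restart), keys the cached assignment by `group.member.name`, and models partitions as plain integers; `StaticMemberRegistry` and its methods are hypothetical, not broker code.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical model of the {group.member.name -> member.id} mapping plus cached assignments.
final class StaticMemberRegistry {
    // group.member.name -> member.id currently holding that identity
    private final Map<String, String> memberIds = new HashMap<>();
    // group.member.name -> partitions from the last completed rebalance
    private final Map<String, List<Integer>> cachedAssignments = new HashMap<>();

    record JoinResult(String memberId, List<Integer> assignment, boolean rebalance) {}

    JoinResult join(String memberName) {
        // member.id generation stays on the broker side.
        String memberId = UUID.randomUUID().toString();
        memberIds.put(memberName, memberId);
        List<Integer> cached = cachedAssignments.get(memberName);
        if (cached != null) {
            // Known static member: hand back its cached assignment, no rebalance.
            return new JoinResult(memberId, cached, false);
        }
        // Unknown name: the group has to rebalance to give it partitions.
        return new JoinResult(memberId, List.of(), true);
    }

    void completeRebalance(String memberName, List<Integer> partitions) {
        cachedAssignments.put(memberName, List.copyOf(partitions));
    }

    String memberIdOf(String memberName) {
        return memberIds.get(memberName);
    }
}
```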

For join group requests under static membership (with `group.member.name` set):

  • If the `member.id` is UNKNOWN_MEMBER_ID, we always generate a new member id and, if `group.member.name` is already known, replace the one stored in the current map. Also, once KIP-394 is done, all join group requests will require a `member.id` to physically enter the consumer group.
  • Otherwise, we require the `member.id` to match the value stored in the cache, and reply MEMBER_ID_MISMATCH if it does not. The edge case is that we could have multiple members with the same `group.member.name` (for example, misconfigured instances that hold a valid `member.id` but were given an already-used `group.member.name` at runtime). When `group.member.name` has duplicates, we refuse join requests from members with an outdated `member.id` (since we update the mapping upon each join group request). If a client hits this error in the response, it means some other consumer has taken its spot. The client should immediately fail itself to inform the end user that a configuration bug is generating duplicate consumers with the same identity. For the first version of this KIP, we want straightforward handling that exposes the error at an early stage and makes bug cases easy to reproduce.
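The two rules above can be sketched as a single validation step. `StaticJoinValidator`, the `JoinError` enum, and the empty-string UNKNOWN_MEMBER_ID sentinel are assumptions for illustration; a `member.id` presented for a name with no cached entry is treated as a mismatch here, which is one reading of the rule.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical validator for static-member join group requests.
final class StaticJoinValidator {
    static final String UNKNOWN_MEMBER_ID = ""; // assumed sentinel value

    enum JoinError { NONE, MEMBER_ID_MISMATCH }

    record Response(JoinError error, String memberId) {}

    // group.member.name -> latest member.id issued for it
    private final Map<String, String> memberIds = new HashMap<>();

    Response handleJoin(String memberName, String memberId) {
        if (UNKNOWN_MEMBER_ID.equals(memberId)) {
            // Always mint a fresh member.id and replace any existing mapping,
            // so an older holder of this name now carries an outdated id.
            String fresh = UUID.randomUUID().toString();
            memberIds.put(memberName, fresh);
            return new Response(JoinError.NONE, fresh);
        }
        String current = memberIds.get(memberName);
        if (!memberId.equals(current)) {
            // Someone else claimed this name after us: fail fast so the
            // duplicate configuration surfaces early.
            return new Response(JoinError.MEMBER_ID_MISMATCH, memberId);
        }
        return new Response(JoinError.NONE, memberId);
    }
}
```

Replaying two instances configured with the same name shows the intended effect: the instance that joined first is rejected on its next request, while the most recent one is accepted.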

...

In summary, a member will only be removed due to session timeout. We shall remove it from both the in-memory static member name mapping and the member list.
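A sketch of timeout-only removal follows, assuming a per-member last-heartbeat timestamp and a single session timeout for the group. `StaticGroupMembers` is hypothetical and, for brevity, tracks only static members.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: static members leave only when their session times out.
final class StaticGroupMembers {
    // group.member.name -> member.id
    private final Map<String, String> nameToMemberId = new HashMap<>();
    // member list: member.id -> last heartbeat time in ms
    private final Map<String, Long> lastHeartbeatMs = new HashMap<>();
    private final long sessionTimeoutMs;

    StaticGroupMembers(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    void add(String memberName, String memberId, long nowMs) {
        nameToMemberId.put(memberName, memberId);
        lastHeartbeatMs.put(memberId, nowMs);
    }

    void heartbeat(String memberId, long nowMs) {
        lastHeartbeatMs.computeIfPresent(memberId, (id, old) -> nowMs);
    }

    // Removes timed-out members from both structures; returns how many were removed.
    int expire(long nowMs) {
        int removed = 0;
        var it = nameToMemberId.entrySet().iterator();
        while (it.hasNext()) {
            String memberId = it.next().getValue();
            Long last = lastHeartbeatMs.get(memberId);
            if (last == null || nowMs - last > sessionTimeoutMs) {
                it.remove();                      // drop the name -> id mapping
                lastHeartbeatMs.remove(memberId); // drop it from the member list
                removed++;
            }
        }
        return removed;
    }

    boolean contains(String memberName) {
        return nameToMemberId.containsKey(memberName);
    }

    int size() {
        return lastHeartbeatMs.size();
    }
}
```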

...

  1. The broker is on an old version.
  2. The consumer group does not exist.
  3. The operator is not authorized (neither an admin nor the consumer group creator).
  4. The group is not in a valid state to transition to rebalance (checked with the `canRebalance` function defined in GroupMetadata.scala).
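The checks above could be applied in the listed order, as sketched below. The outcome names, the `GroupState` enum, and the boolean inputs are hypothetical. The `canRebalance` here assumes a group may enter PreparingRebalance only from Stable, CompletingRebalance, or Empty, which we believe mirrors GroupMetadata.scala but should be confirmed against the source.

```java
// Hypothetical precondition checks for the group-scoped admin request.
final class RebalanceRequestChecks {
    enum Outcome { OK, UNSUPPORTED_VERSION, GROUP_NOT_FOUND, NOT_AUTHORIZED, INVALID_GROUP_STATE }

    enum GroupState { EMPTY, PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE, DEAD }

    // Assumed valid predecessor states of PreparingRebalance.
    static boolean canRebalance(GroupState state) {
        return state == GroupState.STABLE
            || state == GroupState.COMPLETING_REBALANCE
            || state == GroupState.EMPTY;
    }

    // groupState == null stands for "group does not exist" in this sketch.
    static Outcome check(boolean brokerSupportsApi, GroupState groupState, boolean authorized) {
        if (!brokerSupportsApi) return Outcome.UNSUPPORTED_VERSION;
        if (groupState == null) return Outcome.GROUP_NOT_FOUND;
        if (!authorized) return Outcome.NOT_AUTHORIZED;
        if (!canRebalance(groupState)) return Outcome.INVALID_GROUP_STATE;
        return Outcome.OK;
    }
}
```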

We need to grant special access to these APIs for end users who may not hold an administrative role in the Kafka cluster. We shall allow an access level similar to that of the join group request, so that the consumer service owner can easily use this API. Another detail to take care of is that the client needs to automatically hash the group id so that it knows which broker (the group coordinator) to send this request to.
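For the group-id hashing, Kafka maps a group to partition `abs(groupId.hashCode) % N` of the internal `__consumer_offsets` topic, where N is `offsets.topic.num.partitions` (50 by default), and the leader of that partition acts as the group coordinator. A sketch of that mapping follows; `CoordinatorLookup` is an illustrative name, and in practice a client would usually ask the cluster through a FindCoordinator request rather than compute this locally.

```java
// Illustrative sketch of mapping a group id onto an offsets-topic partition.
final class CoordinatorLookup {
    // Non-negative abs that maps Integer.MIN_VALUE to 0 instead of overflowing.
    static int abs(int n) {
        return n == Integer.MIN_VALUE ? 0 : Math.abs(n);
    }

    // The leader of the returned partition is the coordinator for groupId.
    static int partitionFor(String groupId, int offsetsTopicPartitionCount) {
        return abs(groupId.hashCode()) % offsetsTopicPartitionCount;
    }
}
```

For example, `CoordinatorLookup.partitionFor("a", 50)` returns 47, since `"a".hashCode()` is 97.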

...

The fallback logic has been discussed previously: a broker on a lower version simply downgrades static membership to dynamic membership.
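A sketch of that fallback from the client's point of view follows. It assumes JoinGroup v4 is the first version that carries `group.member.name` (as stated above) and that the version used is the minimum of the client's and broker's maximum supported versions; `JoinVersionFallback` is hypothetical.

```java
import java.util.Optional;

// Hypothetical planner: drops the member name when the broker is too old.
final class JoinVersionFallback {
    static final short STATIC_MEMBERSHIP_MIN_VERSION = 4;

    record Plan(short version, Optional<String> memberName) {}

    static Plan plan(short clientMaxVersion, short brokerMaxVersion, Optional<String> configuredName) {
        short version = (short) Math.min(clientMaxVersion, brokerMaxVersion);
        if (version < STATIC_MEMBERSHIP_MIN_VERSION) {
            // Old broker: the field cannot be sent, so join as a dynamic member.
            return new Plan(version, Optional.empty());
        }
        return new Plan(version, configuredName);
    }
}
```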

Upgrade process

The recommended upgrade process is as follows:

...

That's it! We believe the static membership logic is compatible with the current dynamic membership, which means static members and dynamic members are allowed to coexist within the same consumer group. This assumption could be further verified by modeling the protocol (perhaps in TLA+) or through development testing.

Downgrade process

The downgrade process is also straightforward. End users can simply unset `group.member.name` and do a rolling bounce to switch back to dynamic membership. The static membership metadata stored on the broker will then no longer take effect.

Non-goal

We have had some offline discussions on handling the leader rejoin case. For example, since the broker could also do the subscription monitoring work, we don't actually need to trigger a rebalance blindly based on the leader's rejoin request. However, this is a separate topic and we will address it in another KIP.

...