...

In the current architecture, during each rebalance the consumer group on the broker side assigns each new member a member id containing a freshly generated random UUID. This ensures that each group member has a unique identity. When a client restarts, the consumer sends a JoinGroupRequest with the special UNKNOWN_MEMBER_ID, so it is never treated as an existing member. To make the KIP work, we need to change both client-side and server-side logic so that member identity persists across restarts. This lets us reduce the number of rebalances, since the same assignment can be reapplied based on member identity. The idea is summarized as `static membership`, which, in contrast to dynamic membership, prioritizes "state persistence" over "liveness", because for many stateful consumer/stream applications state shuffling is more painful than a short period of partial unavailability.

Proposed Changes

We will be introducing new terms:

  • Static Membership: the membership protocol where the consumer group will not trigger a rebalance unless (1) a new unknown member joins, (2) a leader rejoins, or (3) an existing member goes offline for longer than a certain timeout.
  • Member name: the unique identifier defined by user to distinguish each client instance.
  • Member registration timeout: the maximum time we tolerate a static member being offline.
  • Member expansion timeout: the maximum time we will wait after receiving a new static member join request. (Details later)
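The rebalance conditions in the Static Membership definition above can be sketched as a small decision function. This is only an illustration under stated assumptions: the class `StaticRebalancePolicy`, the `Event` enum, and the method `shouldRebalance` are hypothetical names and are not part of the KIP or of the Kafka code base.

```java
import java.time.Duration;

// Hypothetical sketch of the static-membership rebalance rule; not Kafka code.
final class StaticRebalancePolicy {

    // Illustrative event types; the names are assumptions made for this sketch.
    enum Event { UNKNOWN_MEMBER_JOIN, LEADER_REJOIN, FOLLOWER_REJOIN, MEMBER_OFFLINE }

    // Returns true only for the three cases listed in the definition:
    // a new unknown member joins, a leader rejoins, or an existing member
    // has been offline longer than the member registration timeout.
    static boolean shouldRebalance(Event event, Duration offlineFor, Duration registrationTimeout) {
        switch (event) {
            case UNKNOWN_MEMBER_JOIN:
            case LEADER_REJOIN:
                return true;
            case MEMBER_OFFLINE:
                return offlineFor.compareTo(registrationTimeout) > 0;
            default:
                // A known, non-leader static member rejoining keeps its assignment.
                return false;
        }
    }
}
```

Under this reading, a known static member that restarts and rejoins within the registration timeout does not cause a rebalance.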

On the client side, we add a new config called MEMBER_NAME in ConsumerConfig. On consumer service init, if the MEMBER_NAME config is set, we will put it in the initial join group request to identify the consumer as a static member (static membership); otherwise, we will still send UNKNOWN_MEMBER_ID to ask the broker to allocate a new random ID (dynamic membership). To distinguish this from previous versions of the protocol, we will also increase the join group request version to v4 when MEMBER_NAME is set. Note that it is the user's responsibility to assign a unique member name to each consumer. This could be a service discovery hostname, a unique IP address, etc. We also have logic to handle a duplicate member.name in case the client is misconfigured.

Code Block
languagejava
titleConsumerConfig.java
public static final String MEMBER_NAME = "member_A"; // default empty String
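As a rough illustration of the client-side choice described above, the sketch below picks the join group request version and fields from the configured member name. The class `JoinGroupFields` and its method `forMemberName` are hypothetical. It also assumes that a static member sends UNKNOWN_MEMBER_ID as its member id on the initial join, and that a dynamic member stays on v3; the text does not spell out either detail.

```java
// Hypothetical sketch of how a consumer could choose join group request fields.
final class JoinGroupFields {

    // Empty string stands in for the unknown member id (assumption for this sketch).
    static final String UNKNOWN_MEMBER_ID = "";

    final short version;
    final String memberId;
    final String memberName;

    private JoinGroupFields(short version, String memberId, String memberName) {
        this.version = version;
        this.memberId = memberId;
        this.memberName = memberName;
    }

    // Static membership when a non-empty member name is configured,
    // dynamic membership (pre-v4 request, no member name) otherwise.
    static JoinGroupFields forMemberName(String configuredMemberName) {
        boolean isStatic = configuredMemberName != null && !configuredMemberName.isEmpty();
        if (isStatic) {
            return new JoinGroupFields((short) 4, UNKNOWN_MEMBER_ID, configuredMemberName);
        }
        return new JoinGroupFields((short) 3, UNKNOWN_MEMBER_ID, "");
    }
}
```

For example, `JoinGroupFields.forMemberName("consumer-host-1")` yields a v4 request carrying the member name, while an empty or missing name falls back to the dynamic path.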

...

On the server side, the broker will keep handling join group requests <= v3 as before. If the protocol version is upgraded to v4 and the member name is set, the broker will use the member name specified in the join group request and respond with a unique "member id". Otherwise, the broker will use the client id plus a randomly generated suffix as the member id, same as now. The broker will maintain an in-memory mapping of {member.name → member.id} so that, if a member name is duplicated, we can refuse commit requests from members with an outdated member.id (since we update the mapping upon each join group request). The change will be applied in addMemberAndRebalance.
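The {member.name → member.id} mapping can be pictured with the minimal sketch below. `StaticMemberRegistry`, `onJoin`, and `isCurrent` are hypothetical names, and the member id format (member name plus a random UUID) is an assumption made for illustration; the actual change lives in addMemberAndRebalance and may differ.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch of the broker-side member.name -> member.id mapping.
final class StaticMemberRegistry {

    private final Map<String, String> memberIdByName = new HashMap<>();

    // Called for each v4 join group request that carries a member name.
    // The mapping is overwritten on every join, so the latest joiner wins.
    String onJoin(String memberName) {
        String memberId = memberName + "-" + UUID.randomUUID(); // id format is an assumption
        memberIdByName.put(memberName, memberId);
        return memberId;
    }

    // A commit request is accepted only if it carries the member id
    // currently registered for that member name.
    boolean isCurrent(String memberName, String memberId) {
        return memberId != null && memberId.equals(memberIdByName.get(memberName));
    }
}
```

If two clients are mistakenly configured with the same member name, the second join replaces the mapping entry, so commits carrying the first client's member id fail the `isCurrent` check.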

...

Code Block
titleJoinGroupResponse.java
public static Schema[] schemaVersions() {
    return new Schema[] {JOIN_GROUP_RESPONSE_V0, JOIN_GROUP_RESPONSE_V1, JOIN_GROUP_RESPONSE_V2, JOIN_GROUP_RESPONSE_V3, JOIN_GROUP_RESPONSE_V4};
}

...


Also notice that there is a conflicting internal config called LEAVE_GROUP_ON_CLOSE_CONFIG, which decides whether a consumer should send a leave group request upon going offline. This would reduce the effectiveness of this KIP: after the consumer leaves the group, the broker will identify the same member as a new member, which would still trigger many rebalances. We will set the default of this internal config to false.


Code Block
titleConsumerConfig.java
.defineInternal(LEAVE_GROUP_ON_CLOSE_CONFIG,
                Type.BOOLEAN,
                false,
                Importance.LOW)

...