...

JoinGroupResponse.java
public static Schema[] schemaVersions() {
    return new Schema[] {
        JOIN_GROUP_RESPONSE_V0, JOIN_GROUP_RESPONSE_V1, JOIN_GROUP_RESPONSE_V2,
        JOIN_GROUP_RESPONSE_V3, JOIN_GROUP_RESPONSE_V4
    };
}

...

ConsumerConfig.java

...

Also note that there is a conflicting internal config called LEAVE_GROUP_ON_CLOSE_CONFIG, which decides whether a consumer sends a LeaveGroup request when it goes offline. This would reduce the effectiveness of this KIP: once the consumer has left the group, the broker identifies it as a new member when it comes back, which still triggers many rebalances. We will default this internal config to false when a member name is set.


ConsumerConfig.java
.defineInternal(LEAVE_GROUP_ON_CLOSE_CONFIG,
                Type.BOOLEAN,
                false, // if on static membership
                Importance.LOW)
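The fragment above shows only a fixed default of false, while the text makes that default conditional on static membership. A minimal sketch of how a client could resolve the conditional default, assuming a hypothetical helper `leaveGroupOnClose`; the property key strings (`member.name` and the one used for LEAVE_GROUP_ON_CLOSE_CONFIG) are assumptions for illustration, and letting an explicit setting win is a design choice of this sketch, not something the KIP specifies:

```java
import java.util.HashMap;
import java.util.Map;

class LeaveGroupDefaultSketch {
    // Hypothetical property keys, for illustration only.
    static final String MEMBER_NAME_CONFIG = "member.name";
    static final String LEAVE_GROUP_ON_CLOSE_CONFIG = "internal.leave.group.on.close";

    // Hypothetical helper: decides whether close() should send a LeaveGroup request.
    static boolean leaveGroupOnClose(Map<String, Object> props) {
        Object explicit = props.get(LEAVE_GROUP_ON_CLOSE_CONFIG);
        if (explicit != null) {
            // In this sketch an explicitly configured value always takes precedence.
            return Boolean.parseBoolean(explicit.toString());
        }
        Object memberName = props.get(MEMBER_NAME_CONFIG);
        boolean staticMember = memberName != null && !memberName.toString().isEmpty();
        // Static members keep their identity across restarts, so they should not
        // leave the group on close; dynamic members keep the old default (true).
        return !staticMember;
    }

    public static void main(String[] args) {
        Map<String, Object> dynamicProps = new HashMap<>();
        Map<String, Object> staticProps = new HashMap<>();
        staticProps.put(MEMBER_NAME_CONFIG, "consumer-1");
        System.out.println(leaveGroupOnClose(dynamicProps)); // true
        System.out.println(leaveGroupOnClose(staticProps));  // false
    }
}
```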


Server behavior changes

On the server side, the broker will keep handling JoinGroup requests of version v3 and below as before. If the request is v4 and the member name is set, the broker will use the member name specified in the JoinGroup request and respond with a unique "member id". The broker will maintain an in-memory mapping of {member.name → member.id} to track member uniqueness.
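A minimal sketch of that in-memory mapping, assuming a hypothetical coordinator-side `StaticMemberRegistry` class. Per the commit rules below, the mapping is updated on every JoinGroup request. The id format (member name plus a random UUID) is an assumption chosen only so that each join yields a fresh, unique id:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

class StaticMemberRegistry {
    // member.name -> member.id, held in memory by the coordinator.
    private final Map<String, String> memberIdsByName = new HashMap<>();

    // Handles a v4 JoinGroup carrying a member name: generates a fresh unique
    // member id and overwrites any previous mapping for that name.
    synchronized String join(String memberName) {
        String memberId = memberName + "-" + UUID.randomUUID();
        memberIdsByName.put(memberName, memberId);
        return memberId;
    }

    // Returns the latest member id for the name, or null if the name is unknown.
    synchronized String currentMemberId(String memberName) {
        return memberIdsByName.get(memberName);
    }

    public static void main(String[] args) {
        StaticMemberRegistry registry = new StaticMemberRegistry();
        String first = registry.join("consumer-1");
        String second = registry.join("consumer-1"); // same name joins again
        System.out.println(first.equals(second));                                  // false
        System.out.println(registry.currentMemberId("consumer-1").equals(second)); // true
    }
}
```

Because the latest join always overwrites the entry, an older holder of the same name is left with a stale member.id, which is exactly what the commit-time check can detect.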

For commit requests under static membership, we require that:

  • Both member.name and member.id are set. Otherwise, reply with NO_STATIC_MEMBER_INFO_SET.
  • The member.name → member.id mapping matches the coordinator cache. Otherwise, reply with DUPLICATE_STATIC_MEMBER.

This way, when a member name is duplicated, we can reject commit requests from members holding an outdated member.id (since we update the mapping on each JoinGroup request). NO_STATIC_MEMBER_INFO_SET is most likely to be hit while the consumer group is doing a rolling restart, when some members have not yet been updated with the new member name. In the edge case where a client receives DUPLICATE_STATIC_MEMBER in the response, it means that another consumer has taken its spot. The client should fail immediately to inform the end user that there is a configuration bug generating duplicate consumers with the same identity. For the first version of this KIP, we want straightforward handling that exposes the error early and makes corner cases easy to reproduce.
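The two commit-time checks and the fail-fast client reaction can be sketched as follows. This is a minimal illustration, not the coordinator implementation: the `StaticCommitValidator` class, its `CommitError` enum and the exception type are hypothetical stand-ins; only the error names come from the text. An unknown member name is treated here as "not aligned with the cache", which is one literal reading of the second rule:

```java
import java.util.HashMap;
import java.util.Map;

class StaticCommitValidator {
    // Error names taken from the text; the enum itself is a hypothetical stand-in.
    enum CommitError { NONE, NO_STATIC_MEMBER_INFO_SET, DUPLICATE_STATIC_MEMBER }

    // Broker side: applies both commit-time checks against the name -> id cache.
    static CommitError validate(String memberName, String memberId, Map<String, String> cache) {
        if (isBlank(memberName) || isBlank(memberId)) {
            return CommitError.NO_STATIC_MEMBER_INFO_SET;
        }
        // Not aligned with the cache: a newer join with the same name replaced this
        // member's id, or the name is unknown (both rejected the same way here).
        if (!memberId.equals(cache.get(memberName))) {
            return CommitError.DUPLICATE_STATIC_MEMBER;
        }
        return CommitError.NONE;
    }

    // Client side: a duplicate identity is a configuration bug, so fail fast.
    static void onCommitResponse(CommitError error) {
        if (error == CommitError.DUPLICATE_STATIC_MEMBER) {
            throw new IllegalStateException(
                "Another consumer is using the same member.name; check the configuration");
        }
    }

    private static boolean isBlank(String s) {
        return s == null || s.isEmpty();
    }

    public static void main(String[] args) {
        Map<String, String> cache = new HashMap<>();
        cache.put("consumer-1", "consumer-1-id-2"); // written by the latest join
        System.out.println(validate("consumer-1", "consumer-1-id-2", cache)); // NONE
        System.out.println(validate("consumer-1", "consumer-1-id-1", cache)); // DUPLICATE_STATIC_MEMBER
        System.out.println(validate(null, "consumer-1-id-2", cache));         // NO_STATIC_MEMBER_INFO_SET
    }
}
```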

When do we rebalance in static membership?

...

The registration timeout is the time after which we trigger a rebalance when a member has been offline for too long. It should usually be set much larger than the session timeout, which is used to detect consumer health. It is monitored through heartbeats in the same way as the session timeout, and replaces the session timeout under static membership. We define a separate config so that users can easily switch between dynamic and static membership (see details here) without taking on extra mental overhead. By setting it to 15 to 30 minutes, we track static member progress more loosely and hand member management over to the client application, such as Kubernetes (K8s). Of course, we should not wait forever for a member to come back online simply to reduce rebalances. Eventually the member will be removed from the group and a final rebalance triggered. Note that we track the earliest offline member and compare its offline time with the registration timeout. Example below with a registration timeout of 15 minutes:

...

where enableStaticMembership switches the consumer group to static membership and sets the registration timeout and expansion timeout. After that, all joining members are required to set the member name field. An error will be returned if the broker is on an old version, or in other potential failure cases. Note that clients should already include the member name field at this point. Users can also use this API to change the timeout configs as needed, or leave them blank the first time they try the new membership protocol.

enableDynamicMembership, in contrast, simply switches the group back to dynamic membership. An error will be returned if the broker is on an old version, if the group is already on dynamic membership, or in other potential failure cases.

forceRebalance immediately triggers one rebalance for a group on static membership. An error will be returned if the broker is on an old version, if the group is on dynamic membership, or in other potential failure cases. This is mainly intended for fast scale-up and scale-down cases.
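The group-level effects of these three admin operations can be modeled as a small state machine. This is a hypothetical, broker-agnostic sketch: the `GroupMembershipSketch` class, its method signatures and its exception types are assumptions, not the proposed AdminClient API. Old-broker errors are not modeled, and a null timeout stands for "left blank", which here keeps the current value (unset meaning the broker default would apply):

```java
class GroupMembershipSketch {
    enum Mode { DYNAMIC, STATIC }

    private Mode mode = Mode.DYNAMIC;
    // null means "not set": a broker-side default would apply (not modeled here).
    private Long registrationTimeoutMs;
    private Long expansionTimeoutMs;
    private int forcedRebalances = 0;

    // Switches to static membership; null timeouts leave the current values unchanged.
    void enableStaticMembership(Long registrationTimeoutMs, Long expansionTimeoutMs) {
        mode = Mode.STATIC;
        if (registrationTimeoutMs != null) {
            this.registrationTimeoutMs = registrationTimeoutMs;
        }
        if (expansionTimeoutMs != null) {
            this.expansionTimeoutMs = expansionTimeoutMs;
        }
    }

    // Rejected when the group is already dynamic, as the text describes.
    void enableDynamicMembership() {
        if (mode == Mode.DYNAMIC) {
            throw new IllegalStateException("group is already on dynamic membership");
        }
        mode = Mode.DYNAMIC;
    }

    // Only allowed on static membership.
    void forceRebalance() {
        if (mode != Mode.STATIC) {
            throw new IllegalStateException("forceRebalance requires static membership");
        }
        forcedRebalances++;
    }

    // Once static, every joining member must carry a member name.
    void validateJoin(String memberName) {
        if (mode == Mode.STATIC && (memberName == null || memberName.isEmpty())) {
            throw new IllegalArgumentException("member name is required under static membership");
        }
    }

    Mode mode() { return mode; }
    Long registrationTimeoutMs() { return registrationTimeoutMs; }
    int forcedRebalances() { return forcedRebalances; }

    public static void main(String[] args) {
        GroupMembershipSketch group = new GroupMembershipSketch();
        group.enableStaticMembership(15 * 60 * 1000L, null); // 15 min registration timeout
        group.validateJoin("consumer-1");
        group.forceRebalance();
        group.enableDynamicMembership();
        System.out.println(group.mode() + " " + group.forcedRebalances()); // DYNAMIC 1
    }
}
```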

Compatibility, Deprecation, and Migration Plan

...