...

The new `group.instance.id` config will be added to the JoinGroupRequest, and a list of `group.instance.id` plus a list of `member.id` will be added to the LeaveGroupRequest.

Code Block
JoinGroupRequest => GroupId SessionTimeout RebalanceTimeout MemberId GroupInstanceId ProtocolType GroupProtocols
  GroupId             => String
  SessionTimeout      => int32
  RebalanceTimeout    => int32
  MemberId            => String
  GroupInstanceId     => String // new
  ProtocolType        => String
  GroupProtocols      => [Protocol MemberMetadata]
  Protocol            => String
  MemberMetadata      => bytes

LeaveGroupRequest => GroupId MemberId GroupInstanceIdList MemberIdList
  GroupId             => String
  MemberId            => String
  GroupInstanceIdList => List[String] // new
  MemberIdList        => List[String] // new

At the same time, we bump the JoinGroup request/response version to v4 and the LeaveGroup request/response version to v3.
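To make the shape of the new LeaveGroupRequest fields concrete, here is a minimal Java sketch of a value holder for them. The class name `LeaveGroupRequestSketch` and the constructor-time length check are assumptions made for illustration; this is not Kafka's actual request class.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical holder mirroring the proposed LeaveGroupRequest fields. */
final class LeaveGroupRequestSketch {
    final String groupId;
    final String memberId;
    final List<String> groupInstanceIdList; // new field
    final List<String> memberIdList;        // new field

    LeaveGroupRequestSketch(String groupId,
                            String memberId,
                            List<String> groupInstanceIdList,
                            List<String> memberIdList) {
        // The two lists are expected to be aligned index by index.
        // Rejecting a mismatch on the client side is an assumption of this sketch.
        if (groupInstanceIdList.size() != memberIdList.size()) {
            throw new IllegalArgumentException(
                "GroupInstanceIdList and MemberIdList must have the same length");
        }
        this.groupId = groupId;
        this.memberId = memberId;
        // Defensive copies so later changes by the caller do not affect the request.
        this.groupInstanceIdList =
            Collections.unmodifiableList(new ArrayList<>(groupInstanceIdList));
        this.memberIdList =
            Collections.unmodifiableList(new ArrayList<>(memberIdList));
    }
}
```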

...

Code Block
Errors.java
MEMBER_ID_MISMATCH(78, "For join group request, this implies some group.instance.id is already in the consumer group, however the member.id was not matching the record on coordinator; for leave group request, this implies the member.id list length doesn't align with the group.instance.id list", MemeberIdMisMatchException::new),
GROUP_INSTANCE_ID_NOT_FOUND(79, "Some group.instance.id specified in the leave group request are not found", GroupInstanceIdInvalidException::new)

...

Currently, scale down is controlled by the session timeout: if a user removes over-provisioned consumer members, the group waits until the session timeout expires before triggering a rebalance. This is not ideal and motivates us to change LeaveGroupRequest to include a list of `group.instance.id` and a list of `member.id`, so that we can batch remove offline members and trigger a rebalance immediately without them.

...

RemoveMemberFromGroup (introduced above) will remove the given instances and trigger a rebalance immediately. It is mainly used for fast scale down and host replacement cases (where we detect consumer failure faster than the session timeout). This API will first send a FindCoordinatorRequest to locate the correct broker, and then send a LeaveGroupRequest to the broker hosting that coordinator. Within the request we add two new lists, one of `group.instance.id` and one of `member.id`. The reason to include a `member.id` list instead of only adding a `group.instance.id` list is to move LeaveGroupRequest towards a more consistent batch API in the long term. The two lists are expected to be of the same length and aligned, which means the `group.instance.id` and the `member.id` at the same index are checked together against the current static membership map. The processing rules are as follows:

  1. For each member, `group.instance.id` must be provided. A dynamic member will supply an empty string which is interpreted as UNKNOWN_GROUP_INSTANCE_ID.
  2. The client may optionally provide a `member.id`. If it is provided, the member will only be removed if the `member.id` matches. Otherwise, only the `group.instance.id` is used. The `member.id` serves as a validation here; it is not used currently but is potentially useful in the long term when the whole process is automated.
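The processing rules above can be sketched in Java as follows. This is only an illustration under stated assumptions, not the coordinator's actual implementation: the class `BatchLeaveSketch`, its `Result` enum, the use of an empty string for "no `member.id` provided", the validate-before-remove ordering, and skipping dynamic members (empty `group.instance.id`) are all choices made for this sketch.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of coordinator-side batch removal; not Kafka's actual code. */
final class BatchLeaveSketch {
    enum Result { NONE, MEMBER_ID_MISMATCH, GROUP_INSTANCE_ID_NOT_FOUND }

    /**
     * staticMembers maps group.instance.id to member.id and is mutated
     * only when the whole request passes validation.
     */
    static Result removeMembers(Map<String, String> staticMembers,
                                List<String> groupInstanceIds,
                                List<String> memberIds) {
        // The two lists must be aligned index by index.
        if (groupInstanceIds.size() != memberIds.size()) {
            return Result.MEMBER_ID_MISMATCH;
        }
        // Validate first so an invalid request removes nothing (assumption).
        for (String instanceId : groupInstanceIds) {
            if (!instanceId.isEmpty() && !staticMembers.containsKey(instanceId)) {
                return Result.GROUP_INSTANCE_ID_NOT_FOUND;
            }
        }
        for (int i = 0; i < groupInstanceIds.size(); i++) {
            String instanceId = groupInstanceIds.get(i);
            String memberId = memberIds.get(i);
            if (instanceId.isEmpty()) {
                // Rule 1: empty means UNKNOWN_GROUP_INSTANCE_ID (dynamic member);
                // dynamic members are out of scope for this sketch.
                continue;
            }
            // Rule 2: a provided member.id must match the recorded one;
            // an empty member.id means only group.instance.id is used.
            if (memberId.isEmpty() || memberId.equals(staticMembers.get(instanceId))) {
                staticMembers.remove(instanceId);
            }
        }
        return Result.NONE;
    }
}
```

Validating every `group.instance.id` before removing anything keeps a rejected request from partially shrinking the group; the proposal itself does not state whether removal is all-or-nothing.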

...

  1. The broker is on an old version (UNSUPPORTED_VERSION)
  2. Consumer group does not exist (INVALID_GROUP_ID)
  3. Operator is not authorized (GROUP_AUTHORIZATION_FAILED)
  4. Some instance ids (non-empty) are not found, which means the request is not valid (GROUP_INSTANCE_ID_NOT_FOUND, defined in the public changes section)
  5. The length of MemberIdList doesn't match length of GroupInstanceIdList (MEMBER_ID_MISMATCH, defined in the public changes section)

...