...
The new `group.instance.id` config will be added to the JoinGroupRequest, and a list of `group.instance.id` values together with a list of `member.id` values will be added to the LeaveGroupRequest.
```
JoinGroupRequest => GroupId SessionTimeout RebalanceTimeout MemberId GroupInstanceId ProtocolType GroupProtocols
  GroupId => String
  SessionTimeout => int32
  RebalanceTimeout => int32
  MemberId => String
  GroupInstanceId => String // new
  ProtocolType => String
  GroupProtocols => [Protocol MemberMetadata]
    Protocol => String
    MemberMetadata => bytes

LeaveGroupRequest => GroupId MemberId GroupInstanceIdList MemberIdList
  GroupId => String
  MemberId => String
  GroupInstanceIdList => List[String] // new
  MemberIdList => List[String] // new
```
Accordingly, we bump the JoinGroup request/response version to v4 and the LeaveGroup request/response version to v3.
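To make the pairing of the two new LeaveGroupRequest fields concrete, here is a minimal Java sketch of how a client might assemble them from an ordered map of `group.instance.id` to `member.id`. The record `LeaveGroupRequestData` and the `build` helper are hypothetical names, not Kafka APIs. The sketch assumes that an unknown `member.id` is encoded as an empty string, and it leaves the top-level `MemberId` empty because its role in a batch request is not specified here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LeaveGroupRequestSketch {

    // Hypothetical in-memory view of the proposed LeaveGroupRequest fields.
    // GroupInstanceIdList and MemberIdList are parallel lists: index i of one
    // pairs with index i of the other.
    record LeaveGroupRequestData(String groupId,
                                 String memberId,
                                 List<String> groupInstanceIdList,
                                 List<String> memberIdList) {}

    // Builds aligned lists from an ordered map of group.instance.id -> member.id.
    // A null member.id is encoded as "" (assumption: "not provided").
    static LeaveGroupRequestData build(String groupId, Map<String, String> members) {
        List<String> instanceIds = new ArrayList<>();
        List<String> memberIds = new ArrayList<>();
        for (Map.Entry<String, String> entry : members.entrySet()) {
            instanceIds.add(entry.getKey());
            memberIds.add(entry.getValue() == null ? "" : entry.getValue());
        }
        // Assumption: the top-level MemberId is left empty for a batch removal.
        return new LeaveGroupRequestData(groupId, "", instanceIds, memberIds);
    }

    public static void main(String[] args) {
        Map<String, String> members = new LinkedHashMap<>();
        members.put("consumer-host-1", "member-abc");
        members.put("consumer-host-2", null); // member.id not known to the operator
        LeaveGroupRequestData request = build("orders-group", members);
        System.out.println(request.groupInstanceIdList()); // [consumer-host-1, consumer-host-2]
        System.out.println(request.memberIdList());        // [member-abc, ]
    }
}
```

Using a `LinkedHashMap` keeps insertion order, which is what guarantees that the two lists stay aligned index by index.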
...
```java
MEMBER_ID_MISMATCH(78, "For a join group request, this means the group.instance.id is already in the consumer group but the member.id does not match the record on the coordinator; " +
        "for a leave group request, this means the member.id list length does not match the group.instance.id list length", MemberIdMismatchException::new),
GROUP_INSTANCE_ID_NOT_FOUND(79, "Some group.instance.id values specified in the leave group request were not found", GroupInstanceIdInvalidException::new)
```
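The entries above follow a `NAME(code, message, Exception::new)` pattern. The self-contained Java sketch below mirrors that pattern to show how a numeric error code received in a response could be mapped back to an exception. The `Errors` enum and both exception classes here are simplified, hypothetical stand-ins, not Kafka's actual classes.

```java
import java.util.function.Function;

public class ErrorsSketch {

    // Hypothetical exception types named after the proposal (not real Kafka classes).
    static class MemberIdMismatchException extends RuntimeException {
        MemberIdMismatchException(String message) {
            super(message);
        }
    }

    static class GroupInstanceIdInvalidException extends RuntimeException {
        GroupInstanceIdInvalidException(String message) {
            super(message);
        }
    }

    // Simplified error enum: code, default message, and a factory for the exception.
    enum Errors {
        NONE(0, null, null),
        MEMBER_ID_MISMATCH(78, "member.id does not match the group.instance.id record",
                MemberIdMismatchException::new),
        GROUP_INSTANCE_ID_NOT_FOUND(79, "Some group.instance.id values specified in the leave group request were not found",
                GroupInstanceIdInvalidException::new);

        final int code;
        final String message;
        final Function<String, RuntimeException> builder;

        Errors(int code, String message, Function<String, RuntimeException> builder) {
            this.code = code;
            this.message = message;
            this.builder = builder;
        }

        // Looks up the enum constant for a code received on the wire.
        static Errors forCode(int code) {
            for (Errors error : values()) {
                if (error.code == code) {
                    return error;
                }
            }
            throw new IllegalArgumentException("Unknown error code: " + code);
        }

        // Builds the exception for this error, or returns null for NONE.
        RuntimeException exception() {
            return builder == null ? null : builder.apply(message);
        }
    }

    public static void main(String[] args) {
        Errors error = Errors.forCode(79);
        System.out.println(error); // GROUP_INSTANCE_ID_NOT_FOUND
        System.out.println(error.exception().getClass().getSimpleName()); // GroupInstanceIdInvalidException
    }
}
```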
...
Currently, scaling down is governed by the session timeout: if a user removes over-provisioned consumer members, the group waits until their session timeout expires before triggering a rebalance. This is not ideal, and it motivates us to change LeaveGroupRequest to include a list of `group.instance.id` and a list of `member.id`, so that offline members can be removed in a batch and a rebalance triggered immediately without them.
...
RemoveMemberFromGroup (introduced above) removes the given instances and triggers a rebalance immediately. It is mainly intended for fast scale-down and host-replacement cases, where consumer failure is detected faster than the session timeout would allow. The API first sends a FindCoordinatorRequest to locate the correct broker, then sends a LeaveGroupRequest to the broker hosting that coordinator. The request gains two new fields, GroupInstanceIdList and MemberIdList, which together map each `group.instance.id` to a `member.id`. The reason for including a `member.id` list, instead of only adding a `group.instance.id` list, is to move LeaveGroupRequest towards a more consistent batch API in the long term. The two lists are expected to be of the same length and aligned: the `group.instance.id` and the `member.id` at the same index must refer to the same entry in the coordinator's current static membership map. The processing rules are as follows:
- For each member, `group.instance.id` must be provided. A dynamic member supplies an empty string, which is interpreted as UNKNOWN_GROUP_INSTANCE_ID.
- The client may optionally provide a `member.id`. If it is provided, the member is removed only if the `member.id` matches; otherwise, only the `group.instance.id` is used. The `member.id` serves as a validation check here; it is not currently used, but may become useful in the long term once the whole process is automated.
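The processing rules above can be sketched as a coordinator-side loop over the aligned lists. This is a minimal illustration under stated assumptions: the static membership map is a plain `Map` from `group.instance.id` to `member.id`, UNKNOWN_GROUP_INSTANCE_ID is represented by the empty string, and dynamic members are (hypothetically) matched by `member.id` alone against a set of dynamic member ids. Class and method names are illustrative only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class LeaveGroupProcessingSketch {

    // Assumption: the empty string stands in for UNKNOWN_GROUP_INSTANCE_ID.
    static final String UNKNOWN_GROUP_INSTANCE_ID = "";

    // staticMembers: group.instance.id -> member.id (mutated in place).
    // dynamicMembers: member.ids of members without a group.instance.id (mutated in place).
    // Returns the member.ids that were removed, in request order.
    static List<String> removeMembers(Map<String, String> staticMembers,
                                      Set<String> dynamicMembers,
                                      List<String> instanceIds,
                                      List<String> memberIds) {
        if (instanceIds.size() != memberIds.size()) {
            throw new IllegalArgumentException("group.instance.id and member.id lists must be aligned");
        }
        List<String> removed = new ArrayList<>();
        for (int i = 0; i < instanceIds.size(); i++) {
            String instanceId = instanceIds.get(i);
            String memberId = memberIds.get(i);
            boolean memberIdProvided = !memberId.isEmpty();

            if (instanceId.equals(UNKNOWN_GROUP_INSTANCE_ID)) {
                // Dynamic member: assumed to be identified by member.id alone.
                if (memberIdProvided && dynamicMembers.remove(memberId)) {
                    removed.add(memberId);
                }
                continue;
            }
            String recorded = staticMembers.get(instanceId);
            if (recorded == null) {
                continue; // unknown instance; request-level validation reports this case
            }
            // A provided member.id must match the recorded one; otherwise
            // the group.instance.id alone decides.
            if (!memberIdProvided || memberId.equals(recorded)) {
                staticMembers.remove(instanceId);
                removed.add(recorded);
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        Map<String, String> staticMembers =
                new HashMap<>(Map.of("host-1", "m-1", "host-2", "m-2", "host-3", "m-3"));
        Set<String> dynamicMembers = new HashSet<>(Set.of("d-1"));
        List<String> removed = removeMembers(staticMembers, dynamicMembers,
                List.of("host-1", "host-2", "", "host-3"),
                List.of("m-1", "", "d-1", "stale-id"));
        System.out.println(removed);       // [m-1, m-2, d-1]
        System.out.println(staticMembers); // {host-3=m-3}
    }
}
```

In this sketch an entry whose provided `member.id` does not match is simply left in place, and triggering the rebalance when the returned list is non-empty is left to the caller.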
...
- The broker is on an old version (UNSUPPORTED_VERSION)
- The consumer group does not exist (INVALID_GROUP_ID)
- The operator is not authorized (GROUP_AUTHORIZATION_FAILED)
- Some (non-empty) instance ids are not found, which means the request is invalid (GROUP_INSTANCE_ID_NOT_FOUND, defined in the public changes section)
- The length of MemberIdList doesn't match the length of GroupInstanceIdList (MEMBER_ID_MISMATCH, defined in the public changes section)
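The error conditions above can be combined into a single request-level check. The Java sketch below returns the first applicable error, using the error names from the public changes section. The order of the checks, the `MIN_BATCH_VERSION` constant (taken from the LeaveGroup v3 bump), and the in-memory `groups` and `authorizedGroups` inputs are assumptions made for illustration. In particular, it checks authorization before group existence so that an unauthorized caller cannot probe which groups exist.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

public class LeaveGroupValidationSketch {

    // Error names from the proposal; NONE means the request may be processed.
    enum LeaveGroupError {
        NONE,
        UNSUPPORTED_VERSION,
        GROUP_AUTHORIZATION_FAILED,
        INVALID_GROUP_ID,
        MEMBER_ID_MISMATCH,
        GROUP_INSTANCE_ID_NOT_FOUND
    }

    // Assumption: batch removal requires LeaveGroupRequest v3 or later.
    static final int MIN_BATCH_VERSION = 3;

    // groups: groupId -> (group.instance.id -> member.id); all inputs are hypothetical stand-ins.
    static LeaveGroupError validate(int brokerMaxVersion,
                                    Map<String, Map<String, String>> groups,
                                    Set<String> authorizedGroups,
                                    String groupId,
                                    List<String> instanceIds,
                                    List<String> memberIds) {
        if (brokerMaxVersion < MIN_BATCH_VERSION) {
            return LeaveGroupError.UNSUPPORTED_VERSION;
        }
        // Assumed order: authorization before existence.
        if (!authorizedGroups.contains(groupId)) {
            return LeaveGroupError.GROUP_AUTHORIZATION_FAILED;
        }
        Map<String, String> staticMembers = groups.get(groupId);
        if (staticMembers == null) {
            return LeaveGroupError.INVALID_GROUP_ID;
        }
        // The two lists must be aligned index by index.
        if (instanceIds.size() != memberIds.size()) {
            return LeaveGroupError.MEMBER_ID_MISMATCH;
        }
        // Every non-empty group.instance.id must be known; "" marks a dynamic member.
        for (String instanceId : instanceIds) {
            if (!instanceId.isEmpty() && !staticMembers.containsKey(instanceId)) {
                return LeaveGroupError.GROUP_INSTANCE_ID_NOT_FOUND;
            }
        }
        return LeaveGroupError.NONE;
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> groups = Map.of("g1", Map.of("host-1", "m-1"));
        Set<String> authorized = Set.of("g1");
        System.out.println(validate(2, groups, authorized, "g1", List.of("host-1"), List.of("m-1"))); // UNSUPPORTED_VERSION
        System.out.println(validate(3, groups, authorized, "g1", List.of("host-1"), List.of()));      // MEMBER_ID_MISMATCH
        System.out.println(validate(3, groups, authorized, "g1", List.of("host-9"), List.of("")));    // GROUP_INSTANCE_ID_NOT_FOUND
        System.out.println(validate(3, groups, authorized, "g1", List.of("host-1", ""), List.of("m-1", "d-1"))); // NONE
    }
}
```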
...