...

Code Block
JoinGroupRequest => GroupId SessionTimeout RebalanceTimeout MemberId GroupInstanceId ProtocolType GroupProtocols
  GroupId             => String
  SessionTimeout      => int32
  RebalanceTimeout    => int32
  MemberId            => String
  GroupInstanceId     => String // new
  ProtocolType        => String
  GroupProtocols      => [Protocol MemberMetadata]
  Protocol            => String
  MemberMetadata      => bytes

LeaveGroupRequest => GroupId MemberId GroupInstanceMap
  GroupId             => String
  MemberId            => String
  GroupInstanceMap    => Map[String, String] // new

In the meantime, we bump the JoinGroup request/response version to v4 and the LeaveGroup request/response version to v3.
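
To make the new field concrete, the LeaveGroupRequest shape can be modeled as a plain data structure. This is a minimal Python sketch, not the Kafka wire format or any client API: the class name `LeaveGroupRequestV3`, the snake_case field names, the sample group and instance names, and the use of an empty string for an omitted `member.id` are all assumptions made for illustration.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class LeaveGroupRequestV3:
    """Illustrative model of the proposed request fields (hypothetical class)."""
    group_id: str
    member_id: str
    # New in this proposal: maps group.instance.id -> member.id.
    # Assumption: an empty member.id means the client did not provide one.
    group_instance_map: Dict[str, str] = field(default_factory=dict)


# Hypothetical batch removal of two static members from one group.
request = LeaveGroupRequestV3(
    group_id="payments",
    member_id="",
    group_instance_map={
        "instance-1": "",                # remove by group.instance.id only
        "instance-2": "consumer-2-abc",  # remove only if member.id matches
    },
)
```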

...

Currently, scale-down is controlled by the session timeout: if a user removes over-provisioned consumer members, the group waits until the session timeout expires before triggering a rebalance. This is not ideal, and it motivates us to change LeaveGroupRequest to include a map from `group.instance.id` to `member.id`, so that we can batch-remove offline members and trigger a rebalance immediately without them.

...

RemoveMemberFromGroup (introduced above) removes the given instances and triggers a rebalance immediately. It is mainly used for fast scale-down and host-replacement cases, where we detect consumer failure faster than the session timeout. The API first sends a FindCoordinatorRequest to locate the correct broker, then sends a LeaveGroupRequest to the broker hosting that coordinator. Within the request we add a new field, GroupInstanceMap, which maps `group.instance.id` to `member.id`. The reason to include the `member.id` mapping, instead of only a `group.instance.id` list, is to move LeaveGroupRequest towards a more consistent batch API in the long term. The rules are as follows:

  1. For each member, `group.instance.id` must be provided. A dynamic member supplies an empty string, which is interpreted as UNKNOWN_GROUP_INSTANCE_ID.
  2. The client may optionally provide a `member.id`. If it is provided, the member is removed only if the `member.id` matches; otherwise, only the `group.instance.id` is used. The `member.id` serves as a validation here; it is not used currently but is potentially useful in the long term, when the whole process is automated.
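
The matching rule for static members can be sketched as a small selection function. This is an illustration under stated assumptions rather than the coordinator's actual code: `select_removals`, `should_remove`, and the `static_members` dictionary (a stand-in for coordinator state mapping `group.instance.id` to the current `member.id`) are hypothetical names, and an omitted `member.id` is assumed to be sent as an empty string. Entries with an unknown instance id are simply skipped here; the error the proposal returns for them is not modeled.

```python
from typing import Dict, List

NO_MEMBER_ID = ""  # assumption: an omitted member.id is sent as an empty string


def should_remove(requested_member_id: str, current_member_id: str) -> bool:
    """Rule 2: with no member.id, match on group.instance.id alone;
    with a member.id, remove only if it equals the current one."""
    if requested_member_id == NO_MEMBER_ID:
        return True
    return requested_member_id == current_member_id


def select_removals(group_instance_map: Dict[str, str],
                    static_members: Dict[str, str]) -> List[str]:
    """Return the group.instance.id values that pass the member.id check.

    static_members is a hypothetical view of coordinator state:
    group.instance.id -> current member.id.
    """
    removable = []
    for instance_id, requested_member_id in group_instance_map.items():
        current_member_id = static_members.get(instance_id)
        if current_member_id is None:
            continue  # unknown instance id; error handling is out of scope here
        if should_remove(requested_member_id, current_member_id):
            removable.append(instance_id)
    return removable
```

With this shape, a stale `member.id` in the request leaves the corresponding member in place, which is the validation role the second rule describes.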

The coordinator decides whether to accept this metadata change request based on its runtime state. An error is returned if:

  1. The broker is on an old version (UNSUPPORTED_VERSION)
  2. The consumer group does not exist (INVALID_GROUP_ID)
  3. The operator is not authorized (GROUP_AUTHORIZATION_FAILED)
  4. Some instance ids are not found, which means the request is not valid (GROUP_INSTANCE_ID_INVALID, defined in the public changes section)
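
The error conditions above can be read as a sequence of checks. The sketch below is one possible ordering, not something the proposal specifies: the function `validate_removal` and its boolean parameters are hypothetical, the error names are returned as plain strings rather than real protocol error codes, and in practice an old broker would reject the request version itself rather than run a check like the first one.

```python
from typing import Iterable, Optional, Set


def validate_removal(broker_supports_request: bool,
                     group_exists: bool,
                     operator_authorized: bool,
                     requested_instance_ids: Iterable[str],
                     known_instance_ids: Set[str]) -> Optional[str]:
    """Return an error name, or None if the removal request is acceptable.

    The order of the checks is an assumption made for this sketch.
    """
    if not broker_supports_request:
        return "UNSUPPORTED_VERSION"
    if not group_exists:
        return "INVALID_GROUP_ID"
    if not operator_authorized:
        return "GROUP_AUTHORIZATION_FAILED"
    # Any unknown group.instance.id invalidates the whole request.
    if not set(requested_instance_ids) <= known_instance_ids:
        return "GROUP_INSTANCE_ID_INVALID"
    return None
```

Rejecting the whole batch when any instance id is unknown follows the wording of the fourth condition, which treats a request with missing ids as invalid.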

...