...

AdminClient.java

public abstract MembershipChangeResult removeMemberFromGroup(String groupId, List<String> groupInstanceIds, RemoveMemberFromGroupOptions options);

Alongside this API, we shall introduce a new request/response type to support it:

...

which will use the latest version of the LeaveGroupRequest API to inform the broker that a batch of instances is leaving permanently, by passing the list of their `group.instance.id`s.
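To illustrate the idea of one request carrying the whole id list, here is a minimal sketch. `BatchLeaveGroupRequest` is a hypothetical model type, not Kafka's actual LeaveGroupRequest schema; it only shows that a single request holds every `group.instance.id` to remove, rejects an empty batch, and drops duplicate ids.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;

// Hypothetical request model: a single leave request that carries every
// group.instance.id to remove, rather than one request per member.
final class BatchLeaveGroupRequest {
    private final String groupId;
    private final List<String> groupInstanceIds;

    BatchLeaveGroupRequest(String groupId, List<String> groupInstanceIds) {
        if (groupInstanceIds.isEmpty()) {
            throw new IllegalArgumentException("at least one group.instance.id is required");
        }
        this.groupId = groupId;
        // Drop duplicate ids while preserving the caller's order.
        this.groupInstanceIds =
            Collections.unmodifiableList(new ArrayList<>(new LinkedHashSet<>(groupInstanceIds)));
    }

    String groupId() {
        return groupId;
    }

    List<String> groupInstanceIds() {
        return groupInstanceIds;
    }
}
```

Batching the ids means the coordinator sees the whole removal at once, so it can react with a single rebalance rather than one per departing member.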

A script called kafka-invoke-consumer-rebalance.sh will be added so that end users can easily manage the consumer group.

...

Currently, scale-down is controlled by the session timeout: if a user removes over-provisioned consumer members, the coordinator waits until their session timeout expires before triggering a rebalance. This is not ideal, and it motivates us to change LeaveGroupRequest to include a list of `group.instance.id`s, so that we can remove offline members in a batch and trigger a rebalance immediately without them.
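The difference between waiting for session timeouts and explicit batch removal can be sketched on the coordinator side. This is a simplified model under stated assumptions: `StaticGroupState` is hypothetical, the mapping is `group.instance.id` to member id, and a "rebalance" is modeled only as a single generation bump.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified coordinator state for one group with static members.
final class StaticGroupState {
    // group.instance.id -> member.id currently assigned by the coordinator
    private final Map<String, String> staticMembers = new HashMap<>();
    private int generation = 0;

    void addStaticMember(String instanceId, String memberId) {
        staticMembers.put(instanceId, memberId);
    }

    // Removes every listed instance in one step and, if anything was removed,
    // bumps the generation once to model a single immediate rebalance,
    // instead of waiting for each member's session timeout to expire.
    int removeAndRebalance(List<String> instanceIds) {
        int removed = 0;
        for (String id : instanceIds) {
            if (staticMembers.remove(id) != null) {
                removed++;
            }
        }
        if (removed > 0) {
            generation++;
        }
        return removed;
    }

    int generation() {
        return generation;
    }

    int size() {
        return staticMembers.size();
    }
}
```

Removing two members this way costs one generation bump, whereas relying on session timeouts could trigger separate rebalances as each timeout expires.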

Fault-tolerance of static membership 

To make sure we can recover from a broker failure or leader transition, an in-memory `group.instance.id` map is not enough. We would reuse the `__consumer_offsets` topic to store the static member map information. When another broker takes over leadership, it will load the static mapping information together with the other group metadata stored in that topic.
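Rebuilding the mapping on a new coordinator amounts to replaying the persisted records in log order, with the latest record per key winning. The sketch below assumes a hypothetical record layout (`StaticMemberRecord`, with a null member id as a tombstone); the real on-disk schema in `__consumer_offsets` is not specified here.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical record appended to the offsets log whenever the static
// mapping changes; memberId == null marks the instance as removed (tombstone).
final class StaticMemberRecord {
    final String groupId;
    final String instanceId;
    final String memberId;

    StaticMemberRecord(String groupId, String instanceId, String memberId) {
        this.groupId = groupId;
        this.instanceId = instanceId;
        this.memberId = memberId;
    }
}

final class StaticMappingLoader {
    // Replays records in log order so the latest value per (group, instance)
    // wins, rebuilding groupId -> (group.instance.id -> member.id).
    static Map<String, Map<String, String>> load(List<StaticMemberRecord> log) {
        Map<String, Map<String, String>> byGroup = new HashMap<>();
        for (StaticMemberRecord r : log) {
            Map<String, String> members = byGroup.computeIfAbsent(r.groupId, g -> new HashMap<>());
            if (r.memberId == null) {
                members.remove(r.instanceId);
            } else {
                members.put(r.instanceId, r.memberId);
            }
        }
        return byGroup;
    }
}
```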

Command line API for membership management

RemoveMemberFromGroup (introduced above) will trigger a rebalance immediately for static membership. It is mainly intended for fast scale-down and host replacement cases, where we detect a consumer failure faster than the session timeout would. This API will first send a FindCoordinatorRequest to locate the target broker, then send a LeaveGroupRequest to the broker hosting that coordinator, and the coordinator will decide whether to accept this metadata change request based on its state at the time.
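The two-step flow (locate the coordinator, then send the batched leave to it) can be sketched against a hypothetical transport interface. `CoordinatorTransport` and `RemoveMembersFlow` are illustrative names only; the real client would issue Kafka's FindCoordinatorRequest and LeaveGroupRequest, which are not modeled here, and the error is represented as a plain string for brevity.

```java
import java.util.List;

// Hypothetical network abstraction standing in for the real
// FindCoordinatorRequest / LeaveGroupRequest round trips.
interface CoordinatorTransport {
    // Returns the id of the broker hosting the group's coordinator.
    int findCoordinator(String groupId);

    // Sends the batched leave; returns an error name, or "NONE" on success.
    String sendLeaveGroup(int brokerId, String groupId, List<String> instanceIds);
}

final class RemoveMembersFlow {
    private final CoordinatorTransport transport;

    RemoveMembersFlow(CoordinatorTransport transport) {
        this.transport = transport;
    }

    // Step 1: locate the coordinator. Step 2: send the batched leave to that
    // broker; whether it is accepted is decided by the coordinator.
    String removeMembers(String groupId, List<String> instanceIds) {
        int coordinator = transport.findCoordinator(groupId);
        return transport.sendLeaveGroup(coordinator, groupId, instanceIds);
    }
}
```

Keeping the transport behind an interface makes the ordering of the two requests easy to test with a fake, as the coordinator's acceptance logic lives entirely on the broker.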

...

  1. The broker is on an old version.
  2. The consumer group does not exist.
  3. The operator is not authorized (neither an admin nor the consumer group creator).
  4. The group is not in a valid state to transition to rebalance (checked with the `canRebalance` function defined in GroupMetadata.scala).
  5. Some instance ids are not found (which means the request is invalid).
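The failure cases listed above can be sketched as an ordered validation. This assumes the checks run in list order with the first failure winning; `RemoveMemberError` and `RemoveMemberValidator` are hypothetical and are not Kafka's actual error codes, and the `canRebalance` result is simply passed in as a boolean.

```java
import java.util.List;
import java.util.Set;

// Hypothetical outcome codes mirroring the failure cases listed above;
// they are not Kafka's actual error codes.
enum RemoveMemberError {
    NONE, UNSUPPORTED_VERSION, GROUP_NOT_FOUND, NOT_AUTHORIZED, INVALID_GROUP_STATE, UNKNOWN_INSTANCE_IDS
}

final class RemoveMemberValidator {
    // Checks the cases in the order listed; the first failing check wins.
    static RemoveMemberError validate(boolean brokerSupportsRequest,
                                      boolean groupExists,
                                      boolean callerAuthorized,
                                      boolean groupCanRebalance,
                                      Set<String> knownInstanceIds,
                                      List<String> requestedInstanceIds) {
        if (!brokerSupportsRequest) return RemoveMemberError.UNSUPPORTED_VERSION;
        if (!groupExists) return RemoveMemberError.GROUP_NOT_FOUND;
        if (!callerAuthorized) return RemoveMemberError.NOT_AUTHORIZED;
        if (!groupCanRebalance) return RemoveMemberError.INVALID_GROUP_STATE;
        // Any unknown id makes the whole request invalid.
        if (!knownInstanceIds.containsAll(requestedInstanceIds)) return RemoveMemberError.UNKNOWN_INSTANCE_IDS;
        return RemoveMemberError.NONE;
    }
}
```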

We need to grant special access to these APIs for end users who may not hold an administrative role in the Kafka cluster. We shall allow an access level similar to that of the JoinGroup request, so that the consumer service owner can easily use this API.
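A minimal sketch of that access rule, assuming it reduces to: cluster admins are always allowed, and anyone else needs the same group-level access a JoinGroup call on that group would require. `MembershipAccessCheck` and its in-memory ACL view are hypothetical; a real broker would consult its configured authorizer instead.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory ACL view; a real broker would ask its authorizer.
final class MembershipAccessCheck {
    private final Set<String> clusterAdmins;
    // principal -> groups that principal is allowed to join
    private final Map<String, Set<String>> joinableGroups;

    MembershipAccessCheck(Set<String> clusterAdmins, Map<String, Set<String>> joinableGroups) {
        this.clusterAdmins = clusterAdmins;
        this.joinableGroups = joinableGroups;
    }

    // Admins are always allowed; otherwise require the same access a
    // JoinGroup call on this group would need.
    boolean mayRemoveMembers(String principal, String groupId) {
        if (clusterAdmins.contains(principal)) {
            return true;
        }
        Set<String> groups = joinableGroups.get(principal);
        return groups != null && groups.contains(groupId);
    }
}
```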

...