...

We will define one new admin API, usable from AdminClient and from a command line tool, to help us better manage consumer groups:


AdminClient.java:

```java
// Proposed method signatures (declarations only; a static method cannot be declared without a body)
public MembershipChangeResult invokeConsumerRebalance(String groupId);
public MembershipChangeResult invokeConsumerRebalance(String groupId, InvokeConsumerRebalanceOptions options);
```
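To show how the two overloads could relate, here is a minimal sketch in which the single-argument form delegates with default options, the same pattern AdminClient uses for methods such as `createTopics`. Everything here is hypothetical: `RebalanceAdmin`, `FakeRebalanceAdmin`, the `timeoutMs` option, and the fields of `MembershipChangeResult` are illustrative assumptions, and the sketch is synchronous whereas a real AdminClient result would more likely wrap a `KafkaFuture`.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in: the KIP names this type but does not define its fields.
class InvokeConsumerRebalanceOptions {
    // Assumed option, modeled on the timeoutMs setting of existing AdminClient *Options classes.
    private Integer timeoutMs = null;

    InvokeConsumerRebalanceOptions timeoutMs(Integer timeoutMs) {
        this.timeoutMs = timeoutMs;
        return this;
    }

    Integer timeoutMs() {
        return timeoutMs;
    }
}

// Hypothetical stand-in for the proposed result type (synchronous for simplicity).
class MembershipChangeResult {
    final String groupId;
    final boolean rebalanceTriggered;

    MembershipChangeResult(String groupId, boolean rebalanceTriggered) {
        this.groupId = groupId;
        this.rebalanceTriggered = rebalanceTriggered;
    }
}

// Hypothetical admin facade showing how the two proposed overloads relate.
abstract class RebalanceAdmin {
    // Convenience overload: delegates with default options.
    MembershipChangeResult invokeConsumerRebalance(String groupId) {
        return invokeConsumerRebalance(groupId, new InvokeConsumerRebalanceOptions());
    }

    abstract MembershipChangeResult invokeConsumerRebalance(String groupId,
                                                            InvokeConsumerRebalanceOptions options);
}

// Toy in-memory implementation: reports a triggered rebalance only for groups it knows.
class FakeRebalanceAdmin extends RebalanceAdmin {
    private final Set<String> knownGroups;

    FakeRebalanceAdmin(Set<String> knownGroups) {
        this.knownGroups = new HashSet<>(knownGroups);
    }

    @Override
    MembershipChangeResult invokeConsumerRebalance(String groupId,
                                                   InvokeConsumerRebalanceOptions options) {
        return new MembershipChangeResult(groupId, knownGroups.contains(groupId));
    }
}

class InvokeRebalanceDemo {
    public static void main(String[] args) {
        RebalanceAdmin admin = new FakeRebalanceAdmin(Set.of("group-1"));
        MembershipChangeResult result = admin.invokeConsumerRebalance("group-1");
        System.out.println(result.groupId + " rebalance triggered: " + result.rebalanceTriggered);
    }
}
```

Keeping the option-less overload as a thin delegate means only the options-taking method needs a real implementation.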


To make the API effective, we shall also introduce a new request/response pair:

```
ConsumerRebalanceRequest => GroupId
  GroupId      => String

ConsumerRebalanceResponse => GroupId, ErrorCode, ErrorMessage
  GroupId      => String
  ErrorCode    => Int16
  ErrorMessage => String
```
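The schema above maps onto Kafka's protocol primitives: a String is an Int16 length followed by that many UTF-8 bytes, and an Int16 is two big-endian bytes. The following is a minimal sketch of how the request and response bodies could be encoded under those rules; `ConsumerRebalanceCodec` and `ConsumerRebalanceResponse` are hypothetical names, request/response headers are omitted, and `ErrorMessage` is treated as a non-null String exactly as the schema states.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical holder for a decoded ConsumerRebalanceResponse body.
final class ConsumerRebalanceResponse {
    final String groupId;
    final short errorCode;
    final String errorMessage;

    ConsumerRebalanceResponse(String groupId, short errorCode, String errorMessage) {
        this.groupId = groupId;
        this.errorCode = errorCode;
        this.errorMessage = errorMessage;
    }
}

// Hypothetical body codec (no headers, no versioning) for the schema above.
final class ConsumerRebalanceCodec {
    private ConsumerRebalanceCodec() {}

    // Protocol String: Int16 length, then that many UTF-8 bytes.
    static void writeString(ByteBuffer buf, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        if (bytes.length > Short.MAX_VALUE) throw new IllegalArgumentException("string too long");
        buf.putShort((short) bytes.length);
        buf.put(bytes);
    }

    static String readString(ByteBuffer buf) {
        short len = buf.getShort();
        if (len < 0) throw new IllegalArgumentException("null is not allowed for a String field");
        byte[] bytes = new byte[len];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    static int stringSize(String s) {
        return 2 + s.getBytes(StandardCharsets.UTF_8).length;
    }

    // ConsumerRebalanceRequest => GroupId
    static ByteBuffer encodeRequest(String groupId) {
        ByteBuffer buf = ByteBuffer.allocate(stringSize(groupId));
        writeString(buf, groupId);
        buf.flip();
        return buf;
    }

    static String decodeRequest(ByteBuffer buf) {
        return readString(buf);
    }

    // ConsumerRebalanceResponse => GroupId, ErrorCode, ErrorMessage
    static ByteBuffer encodeResponse(String groupId, short errorCode, String errorMessage) {
        ByteBuffer buf = ByteBuffer.allocate(stringSize(groupId) + 2 + stringSize(errorMessage));
        writeString(buf, groupId);
        buf.putShort(errorCode); // ByteBuffer is big-endian by default
        writeString(buf, errorMessage);
        buf.flip();
        return buf;
    }

    static ConsumerRebalanceResponse decodeResponse(ByteBuffer buf) {
        String groupId = readString(buf);
        short errorCode = buf.getShort();
        String errorMessage = readString(buf);
        return new ConsumerRebalanceResponse(groupId, errorCode, errorMessage);
    }
}

class CodecDemo {
    public static void main(String[] args) {
        ByteBuffer request = ConsumerRebalanceCodec.encodeRequest("group-1");
        System.out.println("request bytes: " + request.remaining()); // prints "request bytes: 9"
        ConsumerRebalanceResponse response = ConsumerRebalanceCodec.decodeResponse(
                ConsumerRebalanceCodec.encodeResponse("group-1", (short) 0, ""));
        System.out.println(response.groupId + " errorCode=" + response.errorCode);
    }
}
```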

...

Running `./bin/kafka-invoke-consumer-rebalance.sh --zookeeper localhost:2181 --broker 1 --group-id group-1` will immediately trigger a consumer group rebalance by transitioning the group state to PREPARE_REBALANCE (explained in the next section).
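A minimal sketch of what the coordinator-side handling of this request might look like, assuming the group is moved to PREPARE_REBALANCE only from the states the broker's group coordinator already accepts as predecessors (Empty, Stable, CompletingRebalance). `ToyCoordinator`, `RebalanceOutcome`, and every state name other than PREPARE_REBALANCE are hypothetical; a real handler would report failures through the response's ErrorCode rather than an enum.

```java
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Assumed state names; only PREPARE_REBALANCE comes from the text.
enum GroupState { EMPTY, PREPARE_REBALANCE, COMPLETING_REBALANCE, STABLE, DEAD }

// Hypothetical outcomes; a real response would carry an ErrorCode instead.
enum RebalanceOutcome { TRIGGERED, ALREADY_REBALANCING, GROUP_NOT_FOUND, INVALID_STATE }

final class ToyCoordinator {
    // States from which a group may move into PREPARE_REBALANCE.
    private static final Set<GroupState> VALID_PREVIOUS =
            EnumSet.of(GroupState.EMPTY, GroupState.STABLE, GroupState.COMPLETING_REBALANCE);

    private final Map<String, GroupState> groups = new HashMap<>();

    void addGroup(String groupId, GroupState state) {
        groups.put(groupId, state);
    }

    GroupState stateOf(String groupId) {
        return groups.get(groupId);
    }

    // Handles a ConsumerRebalanceRequest for one group.
    RebalanceOutcome handleConsumerRebalance(String groupId) {
        GroupState current = groups.get(groupId);
        if (current == null) return RebalanceOutcome.GROUP_NOT_FOUND;
        if (current == GroupState.PREPARE_REBALANCE) return RebalanceOutcome.ALREADY_REBALANCING;
        if (!VALID_PREVIOUS.contains(current)) return RebalanceOutcome.INVALID_STATE;
        groups.put(groupId, GroupState.PREPARE_REBALANCE);
        return RebalanceOutcome.TRIGGERED;
    }
}

class CoordinatorDemo {
    public static void main(String[] args) {
        ToyCoordinator coordinator = new ToyCoordinator();
        coordinator.addGroup("group-1", GroupState.STABLE);
        System.out.println(coordinator.handleConsumerRebalance("group-1") + " -> "
                + coordinator.stateOf("group-1"));
    }
}
```

Treating a repeated request as a no-op keeps the command idempotent if an operator runs it twice.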

...

  • If the `member.id` is UNKNOWN_MEMBER_ID and the `member.name` is known, we shall return the member id stored in the current map. Also, once KIP-394 is done, all join group requests will require a `member.id` to physically enter the consumer group.
  • We require `member.id` (if not unknown) to match the value stored in the cache; otherwise we reply with MEMBER_ID_MISMATCH. This covers the edge case where multiple members share the same `member.name` (for example, misconfigured instances that have a valid `member.id` but were given an already-used member name at runtime). When a member name is duplicated, we can refuse join requests from members with an outdated `member.id`, since the mapping is updated upon each join group request. A client that receives this error in the response should take it to mean that some other consumer has taken its spot, and it should immediately fail itself to inform the end user that a configuration bug is generating duplicate consumers with the same identity. For the first version of this KIP, we want straightforward handling that exposes the error at an early stage and makes such bugs easy to reproduce.
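The two rules above can be sketched as a small registry keyed by `member.name`. This is a minimal sketch, not the coordinator's code: `StaticMemberRegistry` and `JoinResult` are hypothetical, the empty-string sentinel mirrors Kafka's existing UNKNOWN_MEMBER_ID convention, and the handling of a brand-new `member.name` (generate and store a fresh id) is an assumption, since the text only covers names that are already known.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical registry mapping member.name -> member.id for static members.
final class StaticMemberRegistry {
    // Kafka uses the empty string as UNKNOWN_MEMBER_ID in join group requests.
    static final String UNKNOWN_MEMBER_ID = "";

    enum JoinError { NONE, MEMBER_ID_MISMATCH }

    static final class JoinResult {
        final JoinError error;
        final String memberId;

        JoinResult(JoinError error, String memberId) {
            this.error = error;
            this.memberId = memberId;
        }
    }

    private final Map<String, String> memberIdByName = new HashMap<>();

    JoinResult join(String memberName, String memberId) {
        String stored = memberIdByName.get(memberName);
        if (UNKNOWN_MEMBER_ID.equals(memberId)) {
            if (stored != null) {
                // Known member.name: return the id already in the map.
                return new JoinResult(JoinError.NONE, stored);
            }
            // Assumption: a new static member gets a freshly generated id.
            String fresh = memberName + "-" + UUID.randomUUID();
            memberIdByName.put(memberName, fresh);
            return new JoinResult(JoinError.NONE, fresh);
        }
        if (stored != null && !stored.equals(memberId)) {
            // Another instance owns this member.name; the client should fail fast.
            return new JoinResult(JoinError.MEMBER_ID_MISMATCH, memberId);
        }
        // Matching (or first-seen) id: record the mapping and accept the join.
        memberIdByName.put(memberName, memberId);
        return new JoinResult(JoinError.NONE, memberId);
    }
}

class JoinDemo {
    public static void main(String[] args) {
        StaticMemberRegistry registry = new StaticMemberRegistry();
        registry.join("consumer-a", StaticMemberRegistry.UNKNOWN_MEMBER_ID);
        StaticMemberRegistry.JoinResult dup = registry.join("consumer-a", "some-other-id");
        if (dup.error == StaticMemberRegistry.JoinError.MEMBER_ID_MISMATCH) {
            // Per the text, the client would fail itself here to surface the configuration bug.
            System.out.println("duplicate member.name detected for consumer-a");
        }
    }
}
```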

...

In summary, the member will only be removed due to session timeout. When that happens, we shall remove it from both the in-memory static member name mapping and the member list.
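A minimal sketch of that removal path, assuming a member expires once more than `sessionTimeoutMs` has passed since its last heartbeat. `StaticGroupMembers` and its methods are hypothetical. It uses `Map.remove(key, value)` so the name mapping is dropped only if it still points at the expired member's id, which guards against a name that has since been remapped to another id.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical view of a group's static members and their name -> id mapping.
final class StaticGroupMembers {
    static final class Member {
        final String memberId;
        final String memberName;
        long lastHeartbeatMs;

        Member(String memberId, String memberName, long lastHeartbeatMs) {
            this.memberId = memberId;
            this.memberName = memberName;
            this.lastHeartbeatMs = lastHeartbeatMs;
        }
    }

    private final List<Member> members = new ArrayList<>();
    private final Map<String, String> memberIdByName = new HashMap<>();
    private final long sessionTimeoutMs;

    StaticGroupMembers(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    void add(String memberId, String memberName, long nowMs) {
        members.add(new Member(memberId, memberName, nowMs));
        memberIdByName.put(memberName, memberId);
    }

    void heartbeat(String memberId, long nowMs) {
        for (Member m : members) {
            if (m.memberId.equals(memberId)) m.lastHeartbeatMs = nowMs;
        }
    }

    // Removes every member whose session has expired from BOTH structures; returns their ids.
    List<String> expire(long nowMs) {
        List<String> expired = new ArrayList<>();
        Iterator<Member> it = members.iterator();
        while (it.hasNext()) {
            Member m = it.next();
            if (nowMs - m.lastHeartbeatMs > sessionTimeoutMs) {
                it.remove();                                      // member list
                memberIdByName.remove(m.memberName, m.memberId);  // static name mapping
                expired.add(m.memberId);
            }
        }
        return expired;
    }

    int memberCount() {
        return members.size();
    }

    boolean hasName(String memberName) {
        return memberIdByName.containsKey(memberName);
    }
}

class ExpiryDemo {
    public static void main(String[] args) {
        StaticGroupMembers group = new StaticGroupMembers(10_000);
        group.add("id-a", "consumer-a", 0);
        group.add("id-b", "consumer-b", 0);
        group.heartbeat("id-b", 8_000);
        System.out.println("expired: " + group.expire(10_001));
    }
}
```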

Scale down

Currently, scale-down is controlled by session timeout: if a user removes over-provisioned consumer members, the group waits until the session timeout expires before triggering a rebalance. This is not ideal, and we plan to address it in a follow-up KIP by allowing the removal of specific members.

Fault-tolerance of static membership 

...