...

JoinGroupRequest => GroupId SessionTimeout RebalanceTimeout MemberId GroupInstanceId ProtocolType GroupProtocols
  GroupId             => String
  SessionTimeout      => int32
  RebalanceTimeout    => int32
  MemberId            => String
  GroupInstanceId     => String // new
  ProtocolType        => String
  GroupProtocols      => [Protocol MemberMetadata]
  Protocol            => String
  MemberMetadata      => bytes


LeaveGroupRequest => GroupId MemberId GroupInstanceIdList
  GroupId             => String
  MemberId            => String
  GroupInstanceIdList => list[String] // new
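The two schemas can be pictured as plain Java value types. This is a minimal sketch only. The record and field names are hypothetical transcriptions of the schema above. Kafka's real request classes are generated from JSON message specs and look different. Treating a null `groupInstanceId` as "dynamic member" is also an assumption.

```java
import java.util.List;

// Hypothetical plain-Java model of the JoinGroup/LeaveGroup schemas above;
// not Kafka's generated request classes.
final class StaticMembershipRequests {

    // One entry of GroupProtocols: protocol name plus opaque member metadata.
    record Protocol(String name, byte[] memberMetadata) {}

    // JoinGroupRequest carrying the new GroupInstanceId field.
    record JoinGroupRequest(
            String groupId,
            int sessionTimeoutMs,
            int rebalanceTimeoutMs,
            String memberId,
            String groupInstanceId, // new; assumption: null for dynamic members
            String protocolType,
            List<Protocol> groupProtocols) {}

    // LeaveGroupRequest carrying the new GroupInstanceIdList field.
    record LeaveGroupRequest(
            String groupId,
            String memberId,
            List<String> groupInstanceIdList) {} // new

    public static void main(String[] args) {
        // First join of a static member: member id still empty, instance id set.
        JoinGroupRequest join = new JoinGroupRequest(
                "group-1", 10_000, 60_000, "", "id_1", "consumer",
                List.of(new Protocol("range", new byte[0])));
        // Batch leave of two static members by instance id.
        LeaveGroupRequest leave = new LeaveGroupRequest(
                "group-1", "", List.of("id_1", "id_2"));
        System.out.println(join.groupInstanceId() + " " + leave.groupInstanceIdList());
    }
}
```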

...

Errors.java:
MEMBER_ID_MISMATCH(78, "The join group request contains a group instance id that is already in the consumer group, but its member id does not match the record on the coordinator", MemberIdMismatchException::new),

Streams side changes

On the Kafka Streams side, we plan to expose the list of `group.instance.id`s for easy management in combination with the command line scripts:

StreamsMetadata.java:
public Set<String> mainConsumerGroupInstanceIds();

Server side changes

We will raise the upper bound on the session timeout (the broker config `group.max.session.timeout.ms`) to 30 minutes, relaxing liveness tracking for static members.
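The arithmetic behind the raised cap is that 30 min = 30 × 60 × 1000 = 1,800,000 ms. This is a minimal sketch of a broker-side bounds check, assuming `group.min.session.timeout.ms` = 6000 and `group.max.session.timeout.ms` = 1,800,000. The class and method names are hypothetical. A real broker answers an out-of-range value with an INVALID_SESSION_TIMEOUT error.

```java
// Hypothetical sketch of the broker's session-timeout bounds check.
// Assumed bounds: group.min.session.timeout.ms = 6000 and the raised
// group.max.session.timeout.ms = 30 min.
final class SessionTimeoutCheck {
    static final int GROUP_MIN_SESSION_TIMEOUT_MS = 6_000;
    static final int GROUP_MAX_SESSION_TIMEOUT_MS = 30 * 60 * 1000; // 1,800,000 ms

    // True when the requested timeout lies inside [min, max]; otherwise the
    // join would be rejected with INVALID_SESSION_TIMEOUT.
    static boolean isValid(int sessionTimeoutMs) {
        return sessionTimeoutMs >= GROUP_MIN_SESSION_TIMEOUT_MS
                && sessionTimeoutMs <= GROUP_MAX_SESSION_TIMEOUT_MS;
    }

    public static void main(String[] args) {
        System.out.println(isValid(10 * 60 * 1000)); // a 10 min static member timeout
        System.out.println(isValid(45 * 60 * 1000)); // 45 min exceeds the cap
    }
}
```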

...

which will use the latest LeaveGroupRequest API to inform the broker that a batch of instances is leaving permanently, by passing their id list.

A script called kafka-remove-member-from-group.sh will be added so that end users can easily manipulate the consumer group. For example:

./bin/kafka-remove-member-from-group.sh --zookeeper localhost:2181 --broker 1 --group-id group-1 --group-instance-ids id_1,id_2

where `--group-instance-ids` takes a comma-separated list of ids. This command will immediately trigger a consumer group rebalance by transitioning the group state to PREPARE_REBALANCE, while removing all the static members in the given list (explained in the next section).
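The batch removal performed by the script can be sketched as coordinator-side state handling. This is a simplified illustration, not Kafka's GroupCoordinator code. The class, the two-value state enum and the method names are hypothetical. The rule that skips the rebalance when none of the ids is known is also an assumption.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified coordinator state for a LeaveGroupRequest that
// carries a GroupInstanceIdList.
final class BatchLeaveSketch {
    enum GroupState { STABLE, PREPARE_REBALANCE }

    // group.instance.id -> member.id for the static members of one group.
    private final Map<String, String> staticMembers = new HashMap<>();
    private GroupState state = GroupState.STABLE;

    void addStaticMember(String groupInstanceId, String memberId) {
        staticMembers.put(groupInstanceId, memberId);
    }

    // Removes every listed static member in one pass, then moves the group to
    // PREPARE_REBALANCE right away instead of waiting for session timeouts.
    // Assumption: if none of the ids is known, no rebalance is triggered.
    void handleLeaveGroup(List<String> groupInstanceIdList) {
        boolean removedAny = false;
        for (String id : groupInstanceIdList) {
            removedAny |= staticMembers.remove(id) != null;
        }
        if (removedAny) {
            state = GroupState.PREPARE_REBALANCE;
        }
    }

    GroupState state() { return state; }

    int memberCount() { return staticMembers.size(); }

    public static void main(String[] args) {
        BatchLeaveSketch group = new BatchLeaveSketch();
        group.addStaticMember("id_1", "member-1");
        group.addStaticMember("id_2", "member-2");
        group.addStaticMember("id_3", "member-3");
        // Same ids as the comma-separated --group-instance-ids argument.
        group.handleLeaveGroup(List.of("id_1,id_2".split(",")));
        System.out.println(group.state() + " " + group.memberCount());
    }
}
```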

...

KStream uses the stream thread as its consumer unit. For a stream instance configured with `num.stream.threads` = 16, there would be 16 main consumers running on a single instance. We can borrow the scheme used to initialize the client id of each stream thread's consumer to generate `group.instance.id`. If the user specifies a client id, the stream consumer's client id will be: user client id + "-StreamThread-" + thread id + "-consumer". If the user client id is not set, the process id is used in its place. Our plan is to reuse this consumer client id as the `group.instance.id`, so a KStream instance will effectively be able to use static membership as long as the end user defines a unique `client.id` for each stream instance.
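The naming scheme above is simple string concatenation. This is a minimal sketch of it. The class and method names are hypothetical. Following the text, it falls back to the process id alone when no client id is set. Kafka Streams' actual fallback prefix may differ.

```java
// Hypothetical helper mirroring the described scheme:
// <client id or process id> + "-StreamThread-" + <thread id> + "-consumer".
final class StreamsInstanceIds {

    static String consumerClientId(String userClientId, String processId, int threadId) {
        // Fall back to the process id when the user did not set client.id.
        String prefix = (userClientId == null || userClientId.isEmpty()) ? processId : userClientId;
        return prefix + "-StreamThread-" + threadId + "-consumer";
    }

    // Planned reuse: the main consumer's client id doubles as its group.instance.id.
    static String groupInstanceId(String userClientId, String processId, int threadId) {
        return consumerClientId(userClientId, processId, threadId);
    }

    public static void main(String[] args) {
        int numStreamThreads = 3;
        for (int threadId = 1; threadId <= numStreamThreads; threadId++) {
            System.out.println(groupInstanceId("app-instance-a", "some-process-id", threadId));
        }
    }
}
```

Because the thread id is part of the name, the ids are unique within one instance by construction. Uniqueness across instances depends entirely on each instance having a distinct `client.id`.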

For ease of operation, we define a new accessor in StreamsMetadata that exposes all the `group.instance.id`s configured on each stream instance. Users can then use the command line tool to remove offline consumers in a batch without waiting for the session timeout.

Server behavior changes

On the server side, the broker will keep handling join group requests of version v3 and below as before. `member.id` generation and assignment are still coordinated by the broker, and the broker will maintain an in-memory mapping of {group.instance.id → member.id} to track member uniqueness. When it receives a rejoin request from a known member (that is, one whose `group.instance.id` is already known), the broker will return the cached assignment to the member without triggering a rebalance.
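This is a simplified sketch of the {group.instance.id → member.id} bookkeeping described above. The class, the `Outcome` values and the member id format are hypothetical. Only MEMBER_ID_MISMATCH mirrors the proposed Errors entry. The handling of an empty member id, where a restarted instance that lost its id is treated as the known member, is an assumption the text here does not spell out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical, simplified static-member join handling on the coordinator.
final class StaticJoinSketch {
    enum Outcome { NEW_MEMBER_REBALANCE, CACHED_ASSIGNMENT, MEMBER_ID_MISMATCH }

    record JoinResult(Outcome outcome, String memberId, byte[] assignment) {}

    private final Map<String, String> instanceToMember = new HashMap<>(); // group.instance.id -> member.id
    private final Map<String, byte[]> assignments = new HashMap<>();      // member.id -> last assignment

    JoinResult handleJoin(String groupInstanceId, String memberId) {
        String knownMemberId = instanceToMember.get(groupInstanceId);
        if (knownMemberId == null) {
            // Unknown instance: the broker generates the member.id (format is
            // illustrative), records the mapping, and a normal rebalance follows.
            String newMemberId = groupInstanceId + "-" + UUID.randomUUID();
            instanceToMember.put(groupInstanceId, newMemberId);
            return new JoinResult(Outcome.NEW_MEMBER_REBALANCE, newMemberId, null);
        }
        // Assumption: an empty member.id means the same instance restarted.
        boolean sameMember = memberId.isEmpty() || knownMemberId.equals(memberId);
        if (!sameMember) {
            // Same group.instance.id but a different member.id: reject.
            return new JoinResult(Outcome.MEMBER_ID_MISMATCH, memberId, null);
        }
        // Known static member: return the cached assignment, no rebalance.
        return new JoinResult(Outcome.CACHED_ASSIGNMENT, knownMemberId, assignments.get(knownMemberId));
    }

    void storeAssignment(String memberId, byte[] assignment) {
        assignments.put(memberId, assignment);
    }

    public static void main(String[] args) {
        StaticJoinSketch coordinator = new StaticJoinSketch();
        JoinResult first = coordinator.handleJoin("id_1", "");
        coordinator.storeAssignment(first.memberId(), new byte[] {0});
        System.out.println(first.outcome());
        System.out.println(coordinator.handleJoin("id_1", first.memberId()).outcome());
        System.out.println(coordinator.handleJoin("id_1", "stale-member-id").outcome());
    }
}
```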

...