...

which will use the latest LeaveGroupRequest API to inform the broker of the permanent departure of a batch of consumer instances.

A script called kafka-remove-member-from-group.sh will be added for end users to easily manipulate the consumer group.

./bin/kafka-remove-member-from-group.sh --zookeeper localhost:2181 --broker 1 --group-id group-1 --group-instance-ids id_1,id_2 (comma-separated id list)

will immediately trigger a consumer group rebalance by transitioning the group state to PREPARE_REBALANCE, while removing all the static members in the given list (explanation in a later section).
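For programmatic batch removal, the same operation could also be driven from the Java AdminClient. The sketch below assumes an admin API of roughly this shape; removeMembersFromConsumerGroup and its option classes are shown as one plausible form of such an API rather than something specified by this section, and the broker address and ids are the example values from the command above.

```java
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.MemberToRemove;
import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions;

public class RemoveStaticMembers {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Batch-remove the static members id_1 and id_2 from group-1;
            // the broker then triggers a single rebalance for the whole batch.
            RemoveMembersFromConsumerGroupOptions options =
                new RemoveMembersFromConsumerGroupOptions(
                    Arrays.asList(new MemberToRemove("id_1"), new MemberToRemove("id_2")));
            admin.removeMembersFromConsumerGroup("group-1", options).all().get();
        }
    }
}
```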

Proposed Changes

In short, the proposed feature is enabled if:

  1. The latest JoinGroupReq/Res and LeaveGroupReq/Res are supported on both client and broker.
  2. `group.instance.id` is configured with a non-empty string.

Client behavior changes

On the client side, we add a new config called `group.instance.id` in ConsumerConfig. On consumer service initialization, if the `group.instance.id` config is set, the consumer will include it in the initial join group request to identify itself as a static member. Note that it is the user's responsibility to assign a unique `group.instance.id` to each consumer; this could be derived from the service discovery hostname, a unique IP address, etc. We also have logic to handle duplicate `group.instance.id` values in case the client configuration contains duplicates.

For the effectiveness of the KIP, consumers with `group.instance.id` set will not send a leave group request when they go offline, which means we shall rely only on `session.timeout.ms` to trigger a group rebalance. This is because the existing protocol would otherwise trigger rebalances on this intermittent in-and-out, which is not ideal. With static membership we delegate consumer group health management to the client application platform, such as Kubernetes. Therefore, it is also advised to make the session timeout large enough that the broker side will not trigger rebalances too frequently as members come and go. With a handy admin tool, users can proactively remove members if the session timeout turns out to be too long at runtime.
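As a concrete illustration, a minimal consumer setup enabling static membership might look like the sketch below. It is only a sketch: the broker address, group id, instance id, topic name, and the five-minute session timeout are placeholder values, and surfacing the new setting as ConsumerConfig.GROUP_INSTANCE_ID_CONFIG reflects how it would appear in ConsumerConfig.

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class StaticMemberExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-1");
        // Static membership: a unique, stable id per consumer instance,
        // e.g. derived from the service discovery hostname.
        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "consumer-host-1");
        // A generous session timeout so transient bounces do not trigger rebalances.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "300000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Normal subscribe/poll usage; as a static member, no leave group
            // request is sent when this consumer shuts down.
            consumer.subscribe(Collections.singletonList("my-topic"));
        }
    }
}
```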

Kafka Streams change

Kafka Streams uses the stream thread as its consumer unit. For a stream instance configured with `num.stream.threads` = 16, there would be 16 main consumers running on a single instance. If the user specifies a client id, the stream consumer client id will look like: user client id + "-StreamThread-" + thread id + "-consumer". If the user client id is not set, we will use the process id instead. Our plan is to reuse the consumer client id to define `group.instance.id`, so effectively the Kafka Streams instance will be able to use static membership if the end user defines a unique `client.id` per stream instance.
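For illustration, a Streams instance configured as below would derive main consumer client ids of the form payment-app-instance-1-StreamThread-1-consumer through payment-app-instance-1-StreamThread-16-consumer, which are the values this proposal would reuse as `group.instance.id`. The application id, client id, and bootstrap address here are made-up examples.

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsStaticMemberConfig {
    public static Properties buildConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "payment-app"); // also the consumer group id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // A client.id that is unique per Streams instance; thread-level consumer client ids
        // (and, under this proposal, group.instance.id values) are derived from it as
        // "<client.id>-StreamThread-<n>-consumer".
        props.put(StreamsConfig.CLIENT_ID_CONFIG, "payment-app-instance-1");
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 16);
        return props;
    }
}
```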

For ease of operation, we define a new field in StreamsMetadata to expose all the `group.instance.id` values assigned on each stream instance, so that the user could:

  1. Use the REST API to get the list of `group.instance.id` values on the stream instances the user wants to remove (see the sketch after this list)
  2. Shut down the targeted stream instances
  3. Use the command line API to batch-remove the offline consumers
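A minimal sketch of step 1 follows, assuming the new StreamsMetadata field surfaces as a groupInstanceIds() accessor; that accessor name is hypothetical and only illustrates the idea of exposing the ids from the running application.

```java
import java.util.Set;
import java.util.TreeSet;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.state.StreamsMetadata;

public class ListGroupInstanceIds {
    // Collect all group.instance.id values known to this Streams application,
    // e.g. to serve them from a REST endpoint before an intentional shutdown.
    public static Set<String> groupInstanceIds(KafkaStreams streams) {
        Set<String> ids = new TreeSet<>();
        for (StreamsMetadata metadata : streams.allMetadata()) {
            // groupInstanceIds() stands in for the new field proposed in this KIP.
            ids.addAll(metadata.groupInstanceIds());
        }
        return ids;
    }
}
```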

Server behavior changes

On the server side, the broker will keep handling join group requests <= v3 as before. The `member.id` generation and assignment is still coordinated by the broker, and the broker will maintain an in-memory mapping of {group.instance.id → member.id} to track member uniqueness. When receiving a rejoin request from a known member (i.e. one whose `group.instance.id` is known), the broker will return the cached assignment back to the member without doing any rebalance.
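The broker-side bookkeeping can be pictured roughly as follows. This is a simplified sketch of the idea rather than the actual GroupCoordinator code, and the class and method names are made up for illustration.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Simplified sketch of the proposed broker-side bookkeeping for static members.
public class StaticMemberRegistry {
    // group.instance.id -> member.id, used to track member uniqueness per group.
    private final Map<String, String> memberIdByInstanceId = new ConcurrentHashMap<>();
    // member.id -> last computed assignment, returned as-is on a known member's rejoin.
    private final Map<String, byte[]> cachedAssignment = new ConcurrentHashMap<>();

    public boolean isKnown(String groupInstanceId) {
        return memberIdByInstanceId.containsKey(groupInstanceId);
    }

    // A known static member that rejoins simply gets its cached assignment back,
    // so no rebalance is triggered.
    public Optional<byte[]> rejoin(String groupInstanceId) {
        String memberId = memberIdByInstanceId.get(groupInstanceId);
        return Optional.ofNullable(memberId == null ? null : cachedAssignment.get(memberId));
    }
}
```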

For join group requests under static membership (with `group.instance.id` set):


  • If the `member.id` uses UNKNOWN_MEMBER_ID, we shall always generate a new member id and replace the one within the current map. We also expect that after KIP-394, all join group requests will require a `member.id` to physically enter the consumer group, so the behavior of static members is consistent with that proposal.
  • If the `member.id` is not unknown, we require it to match the value stored in the cache; otherwise we reply with MEMBER_ID_MISMATCH. The edge case is that we could have members with the same `group.instance.id` (for example, misconfigured instances with a valid `member.id` that reuse an already-used `group.instance.id` at runtime). When `group.instance.id` has duplicates, we can refuse the join request from members with an outdated `member.id`, since we update the mapping upon each join group request. When a client hits this exception in the response, it suggests that some other consumer has taken its spot. The client should immediately fail itself to inform the end user that there is a configuration bug generating duplicate consumers with the same identity. For the first version of this KIP, we just want straightforward handling that exposes the error early and makes bug cases easy to reproduce. (A sketch of this decision logic follows the list.)
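Putting the two rules above together, a simplified sketch of the coordinator-side decision for a static join might look like this. It is illustrative only; the class, method, and exception names are placeholders, with the error mirroring the MEMBER_ID_MISMATCH response described above.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Simplified decision logic for a join group request carrying a group.instance.id.
public class StaticJoinHandler {
    private static final String UNKNOWN_MEMBER_ID = "";

    // group.instance.id -> member.id, updated on every static join.
    private final Map<String, String> memberIdByInstanceId = new ConcurrentHashMap<>();

    public String handleStaticJoin(String groupInstanceId, String memberId) {
        if (UNKNOWN_MEMBER_ID.equals(memberId)) {
            // Rule 1: always mint a new member.id and replace the cached one.
            String newMemberId = groupInstanceId + "-" + UUID.randomUUID();
            memberIdByInstanceId.put(groupInstanceId, newMemberId);
            return newMemberId;
        }
        String cached = memberIdByInstanceId.get(groupInstanceId);
        if (!memberId.equals(cached)) {
            // Rule 2: a stale or duplicate instance id is rejected so the client fails fast.
            throw new IllegalStateException("MEMBER_ID_MISMATCH for instance " + groupInstanceId);
        }
        return memberId;
    }
}
```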

...

  1. Pre-registration (proposed by Jason). The client user could provide a list of hard-coded `group.instance.id` values so that the server could respond to scaling operations more intelligently. For example, when we scale up the fleet by defining 4 new client instance ids, the server shall wait until all 4 new members have joined the group before kicking off the rebalance; the same applies to scale down.
  2. Add hot standby hosts by defining `target.group.size` (proposed by Mayuresh). We shall keep some idle consumers within the group, and when one of the active members goes offline, we shall trigger a hot swap because the current group size is smaller than `target.group.size`. With this change we might not even need to extend the session timeout, since we can readily use the spare consumer to start working.
  3. Add a script called kafka-remove-member-from-group.sh for end users to easily manipulate the consumer group (proposed by Boyang). ./bin/kafka-remove-member-from-group.sh --zookeeper localhost:2181 --broker 1 --group-id group-1 --group-instance-ids id_1,id_2 (comma-separated id list) will immediately trigger a consumer group rebalance by transitioning the group state to PREPARE_REBALANCE, while removing all the static members in the given list.