Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  1. Use REST API to get list of `group.instance.id` on stream instances user wants to remove
  2. Shutdown targeting stream instances
  3. Use command line API to batch remove offline consumers

Server Behavior Changes

Join Group Logic Change

On server side, broker will keep handling join group request <= v3 as before. The `member.id` generation and assignment is still coordinated by broker, and broker will maintain an in-memory mapping of {group.instance.id → member.id} to track member uniqueness. When receiving a known member's (A.K.A `group.instance.id` known) rejoin request, broker will return the cached assignment back to the member, without doing any rebalance.

...

For join group requests under dynamic membership (without `group.instance.id` set), the handling logic will remain unchanged. If the broker version is not the latest (< v4), the join group request shall be downgraded to v3.Scale Up

Leave Group Logic Change

We will not plan to solve the scale up issue holistically within this KIP, since there is a parallel discussion about Incremental Cooperative Rebalancing, in which we will encode the "when to rebalance" logic at the application level, instead of at the protocol level. 

For initial scale up, there is a plan to deprecate group.initial.rebalance.delay.ms since we no longer needs it once static membership is delivered and the incremental rebalancing work is done.

Rolling Bounce

Currently broker accepts a config value called rebalance timeout which is provided by consumer max.poll.intervals. The reason we set it to poll interval is because consumer could only send request within the call of poll() and we want to wait sufficient time for the join group request. When reaching rebalance timeout, the group will move towards COMPLETING_REBALANCE stage and remove unjoined members. This is actually conflicting with the design of static membership, because those temporarily unavailable members will potentially reattempt the join group and trigger extra rebalances. Internally we would optimize this logic by having rebalance timeout only in charge of stopping PREPARE_REBALANCE stage, without removing non-responsive members immediately. There would not be a full rebalance if the lagging consumer sends a JoinGroupRequest within the session timeout.

So in summary, the member will only be removed due to session timeout. We shall remove it from both in-memory static `group.instance.id` map and member list.

Scale Down

Currently the scale down is controlled by session timeout, which means if user removes the over-provisioned consumer members it waits until session timeout to trigger the rebalance. This is not ideal and motivates us to change LeaveGroupRequest to be able to include a list of `group.instance.id` and a list of `member.id` such that we could batch remove offline members and trigger rebalance immediately without them.

Fault-tolerance of Static Membership 

To make sure we could recover from broker failure/coordinator transition, an in-memory `group.instance.id` map is not enough. We would reuse the _consumer_offsets topic to store the static member map information. When another broker takes over the leadership, it will load the static mapping info together. 

Command Line API for Membership Management

RemoveMemberFromGroup (introduced above) will remove given instances and trigger rebalance immediately, which is mainly used for fast scale down/host replacement cases (we detect consumer failure faster than the session timeout). This API will first send a FindCoordinatorRequest to locate the correct broker, and initiate a LeaveGroupRequest to target broker hosting that coordinator. Within the request we add two new lists which maps from `group.instance.id` to `member.id`. The reason to include `member.id` list instead of solely adding a `group.instance.id` list is to move LeaveGroupRequest towards a more consistent batch API in long term. The two lists are expected to be of same length and aligned, which means each `group.instance.id` at the same index of `member.id` has a strong matching reflected within current static membership on broker. The processing rules are following:

...

extended the LeaveGroupRequest API with two new lists which map from `group.instance.id` to `member.id`. The reason to include `member.id` list instead of solely adding a `group.instance.id` list is to move LeaveGroupRequest towards a more consistent batch API in long term. The two lists are expected to be of same length and aligned, which means each `group.instance.id` at the same index of `group.instance.id` has a strong matching reflected within current static membership on broker. The processing rules are following:

  1. For static member, `group.instance.id` must be provided. Client could optionally provide a `member.id` when `group.instance.id` is configured non-empty. If `member.id` is provided, the member will only be removed if the `member.id` matches. Otherwise, only the `group.instance.id` is used. The `member.id` serves as a validation here, which currently will not be used (set to empty string) but potentially useful if we do fully automated removal process.
  2. For leave group requests under dynamic membership, it will apply a singleton list of `member.id` that it is currently using, and a singleton list of `group.instance.id` which is set to empty string. If this is the case, we shall just remove the given dynamic member the same way as current leave group logic.

Of course there could be a theoretical case where both `member.id` and `group.instance.id` are set to empty string. We shall explicitly handle the case as an error and expose in the server log, while returning UNKNOWN_MEMBER_ID error if both lists are configured with empty strings.

Scale Up

We will not plan to solve the scale up issue holistically within this KIP, since there is a parallel discussion about Incremental Cooperative Rebalancing, in which we will encode the "when to rebalance" logic at the application level, instead of at the protocol level. 

For initial scale up, there is a plan to deprecate group.initial.rebalance.delay.ms since we no longer needs it once static membership is delivered and the incremental rebalancing work is done.

Rolling Bounce

Currently broker accepts a config value called rebalance timeout which is provided by consumer max.poll.intervals. The reason we set it to poll interval is because consumer could only send request within the call of poll() and we want to wait sufficient time for the join group request. When reaching rebalance timeout, the group will move towards COMPLETING_REBALANCE stage and remove unjoined members. This is actually conflicting with the design of static membership, because those temporarily unavailable members will potentially reattempt the join group and trigger extra rebalances. Internally we would optimize this logic by having rebalance timeout only in charge of stopping PREPARE_REBALANCE stage, without removing non-responsive members immediately. There would not be a full rebalance if the lagging consumer sends a JoinGroupRequest within the session timeout.

So in summary, the member will only be removed due to session timeout. We shall remove it from both in-memory static `group.instance.id` map and member list.

Scale Down

Currently the scale down is controlled by session timeout, which means if user removes the over-provisioned consumer members it waits until session timeout to trigger the rebalance. This is not ideal and motivates us to change LeaveGroupRequest to be able to include a list of `group.instance.id` and a list of `member.id` such that we could batch remove offline members and trigger rebalance immediately without them.

Fault-tolerance of Static Membership 

To make sure we could recover from broker failure/coordinator transition, an in-memory `group.instance.id` map is not enough. We would reuse the _consumer_offsets topic to store the static member map information. When another broker takes over the leadership, it will load the static mapping info together. 

Command Line API for Membership Management

RemoveMemberFromGroup (introduced above) will remove given instances and trigger rebalance immediately, which is mainly used for fast scale down/host replacement cases (we detect consumer failure faster than the session timeout). This API will first send a FindCoordinatorRequest to locate the correct broker, and initiate a LeaveGroupRequest to target broker hosting that coordinator. 

...

The coordinator will decide whether to take this metadata change request based on its status on runtime. Error will be returned if

...