...

Right now the broker handles consumer state in a two-phase protocol. To explain consumer rebalance, we only discuss the three states involved here: RUNNING, PREPARE_REBALANCE and COMPLETING_REBALANCE.

  • When a consumer joins the group, if it is a new member or the group leader, the broker will move the group state from RUNNING to PREPARE_REBALANCE. The reason for triggering a rebalance when the leader rejoins is that there might be an assignment protocol change (for example, if the consumer group is using regex subscription and new matching topics show up). If an existing ordinary member rejoins the group, the state will not change.
  • Once moved to the PREPARE_REBALANCE state, the broker will mark the first consumer to join as the leader and wait for all members to rejoin the group. Once we have collected all current members' join group requests or reached the rebalance timeout, we reply to the leader with the current member information and move the state to COMPLETING_REBALANCE. All current members are informed to send a SyncGroupRequest to get the final assignment.
  • The leader consumer decides the assignment and sends it back to the broker. As the last step, the broker announces the new assignment by sending a SyncGroupResponse to all the followers. At this point one rebalance is finished and the group generation is incremented by 1.
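The three states and the transitions described above can be sketched as follows. This is a simplified illustration only, not the actual broker code; the class and method names are hypothetical, while the state names follow the text:

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;

// Simplified sketch of the group state machine described above.
// Hypothetical names; not the real GroupCoordinator implementation.
public class GroupStateSketch {
    enum GroupState { RUNNING, PREPARE_REBALANCE, COMPLETING_REBALANCE }

    // Allowed transitions per the three-step flow above.
    static final Map<GroupState, EnumSet<GroupState>> TRANSITIONS =
            new EnumMap<>(GroupState.class);
    static {
        // A new member or the leader rejoining triggers a rebalance.
        TRANSITIONS.put(GroupState.RUNNING,
                EnumSet.of(GroupState.PREPARE_REBALANCE));
        // All members rejoined, or the rebalance timeout was reached.
        TRANSITIONS.put(GroupState.PREPARE_REBALANCE,
                EnumSet.of(GroupState.COMPLETING_REBALANCE));
        // Leader's assignment distributed via SyncGroupResponse;
        // the group generation is bumped here.
        TRANSITIONS.put(GroupState.COMPLETING_REBALANCE,
                EnumSet.of(GroupState.RUNNING));
    }

    static boolean canTransition(GroupState from, GroupState to) {
        return TRANSITIONS
                .getOrDefault(from, EnumSet.noneOf(GroupState.class))
                .contains(to);
    }

    public static void main(String[] args) {
        System.out.println(canTransition(
                GroupState.RUNNING, GroupState.PREPARE_REBALANCE)); // true
        System.out.println(canTransition(
                GroupState.RUNNING, GroupState.COMPLETING_REBALANCE)); // false
    }
}
```

Note that an old (non-leader) member rejoining maps to a self-loop on RUNNING, which is why it is omitted from the transition table above.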

In the current architecture, during each rebalance the consumer group on the broker side assigns each new member a randomly generated UUID called `member.id`. This ensures a unique identity for each group member. During a client restart, the consumer sends a JoinGroupRequest with a special UNKNOWN_MEMBER_ID (empty string), and the broker interprets it as a new member. To make this KIP work, we need to change both client-side and server-side logic to persist member identity across restarts via a new `group.instance.id` (explained later), which means we can reduce the number of rebalances since we are able to apply the same assignment based on member identities. This idea is summarized as static membership, which, in contrast to dynamic membership (the one our system currently uses), prioritizes "state persistence" over "liveness".

...

  • Static Membership: the membership protocol where the consumer group will not trigger rebalance unless 
    • A new member joins
    • A leader rejoins
    • An existing member has been offline longer than the session timeout
    • Broker receives a leave group request containing a list of `group.instance.id`s (details later)
  • Group instance id: a unique identifier defined by the user to distinguish each client instance.

...

We are also introducing a new error type in JoinGroupResponse v4. The handling is explained in the following section.

Code Block
languagejava
titleErrors.java
MEMBER_ID_MISMATCH(78, "The join group request contains a group.instance.id which is already in the consumer group, however the member.id did not match the record on the coordinator", MemberIdMismatchException::new),

...

Code Block
languagescala
titleGroupMetadataManager
private val MEMBER_METADATA_V3 = new Schema(
  new Field(MEMBER_ID_KEY, STRING),
  new Field(GROUP_INSTANCE_ID_KEY, STRING), // new
  new Field(CLIENT_ID_KEY, STRING),
  new Field(CLIENT_HOST_KEY, STRING),
  new Field(REBALANCE_TIMEOUT_KEY, INT32),
  new Field(SESSION_TIMEOUT_KEY, INT32),
  new Field(SUBSCRIPTION_KEY, BYTES),
  new Field(ASSIGNMENT_KEY, BYTES))

...

Code Block
titleAdminClient.java
public static MembershipChangeResult removeMemberFromGroup(String groupId, List<String> groupInstanceIdsToRemove, RemoveMemberFromGroupOptions options);

which uses the latest LeaveGroupRequest API to inform the broker of the permanent departure of a batch of consumer instances by passing the id list.

A script called kafka-remove-member-from-group.sh will be added for end users to easily manipulate the consumer group.

...

will immediately trigger a consumer group rebalance by transitioning the group state to PREPARE_REBALANCE, while removing all the static members in the given list (explained in a later section).

Proposed Changes

...

  1. Latest JoinGroupReq/Res and LeaveGroupReq/Res are supported on both client and broker.
  2. `group.instance.id` is configured with a non-empty string.

...

On the client side, we add a new config called `group.instance.id` in ConsumerConfig. On consumer service init, if the `group.instance.id` config is set, we will put it in the initial join group request to identify the consumer as a static member. Note that it is the user's responsibility to assign a unique `group.instance.id` to each consumer. This could be a hostname from service discovery, a unique IP address, etc. We also have logic to handle duplicate `group.instance.id`s in case the client configuration contains duplicates.
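For illustration, wiring the new config into plain consumer properties might look like the sketch below. The bootstrap server, group name, instance id and timeout value are placeholders, and the helper class is hypothetical; only the `group.instance.id` key itself is the config this KIP adds:

```java
import java.util.Properties;

// Hypothetical helper showing where the new static-membership config
// would be set; values here are illustrative placeholders.
public class StaticMemberConfig {
    public static Properties consumerProps(String instanceId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "my-group");                // placeholder
        // The new config from this KIP: a non-empty, user-assigned
        // unique id makes this consumer a static member.
        props.put("group.instance.id", instanceId);
        // A generous session timeout, per the operational advice
        // discussed in the text (placeholder value).
        props.put("session.timeout.ms", "120000");
        return props;
    }

    public static void main(String[] args) {
        // e.g. derive the instance id from a stable hostname
        Properties p = consumerProps("payment-service-host-1");
        System.out.println(p.getProperty("group.instance.id"));
        // payment-service-host-1
    }
}
```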

For the effectiveness of the KIP, consumers with `group.instance.id` set will not send a leave group request when they go offline, which means we shall rely only on session.timeout to trigger group rebalance. This is because the rebalance protocol would otherwise trigger a rebalance on each intermittent in-and-out, which is not ideal. In static membership we delegate consumer group health management to the client application framework, such as k8s. Therefore, it is also advised to make the session timeout large enough so that the broker side will not trigger rebalances too frequently due to members coming and going. With a handy admin tool, the user can proactively remove members if the session timeout proves too long at runtime.

...

KStream uses the stream thread as the consumer unit. For a stream instance configured with `num.threads` = 16, there would be 16 main consumers running on a single instance. If the user specifies a client id, the stream consumer client id will look like: user client id + "-StreamThread-" + thread id + "-consumer". If the user client id is not set, we will use the process id instead. Our plan is to reuse the consumer client id to define `group.instance.id`, so effectively a KStream instance will be able to use static membership if the end user defines a unique `client.id` for each stream instance.
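The derived consumer client id described above can be sketched as follows. The format string follows the text; the helper itself is illustrative, not the exact Streams implementation:

```java
// Illustrative sketch of the client id format described in the text;
// not the actual Kafka Streams code.
public class StreamClientId {
    // Per the text: user client id + "-StreamThread-" + thread id + "-consumer";
    // falls back to the process id when no user client id is set.
    static String consumerClientId(String userClientId, long processId, int threadId) {
        String base = (userClientId == null || userClientId.isEmpty())
                ? Long.toString(processId)
                : userClientId;
        return base + "-StreamThread-" + threadId + "-consumer";
    }

    public static void main(String[] args) {
        System.out.println(consumerClientId("payments", 12345L, 3));
        // payments-StreamThread-3-consumer
        System.out.println(consumerClientId(null, 12345L, 3));
        // 12345-StreamThread-3-consumer
    }
}
```

Because this id is reused as `group.instance.id`, two stream instances sharing the same `client.id` would collide, which is exactly the duplicate case the broker-side MEMBER_ID_MISMATCH handling is designed to surface.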

For easy operation, we define a new field in StreamsMetadata to expose all the `group.instance.id`s given on each stream instance, so that the user can:

  1. Use REST API to get

...

  1. The list of `group.instance.id`s on the stream instances the user wants to remove
  2. Shut down the targeted stream instances
  3. Use the command line API to batch remove the offline consumers

...

Server behavior changes

On the server side, the broker will keep handling join group requests <= v3 as before. The `member.id` generation and assignment is still coordinated by the broker, and the broker will maintain an in-memory mapping of {group.instance.id → member.id} to track member uniqueness. When receiving a known member's (i.e., `group.instance.id` known) rejoin request, the broker will return the cached assignment back to the member without doing any rebalance.

...

  • If the `member.id` uses UNKNOWN_MEMBER_ID, we shall always generate a new member id and replace the one in the current map. We also expect that after KIP-394, all join group requests will require a `member.id` to physically enter the consumer group, so the behavior of static members is consistent with that proposal.
  • We require the `member.id` (if not unknown) to match the value stored in the cache; otherwise we reply with MEMBER_ID_MISMATCH. The edge case is that we could have members with the same `group.instance.id` (for example, mis-configured instances with a valid `member.id` but a `group.instance.id` that is already in use at runtime). When `group.instance.id` has duplicates, we can refuse join requests from members with an outdated `member.id`, since we update the mapping upon each join group request. If a client hits this exception in the response, it suggests that some other consumer has taken its spot. The client should immediately fail itself to inform the end user that there is a configuration bug which is generating duplicate consumers with the same identity. For the first version of this KIP, we just want straightforward handling that exposes the error at an early stage and makes bug cases easy to reproduce.
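The broker-side bookkeeping described in these two rules can be sketched as a minimal in-memory model. All names here are hypothetical; the real logic lives in the coordinator code, and this sketch only captures the mapping update and the mismatch check:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Minimal sketch of the {group.instance.id -> member.id} bookkeeping
// described above. Hypothetical names; not the real broker code.
public class StaticMembershipSketch {
    static final String UNKNOWN_MEMBER_ID = ""; // empty string, per the text

    private final Map<String, String> instanceToMemberId = new HashMap<>();

    /** Returns the member.id to use for this join, or throws on a mismatch. */
    public String onJoin(String groupInstanceId, String memberId) {
        if (UNKNOWN_MEMBER_ID.equals(memberId)) {
            // Unknown member.id: always generate a fresh one and
            // replace whatever is in the current map.
            String newId = UUID.randomUUID().toString();
            instanceToMemberId.put(groupInstanceId, newId);
            return newId;
        }
        // A known member.id must match the cached value, otherwise the
        // broker replies MEMBER_ID_MISMATCH (modeled as an exception here).
        String cached = instanceToMemberId.get(groupInstanceId);
        if (!memberId.equals(cached)) {
            throw new IllegalStateException("MEMBER_ID_MISMATCH");
        }
        return cached;
    }
}
```

Note how a duplicate instance joining with UNKNOWN_MEMBER_ID silently displaces the previous owner in the map, which is what lets the broker later refuse the original member's stale `member.id`.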

...

We do not plan to solve the scale-up issue holistically within this KIP, since there is a parallel discussion about Incremental Cooperative Rebalancing, in which we will encode the "when to rebalance" logic at the application level instead of at the protocol level.

For initial scale-up, there is a plan to deprecate group.initial.rebalance.delay.ms since we will no longer need it once static membership is delivered and the incremental rebalancing work is done.

...

Currently the broker accepts a config value called rebalance timeout, which is provided by the consumer's max.poll.interval.ms. The reason we set it to the poll interval is that the consumer can only send requests within a call to poll(), and we want to wait sufficient time for the join group request. When the rebalance timeout is reached, the group moves to the COMPLETING_REBALANCE stage and removes unjoined members. This actually conflicts with the design of static membership, because those temporarily unavailable members will potentially reattempt the join group and trigger extra rebalances. Internally we would optimize this logic by having the rebalance timeout only be in charge of ending the PREPARE_REBALANCE stage, without removing non-responsive members immediately. There would not be a full rebalance if the lagging consumer sends a JoinGroupRequest within the session timeout.

...

RemoveMemberFromGroup (introduced above) will remove the given instances and trigger a rebalance immediately under static membership, which is mainly useful for fast scale down/host replacement cases (where we detect consumer failure faster than the session timeout). This API will first send a FindCoordinatorRequest to locate the correct broker, then initiate a LeaveGroupRequest to the broker hosting that coordinator, and the coordinator will decide whether to accept this metadata change request based on its runtime status.

An error will be returned if:

  1. The broker is on an old version.
  2. The consumer group does not exist.
  3. The operator is not authorized (neither admin nor consumer group creator).
  4. The group is not in a valid state to transition to rebalance (use the `canRebalance` function defined in GroupMetadata.scala to check).
  5. Some instance ids are not found, which means the request is not valid.

We need to grant special access to these APIs for end users who may not have an administrative role on the Kafka cluster. The solution is to allow an access level similar to that of the join group request, so the consumer service owner can easily use this API.

...

  1. Upgrade your broker to include this KIP.
  2. Upgrade your client to include this KIP.
  3. Set `group.instance.id`, and set the session timeout to a reasonable value if necessary.
    1. For KStream users, setting `client.id` for stream instances should do the work.
  4. Rolling bounce the consumer group.

That's it! We believe the static membership logic is compatible with the current dynamic membership, which means static members and dynamic members are allowed to co-exist within the same consumer group. This assumption will be further verified when we model the protocol and unit test it.

Downgrade process

The downgrade process is also straightforward. End users just:

  1. Unset `group.instance.id`

...

  1. Change the session timeout to a smaller value if necessary
    1. For KStream users, unsetting `client.id` should do the work
  2. Do a rolling bounce to switch back to dynamic membership

The static membership metadata stored on the broker will eventually be wiped out when the corresponding `member.id` reaches the session timeout.

One more tip: the user may want to change the session timeout to a smaller value, since liveness is more important now.

Non-goal

We have had some offline discussions on handling the leader rejoin case; for example, since the broker could also do the subscription monitoring work, we don't actually need to blindly trigger a rebalance on every leader rejoin request. However, this is a separate topic from KIP-345 and we could address it in another KIP.

...

Beyond static membership, we could unblock many stability features. We will initiate separate discussion threads once KIP-345 is done. Examples are:

...