...

For stateful applications, one of the biggest performance bottlenecks is state shuffling. The Kafka consumer has a concept called "rebalance": given M partitions and N consumers in one consumer group, Kafka tries to balance the load between consumers, ideally having each consumer handle M/N partitions. The broker also adjusts the workload dynamically by monitoring consumers' health, so that dead consumers are kicked out of the group and new consumers' join group requests are handled. The intuition behind this design is to avoid processing hot spots and to maintain fairness plus liveness for the whole application. However, when the service state is heavy, a rebalance of one topic partition from instance A to B means a huge amount of data transfer. If multiple rebalances are triggered, the whole service could take a very long time to recover due to data transfer.

The idea of this KIP is to reduce the number of rebalances by introducing a new concept called static membership. It would help with the following example use cases.

...

  • Static Membership: the membership protocol where the consumer group will not trigger a rebalance unless 1. a new member joins, 2. a leader rejoins, or 3. an existing member goes offline beyond the session timeout.
  • Group member name: the unique identifier defined by the user to distinguish each client instance.

...

New Configurations

Consumer configs

group.member.name

The unique identifier of the consumer instance, provided by the end user.

Default value: empty string.
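
For illustration, a static member could be configured like any other consumer property. This is a sketch, not a prescribed setup: `group.member.name` is the config proposed by this KIP, and deriving the value from an environment variable is just one way to keep it unique per instance.

Code Block
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
// Proposed config from this KIP; the value must be unique per instance.
// Deriving it from an env var like HOSTNAME is illustrative, not prescribed.
props.put("group.member.name", "consumer-" + System.getenv("HOSTNAME"));
KafkaConsumer<String, String> consumer =
        new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());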

Client side changes

The new `group.member.name` config will be added to the join group request.

Code Block
JoinGroupRequest => GroupId SessionTimeout RebalanceTimeout MemberId GroupMemberName ProtocolType GroupProtocols
  GroupId             => String
  SessionTimeout      => int32
  RebalanceTimeout    => int32
  MemberId            => String
  GroupMemberName     => String // new
  ProtocolType        => String
  GroupProtocols      => [Protocol MemberMetadata]
  Protocol            => String
  MemberMetadata      => bytes

...

Code Block (Errors.java)
MEMBER_ID_MISMATCH(78, "The join group request contains a group member name which already exists in the consumer group, but the member id does not match the record on the coordinator", MemberIdMismatchException::new),

...

For fault tolerance, we also include the group member name within the member metadata so it is backed up in the offset topic.
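
As a hedged illustration of this backup (the KIP does not spell out the field layout, so the length-prefixed scheme below is an assumption), the member name could be serialized in front of the existing metadata bytes:

Code Block
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: length-prefix the group member name ahead of the
// existing subscription metadata so it is persisted in the offsets topic.
static ByteBuffer serializeMemberMetadata(String groupMemberName, byte[] subscription) {
    byte[] name = groupMemberName.getBytes(StandardCharsets.UTF_8);
    ByteBuffer buf = ByteBuffer.allocate(2 + name.length + subscription.length);
    buf.putShort((short) name.length);
    buf.put(name);
    buf.put(subscription);
    buf.flip();
    return buf;
}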

...

  1. JoinGroupRequest V4 is supported on both client and broker.
  2. `group.member.name` is configured with a non-empty string.

...

On the client side, we add a new config called MEMBER_NAME in ConsumerConfig. On consumer service init, if the MEMBER_NAME config is set, we will put it in the initial join group request to identify the consumer as a static member (static membership); otherwise, we will still send UNKNOWN_MEMBER_ID to ask the broker to allocate a new random ID (dynamic membership). Note that it is the user's responsibility to assign a unique member name to each consumer, for example derived from the service discovery hostname or a unique IP address. We also have logic handling duplicate `group.member.name` values in case the client is misconfigured.
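
A minimal sketch of this decision, under the assumption that the proposed config is exposed as shown (JoinRequestSketch is a stand-in for the real request builder, which this KIP does not spell out):

Code Block
// Illustrative only; JoinRequestSketch is hypothetical, and "config" is
// assumed to expose the proposed consumer config.
class JoinRequestSketch {
    String memberId = "";          // "" == UNKNOWN_MEMBER_ID
    String groupMemberName = null; // new field from this KIP
}

String memberName = config.getString("group.member.name");
JoinRequestSketch request = new JoinRequestSketch();
if (memberName != null && !memberName.isEmpty()) {
    // Static membership: present the stable, user-assigned identity.
    request.groupMemberName = memberName;
} else {
    // Dynamic membership: leave member id unknown so the broker allocates one.
    request.memberId = "";
}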

For the effectiveness of the KIP, consumers with `group.member.name` set will not send a leave group request when they go offline, which means we shall rely only on session.timeout to trigger a group rebalance. This is because the proposed rebalance protocol would otherwise trigger rebalances on this intermittent in-and-out, which is not ideal. In static membership we delegate consumer group health management to the client application's orchestration framework, such as Kubernetes (K8s). Therefore, it is also advised to make the session timeout large enough so that the broker side will not trigger rebalances too frequently as members come and go.
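
To illustrate the shutdown-path difference (helper names here are hypothetical, not from the KIP):

Code Block
// Illustrative shutdown path: static members skip the leave group request,
// so only session.timeout.ms expiry removes them from the group.
void close(boolean isStaticMember) {
    if (!isStaticMember) {
        sendLeaveGroupRequest();  // dynamic members still leave eagerly
    }
    // A static member simply disconnects; the coordinator keeps its slot and
    // cached assignment, so a quick restart does not trigger a rebalance.
}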

...

On the server side, the broker will keep handling join group requests <= v3 as before. If the protocol version is upgraded to v4 and `group.member.name` is set, the broker will use the member name specified in the join group request and respond with a unique "member id". The broker will maintain an in-memory mapping of {group.member.name → member.id} to track member uniqueness. When receiving an existing member's rejoin request, the broker will return the cached assignment back to the member without doing any rebalance.

For join group requests under static membership (with `group.member.name` set),

  • If the `member.id` is UNKNOWN_MEMBER_ID, we shall return the member id stored in the current map if the `group.member.name` is known. Also, once we are done with KIP-394, all join group requests will require a `member.id` to physically enter the consumer group.
  • We require the `member.id` (if not unknown) to match the value stored in the cache; otherwise we reply with MEMBER_ID_MISMATCH. This covers the edge case where members could share the same `group.member.name` (for example, misconfigured instances with a valid `member.id` but configured at runtime with a `group.member.name` that is already in use). When a `group.member.name` has duplicates, we refuse join requests from members with an outdated `member.id` (since we update the mapping upon each join group request). In the edge case where a client hits this exception in the response, it suggests that some other consumer has taken its spot. The client should immediately fail itself to inform the end user that there is a configuration bug generating duplicate consumers with the same identity. For the first version of this KIP, we just want straightforward handling that exposes the error at an early stage and makes bug cases easy to reproduce.

For join group requests under dynamic membership (without `group.member.name` set), the handling logic will remain unchanged. If the broker version is not the latest (< v4), the join group request shall be downgraded to v3 without setting the member name.
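
Putting the above together, a rough sketch of the coordinator-side bookkeeping might look as follows. All names here are hypothetical illustrations of the described handling, not the actual broker code:

Code Block
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative coordinator-side bookkeeping for static membership.
class StaticMembershipCoordinatorSketch {
    private static final String UNKNOWN_MEMBER_ID = "";
    // The in-memory {group.member.name -> member.id} map described above.
    private final Map<String, String> memberNameToId = new ConcurrentHashMap<>();

    String handleStaticJoin(String groupMemberName, String memberId) {
        String cachedId = memberNameToId.get(groupMemberName);
        if (UNKNOWN_MEMBER_ID.equals(memberId)) {
            if (cachedId != null)
                return cachedId;  // known static member rejoining: no rebalance
            String newId = UUID.randomUUID().toString();  // first join: allocate
            memberNameToId.put(groupMemberName, newId);
            return newId;
        }
        if (!memberId.equals(cachedId))
            // Another consumer already owns this name: fence the stale one.
            throw new IllegalStateException("MEMBER_ID_MISMATCH");
        return cachedId;  // rejoin with matching id: return cached assignment
    }
}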

...

To make sure we can recover from broker failure/leader transition, an in-memory `group.member.name` map is not enough. We would reuse the `__consumer_offsets` topic to store the static member map information. When another broker takes over the leadership, it can restore the mapping from that topic.

...

  1. Upgrade your broker to include this KIP.
  2. Upgrade your client to include this KIP.
  3. Set `group.member.name` and the session timeout to reasonable values, then rolling bounce your consumer group (example settings below).
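
For example, the relevant consumer settings for step 3 might look like this (values are illustrative; `session.timeout.ms` is the existing consumer session timeout config):

Code Block
# Illustrative settings for step 3.
group.id=my-consumer-group
group.member.name=consumer-instance-1   # must be unique per consumer instance
session.timeout.ms=120000               # generous, per the session timeout advice above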

...

In this pull request, we took an experimental approach that materializes the member id (the identity given by the broker, equivalent to the `group.member.name` in this proposal) on the instance's local disk. This approach could reduce rebalances as expected, and it is the experimental foundation of KIP-345. However, KIP-345 has a few advantages over it:

  1. It gives users more control of their `group.member.name`, which helps for debugging purposes.
  2. It is friendlier to cloud and Kubernetes-like environments: when we move an instance from one container to another, we can copy the `group.member.name` over in the config files.
  3. It does not require the consumer to be able to access another directory on the local disks (consider consumers deployed on AWS with remote disks mounted).
  4. By allowing consumers to optionally specify a `group.member.name`, this rebalance benefit can easily be extended to Connect and Streams as well, which rely on consumers, even in a cloud environment.

...

  1. Pre-registration (proposed by Jason). The client user could provide a list of hard-coded `group.member.name` values so that the server could respond to scaling operations more intelligently. For example, when we scale up the fleet by defining 4 new client member names, the server shall wait until all 4 new members have joined the group before kicking off the rebalance; the same applies to scale down.
  2. Add hot standby hosts by defining `target.group.size` (proposed by Mayuresh). We shall keep some idle consumers within the group, and when one of the active members goes offline, we shall trigger a hot swap because the current group size is smaller than `target.group.size`. With this change we might not even need to extend the session timeout, since we can easily use a spare consumer to start working.

...