
Status

Current state: In review

Discussion thread: TBD

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

For stateful applications, one of the biggest performance bottlenecks is state shuffling. The Kafka consumer has a concept called "rebalance": given M partitions and N consumers in one consumer group, Kafka tries to balance the load across consumers, ideally with each consumer handling M/N partitions. The broker also adjusts the workload dynamically by monitoring consumers' health, so that it can kick dead consumers out of the group and handle join group requests from new consumers. The intuition behind this design is to avoid processing hot spots and to maintain fairness and liveness of the whole application. However, when the service state is heavy, rebalancing one topic partition from instance A to instance B means a huge amount of data transfer. If multiple rebalances are triggered, the whole service could take a very long time to recover because of that data transfer.

...

In the current architecture, during each rebalance the consumer group on the broker side assigns each new member a member id built from a randomly generated UUID. This makes sure every group member has a unique identity. On client restart, the consumer sends a JoinGroupRequest with the special UNKNOWN_MEMBER_ID, so it is never treated as an existing member. To make the KIP work, we need to change both client-side and server-side logic so that the member id persists across restarts. This lets us reduce the number of rebalances, since we can apply the same assignment based on member identities.
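To make the current behavior concrete, here is a minimal sketch of the id assignment described above. The class and method names are hypothetical and are not taken from the broker code; the sketch also simplifies the generated id to a bare UUID string. It only illustrates that a restarted consumer sending the unknown member id always comes back with a fresh identity, while a persisted id would be kept.

```java
import java.util.UUID;

// Hypothetical sketch of broker-side member id assignment; not the actual broker code.
public class MemberIdAssignmentSketch {
    // Placeholder sent by a consumer that has no member id yet (assumed to be the empty string).
    static final String UNKNOWN_MEMBER_ID = "";

    // Returns the id the group would use for this join request.
    static String resolveMemberId(String requestedMemberId) {
        if (UNKNOWN_MEMBER_ID.equals(requestedMemberId)) {
            // Current behavior: every unknown member gets a new random identity.
            return UUID.randomUUID().toString();
        }
        // Proposed direction: a member id that survives restarts is kept as-is.
        return requestedMemberId;
    }

    public static void main(String[] args) {
        // Two restarts without a persisted id yield two unrelated identities.
        System.out.println(resolveMemberId(UNKNOWN_MEMBER_ID));
        System.out.println(resolveMemberId(UNKNOWN_MEMBER_ID));
        // A persisted id maps to the same identity every time.
        System.out.println(resolveMemberId("consumer-1"));
    }
}
```

Because the two random ids never match, the group sees a leave plus a join on every restart, which is what triggers the extra rebalances.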

Proposed Changes

On the client side, we add a new config called MEMBER_ID in ConsumerConfig. On consumer service init, if the MEMBER_ID config is set, we put it in the initial join group request; otherwise, we still send UNKNOWN_MEMBER_ID to ask the broker to allocate a new random ID. To distinguish this from the previous version of the protocol, we also increase the join group request version to v4 when MEMBER_ID is set.
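The client-side decision above could be sketched roughly as follows. This is an illustration under stated assumptions, not the proposed implementation: JoinGroupSketch, InitialJoin and initialJoin are hypothetical names, the previous request version is passed in as a parameter because this page does not state it, and UNKNOWN_MEMBER_ID is assumed to be the empty string.

```java
import java.util.Optional;

// Hypothetical sketch of how the consumer could build its first join group request.
public class JoinGroupSketch {
    // Placeholder that asks the broker to allocate a new random member id (assumed empty string).
    static final String UNKNOWN_MEMBER_ID = "";
    // Request version used when MEMBER_ID is configured, per the text above.
    static final short MEMBER_ID_AWARE_VERSION = 4;

    // Minimal stand-in for the fields of the initial join group request that matter here.
    static final class InitialJoin {
        final String memberId;
        final short version;

        InitialJoin(String memberId, short version) {
            this.memberId = memberId;
            this.version = version;
        }
    }

    // configuredMemberId models the optional MEMBER_ID consumer config.
    static InitialJoin initialJoin(Optional<String> configuredMemberId, short previousVersion) {
        if (configuredMemberId.isPresent() && !configuredMemberId.get().isEmpty()) {
            // MEMBER_ID is set: send it and use the bumped request version.
            return new InitialJoin(configuredMemberId.get(), MEMBER_ID_AWARE_VERSION);
        }
        // MEMBER_ID is not set: keep the old behavior and the old request version.
        return new InitialJoin(UNKNOWN_MEMBER_ID, previousVersion);
    }

    public static void main(String[] args) {
        short previousVersion = 3; // illustrative value only
        InitialJoin withId = initialJoin(Optional.of("consumer-1"), previousVersion);
        InitialJoin withoutId = initialJoin(Optional.empty(), previousVersion);
        System.out.println("memberId=" + withId.memberId + " version=" + withId.version);
        System.out.println("memberId=" + withoutId.memberId + " version=" + withoutId.version);
    }
}
```

Keeping the old version on the unset path means consumers that do not opt in send exactly the request they send today.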

...

ConsumerConfig.java
.defineInternal(LEAVE_GROUP_ON_CLOSE_CONFIG,
                Type.BOOLEAN,
                false,
                Importance.LOW)



Compatibility, Deprecation, and Migration Plan

  • This change is an effort to reduce rebalances during consumer rolling restarts. Since it introduces a new version of the join group request, it should be backward compatible.

Non-goal

We have had some offline discussions on handling the leader rejoin case. For example, since the broker could also do the subscription monitoring work, we don't actually need to trigger a rebalance blindly on the leader side based on its rejoin request. However, this is a separate topic and we will address it in another KIP.

Rejected Alternatives

In this pull request, we took an experimental approach that materializes the member id on the instance's local disk. This approach reduces rebalances as expected and is the experimental foundation of KIP 345. However, KIP 345 has a few advantages over it:

...