Status

Current state: In review

Discussion thread: TBD

JIRA: here

...

For stateful applications, one of the biggest performance bottlenecks is state shuffling. The Kafka consumer has a concept called "rebalance": given M partitions and N consumers in one consumer group, Kafka tries to balance the load so that each consumer ideally handles M/N partitions. The broker also adjusts the workload dynamically by monitoring consumers' health, kicking dead consumers out of the group and handling join group requests from new consumers. The intuition behind this design is to avoid processing hot spots and to maintain the fairness and liveness of the whole application. However, when the service state is heavy, rebalancing one topic partition from instance A to instance B means a huge amount of data transfer. If multiple rebalances are triggered, the whole service can take a very long time to recover because of that data transfer.
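To make the M/N balance concrete, here is a minimal sketch that spreads M partitions over N consumers round-robin. The class and method names are hypothetical, and this is not Kafka's actual assignor; it only shows that an even split leaves each consumer with either floor(M/N) or ceil(M/N) partitions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not Kafka's assignor.
final class EvenAssignmentSketch {
    // Distributes partitions 0..m-1 over n consumers round-robin, so each
    // consumer ends up with floor(m/n) or ceil(m/n) partitions.
    static List<List<Integer>> assign(int m, int n) {
        if (n <= 0) {
            throw new IllegalArgumentException("need at least one consumer");
        }
        List<List<Integer>> result = new ArrayList<>();
        for (int c = 0; c < n; c++) {
            result.add(new ArrayList<>());
        }
        for (int p = 0; p < m; p++) {
            result.get(p % n).add(p);
        }
        return result;
    }
}
```

With heavy state, every partition that changes owner between two such assignments implies a state transfer, which is the cost this KIP tries to avoid.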

...

  • When a member joins the consumer group, if it is a new member or the group leader, the broker moves the group state from RUNNING to PREPARE_REBALANCE. A leader join triggers a rebalance because the assignment protocol might have changed. If any other old member rejoins the group, the state does not change to PREPARE_REBALANCE.
  • Once in the PREPARE_REBALANCE state, the broker marks the first joined consumer as leader and waits for all members to rejoin the group. Once enough consumers have been collected or the rebalance timeout has been reached, the broker replies to the leader with the current member information and moves the state to SYNC. All current members are informed to send a SyncGroupRequest to get the final assignment.
  • The leader consumer decides the assignment and sends it back to the broker. As the last step, the broker announces the new assignment by sending a SyncGroupResponse to all the followers. At this point one rebalance has finished and the group generation is incremented by 1.
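The steps above can be read as a small state machine. The following is a simplified sketch under stated assumptions: the class, field, and method names are hypothetical, the leader is modeled as the first member that ever joined, and timeouts and network I/O are left out. It is not the broker's implementation.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Illustrative model only; all names here are hypothetical.
final class GroupStateSketch {
    enum State { RUNNING, PREPARE_REBALANCE, SYNC }

    State state = State.RUNNING;
    int generation = 0;
    String leaderId = null;
    final Set<String> members = new LinkedHashSet<>();

    // A join from a new member or from the leader triggers a rebalance;
    // a rejoin from any other known member leaves the state unchanged.
    void onJoin(String memberId) {
        boolean isNew = members.add(memberId);
        if (leaderId == null) {
            leaderId = memberId; // simplification: first joiner is the leader
        }
        boolean isLeader = memberId.equals(leaderId);
        if (state == State.RUNNING && (isNew || isLeader)) {
            state = State.PREPARE_REBALANCE;
        }
    }

    // Called once enough members have rejoined or the rebalance timeout fired.
    void onJoinPhaseComplete() {
        if (state == State.PREPARE_REBALANCE) {
            state = State.SYNC;
        }
    }

    // Called after the leader's assignment has been relayed to the followers.
    void onAssignmentDistributed() {
        if (state == State.SYNC) {
            state = State.RUNNING;
            generation++; // one rebalance finished
        }
    }
}
```

In this model, each restart that arrives with a fresh member id counts as a new member and therefore always forces a full pass through PREPARE_REBALANCE and SYNC.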

In the current architecture, during each rebalance the consumer group on the broker side assigns every new member a member id built from a freshly generated random UUID. This ensures a unique identity for each group member. During client restart, the consumer sends a JoinGroupRequest with the special UNKNOWN_MEMBER id, which signals that it does not intend to be treated as an existing member. To make the KIP work, we need to change both client-side and server-side logic so that the member id persists across restarts. This reduces the number of rebalances, because the same assignment can be reapplied based on member identities.

...

On the client side, we add a new config called MEMBER_ID in ConsumerConfig. On consumer service init, if the MEMBER_ID config is set, we put it in the initial join group request; otherwise, we still send UNKNOWN_MEMBER_ID to ask the broker to allocate a new random ID. To distinguish this from previous versions of the protocol, we also increase the join group request version to v4 when MEMBER_ID is set.
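A minimal sketch of that client-side decision follows. The class and method names are hypothetical, UNKNOWN_MEMBER_ID is assumed to be the empty string, and the pre-KIP request version is passed in as a parameter rather than hard-coded; this is an illustration, not the consumer's actual code.

```java
// Hypothetical sketch; names are illustrative, not Kafka's client code.
final class JoinRequestSketch {
    // Assumption: the "unknown member" sentinel is the empty string.
    static final String UNKNOWN_MEMBER_ID = "";

    final short version;
    final String memberId;

    private JoinRequestSketch(short version, String memberId) {
        this.version = version;
        this.memberId = memberId;
    }

    // Builds the initial join request. legacyVersion is whatever version
    // the client used before this KIP.
    static JoinRequestSketch initialJoin(String configuredMemberId, short legacyVersion) {
        boolean isSet = configuredMemberId != null && !configuredMemberId.isEmpty();
        if (isSet) {
            // MEMBER_ID is configured: send it with the bumped version v4.
            return new JoinRequestSketch((short) 4, configuredMemberId);
        }
        // MEMBER_ID is not configured: ask the broker for a random id.
        return new JoinRequestSketch(legacyVersion, UNKNOWN_MEMBER_ID);
    }
}
```

Tying the version bump to the config keeps unconfigured clients on the old wire format, so they behave exactly as before.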

Code Block
language: java
title: ConsumerConfig.java
public static final String MEMBER_ID; // default: empty String

...

Code Block
language: java
title: JoinGroupRequest.java
public static Schema[] schemaVersions() {
    return new Schema[] {JOIN_GROUP_REQUEST_V0, JOIN_GROUP_REQUEST_V1, JOIN_GROUP_REQUEST_V2, JOIN_GROUP_REQUEST_V3, JOIN_GROUP_REQUEST_V4};
}


On the server side, the broker will keep handling join group requests <= v3 as before. If the protocol version is upgraded to v4 and the member id is set, the broker will use the member id specified in the join group request. Otherwise, the broker will use the client id plus a randomly generated member id suffix as the member id, same as now. The change will be applied in addMemberAndRebalance.
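The broker-side rule for a newly added member can be sketched as follows. This is a simplified illustration with hypothetical names: it only models how the id of a new member is chosen, assumes "-" as the separator between client id and suffix, and uses a random UUID as the suffix. It does not reproduce the logic in addMemberAndRebalance.

```java
import java.util.UUID;

// Hypothetical sketch of member id selection for a newly added member.
final class MemberIdResolverSketch {
    static String resolve(short requestVersion, String requestedMemberId, String clientId) {
        boolean hasId = requestedMemberId != null && !requestedMemberId.isEmpty();
        if (requestVersion >= 4 && hasId) {
            // v4 with a member id set: trust the id supplied by the client.
            return requestedMemberId;
        }
        // Otherwise: client id plus a randomly generated suffix, as today.
        // The "-" separator is an assumption made for this sketch.
        return clientId + "-" + UUID.randomUUID();
    }
}
```

For example, `resolve((short) 4, "host-1", "app")` returns `host-1` on every restart, whereas a request without a member id gets a different id each time.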

In join request v4, it is the user's responsibility to assign a unique member id to each consumer. This could be a service discovery hostname, a unique IP address, etc. The downside is that if the user misconfigures the member id, multiple consumers could end up with the same member id, which invalidates the consumption balance and triggers unpredictable, buggy behavior. We could consider basic validation here, but since this is an advanced config, this KIP relies solely on the user to provide unique member ids. If the MEMBER_ID config is not set, the consumer client will keep using join group request v2 as before.

Compatibility, Deprecation, and Migration Plan

...

In this pull request, we took an experimental approach that materializes the member id on the instance's local disk. This approach reduces rebalances as expected and is the experimental foundation of KIP 345. However, KIP 345 has a few advantages over it:

...