Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

For stateful applications, one of the biggest performance bottleneck is the state shuffling. In Kafka consumer, there is concept called "rebalance" which means that for given M partitions and N consumers in one consumer group, Kafka will try to balance the load between consumers and ideally have each consumer dealing with M/N partitions. Broker will also adjust the workload dynamically by monitoring consumers' health and handling new consumer join request. The intuition is to avoid processing hot spot and maintain fairness plus liveness of the whole application. However, when the service state is heavy, a rebalance of one partition from instance A to B means huge amount of data transfer. If multiple rebalances are triggered, the whole service could take a very long time to recover. 

The idea of this KIP is to reduce number of rebalances by specifying the consumer member id. Core argument: If the broker recognize this consumer as an existing member, it shouldn't trigger rebalance. The only exception is when leader rejoins the group, because there might be assignment protocol change that needs to be enforcedIt may hit multiple state rebalances when the client group is doing multiple rebalances.

Public Interfaces

Right now broker handles consumer state in a two-phase protocol. To be concise here, we only list discuss 3 involving states here: RUNNING, INPREPARE_REBALANCE and SYNC.

  • When a new member joins the consumer group, if the 

Briefly list any new interfaces that will be introduced as part of this proposal or any existing interfaces that will be removed or changed. The purpose of this section is to concisely call out the public contract that will come along with this feature.

A public interface is any change to the following:

  • Binary log format

  • The network protocol and api behavior

  • Any class in the public packages under clientsConfiguration, especially client configuration

    • org/apache/kafka/common/serialization

    • org/apache/kafka/common

    • org/apache/kafka/common/errors

    • org/apache/kafka/clients/producer

    • org/apache/kafka/clients/consumer (eventually, once stable)

  • Monitoring

  • Command line tools and arguments

  • Anything else that will likely break existing users in some way when they upgrade

Proposed Changes

...

  • this is a new member or the group leader, the broker will move this group state from RUNNING to PREPARE_REBALANCE. The reason for triggering rebalance when leader joins is because there might be assignment protocol change.
  • When moved to PREPARE_REBALANCE state, the broker will mark first joined consumer as leader, and wait for all the members to rejoin the group. Once we collected enough number of consumers/ reached rebalance timeout, we will reply the leader with current member information and move the state to SYNC. All current members are informed to send SyncGroupRequest to get the final assignment.
  • The leader consumer will decide the assignment and send it back to broker. As last step, broker will announce the new assignment by sending SyncGroupResponse to all the followers. Till now we finished one rebalance and the group generation is incremented by 1.

In the current architecture, during each rebalance consumer group on broker side will assign new member id with a UUID randomly generated each time. So through restarts, the consumer uses different member ids, which means the rejoin of same consumer will always be treated as "new member". On the client side, consumer will send a JoinGroupRequest with a special UNKNOWN_MEMBER id, which also has no intention to be treated as an existing member.  To make the KIP work, we need to change both client side and server side.

Proposed Changes


Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Rejected Alternatives

  • This new change is an effort of reducing rebalances during consumer rolling restart. The change should be transparent on client side.

Rejected Alternatives

In this pull request, we did an experimental approach to materialize member id on the instance local disk. This approach could reduce the rebalances as expected, but is very hacky and has few drawbacks:If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.