Status

Current state: Draft

Discussion thread: TBD

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

For stateful applications, one of the biggest performance bottlenecks is state shuffling. The Kafka consumer has a concept called "rebalance": given M partitions and N consumers in one consumer group, Kafka tries to balance the load across the consumers so that, ideally, each consumer handles M/N partitions. The broker also adjusts the workload dynamically by monitoring consumers' health and handling join requests from new consumers. The intent is to avoid processing hot spots and to maintain fairness and liveness for the whole application. However, when the service state is heavy, moving one partition from instance A to instance B means transferring a large amount of data. If multiple rebalances are triggered, the whole service can take a very long time to recover.
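The following minimal Scala sketch makes this cost concrete. It assumes a hypothetical round-robin assignor (`assign`) that sorts member ids and deals partitions out in turn. Kafka's real assignors differ in detail. The hypothetical `movedPartitions` helper counts how many partitions end up on a different host when one consumer restarts and rejoins under a new random id.

```scala
// Hypothetical round-robin assignor (not Kafka's actual assignor code):
// sort member ids and deal partitions out so each member gets about M / N.
def assign(memberIds: Seq[String], numPartitions: Int): Map[Int, String] = {
  val sorted = memberIds.sorted
  (0 until numPartitions).map(p => p -> sorted(p % sorted.size)).toMap
}

// Count partitions whose physical host changes between two assignments.
// `hostOf` maps each member id to the machine that runs it; every such
// move implies transferring that partition's local state.
def movedPartitions(before: Map[Int, String], after: Map[Int, String],
                    hostOf: Map[String, String]): Int =
  before.count { case (p, member) => hostOf(after(p)) != hostOf(member) }

// Three hosts; the consumer on host B restarts and rejoins as "z".
val hostOf = Map("a" -> "A", "b" -> "B", "c" -> "C", "z" -> "B")
val before = assign(Seq("a", "b", "c"), 6)
val after  = assign(Seq("a", "c", "z"), 6)
println(movedPartitions(before, after, hostOf)) // 4
```

The same three hosts are still present, yet 4 of the 6 partitions change host purely because one member id changed.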

The idea of this KIP is to reduce the number of rebalances by letting the consumer specify its member id. The core argument: if the broker recognizes a consumer as an existing member, it should not trigger a rebalance. The only exception is when the leader rejoins the group, because there might be an assignment protocol change that needs to be enforced.
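The core argument can be sketched as a small decision function. This is a hypothetical model and not coordinator code. `Group`, `knownMemberIds`, `leaderId` and `shouldRebalance` are illustrative names, and a real coordinator tracks much more state.

```scala
// Hypothetical view of a group: the member ids it already knows and its leader.
final case class Group(knownMemberIds: Set[String], leaderId: Option[String])

def shouldRebalance(group: Group, joiningMemberId: String): Boolean = {
  val isKnown  = group.knownMemberIds.contains(joiningMemberId)
  val isLeader = group.leaderId.contains(joiningMemberId)
  // Unknown members always trigger a rebalance; a rejoining leader does
  // too, because it may need to enforce an assignment protocol change.
  !isKnown || isLeader
}

val group = Group(Set("consumer-a", "consumer-b"), leaderId = Some("consumer-a"))
println(shouldRebalance(group, "consumer-b")) // false: known follower rejoined
println(shouldRebalance(group, "consumer-a")) // true: leader rejoined
println(shouldRebalance(group, "consumer-c")) // true: new member
```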

Public Interfaces

Currently, the broker handles consumer state with a two-phase protocol. For brevity, we discuss only the three states involved: RUNNING, PREPARE_REBALANCE and SYNC.
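Below is a simplified, hypothetical model of how these three states could transition. The event names and the exact transitions are assumptions made for illustration. Roughly, a join moves the group into PREPARE_REBALANCE, completing the join phase moves it to SYNC, and distributing the assignment returns it to RUNNING. The broker's real state machine has more states and edge cases.

```scala
// Simplified, hypothetical model of the three group states named above.
sealed trait GroupState
case object Running          extends GroupState
case object PrepareRebalance extends GroupState
case object Sync             extends GroupState

// Hypothetical events driving the two phases (join, then sync).
sealed trait Event
case object MemberJoinTriggered extends Event // e.g. a new member joins
case object AllMembersJoined    extends Event // join phase complete
case object AssignmentSynced    extends Event // sync phase complete

def next(state: GroupState, event: Event): GroupState = (state, event) match {
  case (Running, MemberJoinTriggered)       => PrepareRebalance
  case (PrepareRebalance, AllMembersJoined) => Sync
  case (Sync, AssignmentSynced)             => Running
  case (Sync, MemberJoinTriggered)          => PrepareRebalance // start over
  case (s, _)                               => s // other events: no change
}

// One full rebalance round brings the group back to Running.
val afterRebalance = Seq[Event](MemberJoinTriggered, AllMembersJoined, AssignmentSynced)
  .foldLeft(Running: GroupState)(next)
println(afterRebalance) // Running
```

Every avoidable MemberJoinTriggered in this model costs a full round through both phases, which is what this KIP aims to skip for known members.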

In the current architecture, during each rebalance the consumer group on the broker side assigns new member ids, each built from a randomly generated UUID. As a result, a consumer gets a different member id after every restart, so the rejoin of the same consumer is always treated as a "new member". On the client side, the consumer sends a JoinGroupRequest with the special UNKNOWN_MEMBER id, which likewise signals that it does not expect to be treated as an existing member. To make this KIP work, we need to change both the client side and the server side.
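The current behavior can be modeled in a few lines. In this sketch `handleJoin` is a hypothetical stand-in for the coordinator's join handling. Representing UNKNOWN_MEMBER as the empty string is an assumption made for illustration.

```scala
import java.util.UUID

// Assumed representation of the special UNKNOWN_MEMBER id.
val UnknownMemberId = ""

// Hypothetical model of today's join handling: an unknown member always
// receives a freshly generated random id.
def handleJoin(requestedMemberId: String): String =
  if (requestedMemberId == UnknownMemberId) UUID.randomUUID().toString
  else requestedMemberId

// A restarted consumer has lost its old id, so it joins as "unknown" again
// and receives a different id: it looks like a brand-new member.
val firstId   = handleJoin(UnknownMemberId)
val restartId = handleJoin(UnknownMemberId)
println(firstId != restartId) // true
```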

Proposed Changes

On the client side, we add a new config called MEMBER_ID to ConsumerConfig. On consumer initialization, if the MEMBER_ID config is not set, we fall back to the existing random UUID generation:

```scala
import java.util.UUID

def generateMemberIdSuffix = UUID.randomUUID().toString
```
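On the client side, resolving the member id at startup might look like the following minimal sketch. The key string `member.id`, the `resolveMemberId` helper, and the choice to treat a blank value as unset are all assumptions. The KIP only names the config MEMBER_ID.

```scala
import java.util.UUID

// Assumed key name for the proposed MEMBER_ID config (hypothetical).
val MemberIdConfig = "member.id"

def generateMemberIdSuffix: String = UUID.randomUUID().toString

// Hypothetical helper: prefer the configured id, else fall back to a random one.
def resolveMemberId(props: Map[String, String]): String =
  props.get(MemberIdConfig).map(_.trim).filter(_.nonEmpty) match {
    case Some(configured) => configured             // stable across restarts
    case None             => generateMemberIdSuffix // random, as today
  }

println(resolveMemberId(Map(MemberIdConfig -> "orders-consumer-1"))) // orders-consumer-1
```

A configured id stays the same across restarts, so the broker can recognize the rejoining consumer. Without it, every start produces a fresh UUID, just as today.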

Compatibility, Deprecation, and Migration Plan

Rejected Alternatives

In this pull request, we tried an experimental approach that persists the member id on the instance's local disk. This approach reduced rebalances as expected, but KIP-345 has a few advantages over it:

  1. It gives users more control over their member id string, which helps with debugging.
  2. It is friendlier to cloud, Kubernetes, and similar environments: when we move an instance from one container to another, we can copy the member id into the config files.
  3. It does not require the brokers to be able to access an extra directory on the local disks (consider brokers deployed on AWS with remote disks mounted).
  4. By allowing consumers to optionally specify a member id, this rebalance benefit can easily be extended to Kafka Connect and Kafka Streams, which rely on consumers, even in a cloud environment.