Status

Current state: Under Discussion

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Consumer group rebalances are generally intended to be abstracted away from the user as much as possible, but more advanced users of the Consumer client may want to explicitly trigger a rebalance at times. For example, Streams may do this if it has detected through some other means that one of its members has dropped out of the group and needs to rejoin right away. As of KIP-268, Streams also relies on triggering a rebalance to provide a smoother upgrade path through its version probing protocol. Lastly, in the recently approved KIP-441 we intend to improve the availability of stateful tasks by letting new instances warm up standbys. To probe for sufficiently caught-up standbys and transfer the active task assignment to them, Streams will once again leverage the ability to trigger a rebalance.

...

Rebalancing delays and unavailability have long been a major pain point for Streams applications. We hope to finally mitigate this with the combination of KIP-345, KIP-429, and KIP-441, but without the ability to explicitly trigger a rebalance we won't be able to fully realize the high availability potential of Streams.

Public Interfaces

Consumer {
    void rejoinGroup();
}

KafkaConsumer {
    /**
     * Alert the consumer to trigger a new rebalance by rejoining the group. If a rebalance is already in progress, an exception will be thrown
     * so you can choose to retry or not once the current rebalance completes.
     *
     * @throws java.lang.IllegalStateException if the consumer does not use group subscription
     * @throws org.apache.kafka.common.errors.RebalanceInProgressException if a rebalance is already in progress
     * @throws org.apache.kafka.common.errors.CoordinatorNotAvailableException if the coordinator is unknown/unavailable
     * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. rebalance callback errors)
     */
    public void rejoinGroup() {}
}

Proposed Changes

This KIP proposes to add an API that will immediately trigger a rebalance (as in Consumer#unsubscribe) without revoking all currently owned partitions. Both static and dynamic members will attempt to rejoin the group. The intended behavior is fairly straightforward, but there are some edge cases to consider:

A rebalance is already in progress

There is always the possibility that a rebalance is already in progress and the consumer is just awaiting its new assignment, or waiting for the remaining members to rejoin. It's also possible that an application may try to trigger a rebalance from multiple consumers in the group at once. The appropriate handling of this case depends on the application-level logic and reason for forcing a rebalance to begin with: for example, the current plan for KIP-441 is for the leader to periodically trigger a rebalance to check for ready standbys and give them an active assignment. If the rebalance is triggered based on an arbitrary time interval we should just allow the current rebalance to complete and move on, since the ongoing rebalance will collect the latest metadata just as the enforced rebalance would have.

...

Clearly, the behavior of this API in the case of an ongoing rebalance is application-dependent. We should throw a RebalanceInProgressException to alert the user and allow them to choose how to handle this case.
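
As an illustration of the "let the current rebalance complete and move on" policy described above for interval-based probing, a caller might handle the exception roughly as follows. This is only a sketch: RejoinableConsumer is a hypothetical one-method stand-in for the proposed Consumer#rejoinGroup, and the nested exception class is a local placeholder for org.apache.kafka.common.errors.RebalanceInProgressException so that the example compiles without the Kafka client on the classpath.

```java
class ProbingRebalance {

    // Local placeholder (assumption) for
    // org.apache.kafka.common.errors.RebalanceInProgressException.
    static class RebalanceInProgressException extends RuntimeException {
        RebalanceInProgressException(String message) {
            super(message);
        }
    }

    // Hypothetical minimal view of the proposed API; a real caller would
    // hold a Consumer instance instead.
    interface RejoinableConsumer {
        void rejoinGroup();
    }

    /**
     * Tries to trigger a rebalance for a periodic probe.
     *
     * @return true if a new rebalance was requested, false if one was
     *         already in progress and the request was skipped
     */
    static boolean tryTriggerRebalance(RejoinableConsumer consumer) {
        try {
            consumer.rejoinGroup();
            return true;
        } catch (RebalanceInProgressException e) {
            // The ongoing rebalance will collect the latest metadata anyway,
            // so a time-based probe has nothing to gain from retrying.
            return false;
        }
    }
}
```

An application that must guarantee a fresh rebalance (rather than a periodic probe) would instead retry once the current rebalance completes, which is why the choice is left to the caller.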

The consumer is not part of an active group

If the consumer is not part of an active group, either because it dropped out or because it has not yet joined, it should already be attempting to rejoin. In this case we simply proceed with triggering the rebalance.

The coordinator is unknown/unavailable

In this case we will just throw a CoordinatorNotAvailableException, so the user can retry (or not).
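
One way a caller could act on the "retry (or not)" choice is a bounded retry loop with a fixed backoff, sketched below. The names RejoinableConsumer, rejoinWithRetry, maxAttempts and backoffMs are hypothetical and not part of this proposal, and the nested exception class is a local placeholder for org.apache.kafka.common.errors.CoordinatorNotAvailableException so that the example is self-contained.

```java
class CoordinatorRetry {

    // Local placeholder (assumption) for
    // org.apache.kafka.common.errors.CoordinatorNotAvailableException.
    static class CoordinatorNotAvailableException extends RuntimeException {
        CoordinatorNotAvailableException(String message) {
            super(message);
        }
    }

    // Hypothetical minimal view of the proposed API.
    interface RejoinableConsumer {
        void rejoinGroup();
    }

    /**
     * Calls rejoinGroup() up to maxAttempts times, sleeping backoffMs
     * between attempts while the coordinator is unavailable.
     *
     * @return true if the rebalance was requested, false if every attempt
     *         failed or the thread was interrupted while backing off
     */
    static boolean rejoinWithRetry(RejoinableConsumer consumer, int maxAttempts, long backoffMs) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                consumer.rejoinGroup();
                return true;
            } catch (CoordinatorNotAvailableException e) {
                if (attempt == maxAttempts) {
                    break; // give up; the caller decides what to do next
                }
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException interrupted) {
                    // Preserve the interrupt status and stop retrying.
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false;
    }
}
```

Any other exception (for example the IllegalStateException thrown when the consumer does not use group subscription) is deliberately left to propagate, since retrying would not help.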

Compatibility, Deprecation, and Migration Plan

N/A

Rejected Alternatives

Instead of adding this directly to the consumer client, we could have added this to the admin client as in

...