...

import java.time.Duration;

public interface Consumer<K, V> {
    /**
     * Alert the consumer to trigger a new rebalance by rejoining the group. You should not need to call this during
     * normal processing, as the consumer group will manage itself automatically and rebalance when necessary. However,
     * there may be situations where the application wishes to trigger a rebalance that would otherwise not occur, for
     * example if some condition has changed that has implications for the ideal partition assignment.
     * <p>
     * If a rebalance is already in progress, an exception will be thrown so you can choose to retry or not once the
     * current rebalance completes.
     *
     * @param timeout The maximum amount of time to wait for the rebalance
     *
     * @return Whether the rebalance completed within the given timeout.
     * @throws java.lang.IllegalStateException if the consumer does not use group subscription
     * @throws org.apache.kafka.common.errors.RebalanceInProgressException if a rebalance is already in progress
     * @throws org.apache.kafka.common.errors.CoordinatorNotAvailableException if the coordinator is unknown/unavailable
     * @throws org.apache.kafka.common.KafkaException if the rebalance callback throws an exception
     */
    boolean enforceRebalance(Duration timeout);
}

Proposed Changes

This KIP proposes adding an API that immediately triggers a rebalance, as Consumer#unsubscribe does, but without revoking all currently owned partitions. Both static and dynamic members will attempt to rejoin the group. The call blocks for up to the user-provided timeout and returns whether the rebalance completed within that time. If a zero timeout is specified, the consumer initiates the rejoin and then returns immediately. In this or any other case where false is returned, the user must call poll to complete the rebalance.
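The zero-timeout pattern described above might look roughly like the following sketch. It does not use the real Kafka client: RebalancingConsumer and FakeConsumer are hypothetical stand-ins invented for illustration, and the fake assumes a rebalance finishes after a fixed number of poll calls, which a real group would not guarantee. Only the method name enforceRebalance(Duration) and the "returns false, then keep polling" contract come from this proposal.

```java
import java.time.Duration;

final class RebalanceSketch {

    // Hypothetical stand-in for the slice of the consumer API discussed in this KIP.
    // This is NOT org.apache.kafka.clients.consumer.Consumer.
    interface RebalancingConsumer {
        boolean enforceRebalance(Duration timeout);
        void poll(Duration timeout);
    }

    // Toy implementation (assumption): the rebalance completes after a fixed number of polls.
    static final class FakeConsumer implements RebalancingConsumer {
        private int pollsUntilDone;
        private boolean rebalancing;

        FakeConsumer(int pollsUntilDone) {
            this.pollsUntilDone = pollsUntilDone;
        }

        @Override
        public boolean enforceRebalance(Duration timeout) {
            // This toy only initiates the rejoin and never completes it inside the call,
            // mimicking the zero-timeout case where false is returned.
            rebalancing = true;
            return false;
        }

        @Override
        public void poll(Duration timeout) {
            // Each poll makes progress on the pending rebalance.
            if (rebalancing && --pollsUntilDone <= 0) {
                rebalancing = false;
            }
        }

        boolean rebalanceInProgress() {
            return rebalancing;
        }
    }

    // Initiates a rebalance without blocking, then drives it to completion via poll.
    // Returns the number of poll calls that were needed.
    static int triggerAndPoll(FakeConsumer consumer) {
        int polls = 0;
        boolean completed = consumer.enforceRebalance(Duration.ZERO);
        while (!completed && consumer.rebalanceInProgress()) {
            // A real application would process the returned records here.
            consumer.poll(Duration.ofMillis(100));
            polls++;
        }
        return polls;
    }
}
```

In a real application the loop would simply be the normal poll loop; the point is that a false return value means the rebalance is still pending and only completes through subsequent poll calls.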

The intended behavior is fairly straightforward overall, but there are some edge cases to consider:

...

On the one hand, triggering a rebalance seems like an administrative action and thus more appropriate for the admin client than the consumer. However, this API is not intended to be an operational one, but rather a way for consumers to proactively communicate with the group and efficiently channel updates relevant to their assignment. I'm also not sure whether an admin-client version would even be possible to implement without broker-side changes.

Another rejected idea was to make this a non-blocking call. This is how Streams will most likely want to use the API, so that it can continue polling and processing other tasks while the rebalance progresses, but the same behavior can be achieved by setting the timeout to zero.