Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
Consumer {
   /**
     * Alert the consumer to trigger a new rebalance by rejoining the group. This is a nonblocking call that forces
	     * the consumer to trigger a new rebalance on the next poll{@link #poll(Duration)} call. Note that this API does not
     * itself initiate
	 * the rebalance, so you must still call poll {@link #poll(Duration)}. If a rebalance is already in progress
 the method will return
	 * falseprogress tothis indicatecall thatwill it wasbe a no-op. YouIf canyou use thiswish to decideforce whether to just complete the current
	 *an additional rebalance oryou retrymust in casecomplete the current rebalance does not include the latest subscription metadata; however,
	
     * inone mostby casescalling itpoll isbefore betterretrying to determine that based on the resulting assignment of the current rebalance. Only
	 * when it is not possible to validate whether the latest metadata was used based on the received assignment
	 * should you consider retrying based on the return value. 
	 * <p>
	this API.
     * <p>
     * You shoulddo not need to call this during normal processing, as the consumer group will manage itself
	     * automatically and rebalance when necessary. However there may be situations where the application wishes to
	     * trigger a rebalance that would otherwise not occur. For example, if some condition external and invisible to
	     * the Consumer and its group membership changes in a way that haswould affect implicationsthe foruserdata theencoded partitionin assignment,the
	 * this method can be* used to trigger a reassignment. The kind of system change that would merit calling this
 	 * method is one that would also be reflected in the Subscription userdata that is passed to{@link org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription Subscription}, the Consumer
     * will not be notified and no rebalance will occur. This API can be used to force the group to rebalance so that
     * the assignor socan it
	perform *a canpartition makereassignment assignment decisions based on morethe than group membership and metadatalatest userdata. If your assignor does not use
	     * this userdata, or you do not use a custom
 assignor, you should not use* this API.
     *{@link org.apache.kafka.clients.consumer.ConsumerPartitionAssignor ConsumerPartitionAssignor}, you should not
     * @returnuse false if a rebalance is already in progressthis API.
     *
     * @throws java.lang.IllegalStateException if the consumer does not use group subscription
     */
    booleanvoid enforceRebalance() {
}

Proposed Changes

...

Clearly, the behavior of this API in the case on an ongoing rebalance is application-dependent. We should return "false" to alert the user and allow them to choose how to handle this case.However it seems best to leave it up to the user to determine whether or not to retry based on the results of the completed rebalance by checking the assignment received. 

The consumer is not part of an active group

...

Code Block
languagejava
titleMyApp.java
mainProcessingLoop() {
	if (justCompletedInitialization) {
		needsRebalance = true;
		justCompletedInitialization = false;
	}

	if (needsRebalance) {
		if (consumer.enforceRebalance()) {
			// successfully triggered a new rebalance, so the assignor will receive the latest metadata 
			needsRebalance = false;
		} else {
			// must complete the current rebalance then retry to trigger a rebalance with the latest metadata
		}
	}
	
	records = consumer.poll();}
	
	records = consumer.poll();

	// check the assignment in case you need to retry, eg if a rebalance was already in progress was enforceRebalance was called
	if (receivedFinalAssignment()) {
		needsRebalance = false;
	}
	...
	// do something
}

Compatibility, Deprecation, and Migration Plan

...