...
Discussion thread: here
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
Code Block | ||
---|---|---|
| ||
Consumer {
    /**
     * Alert the consumer to trigger a new rebalance by rejoining the group. This is a nonblocking call that forces
     * the consumer to trigger a new rebalance on the next {@link #poll(Duration)} call. Note that this API does not
     * itself initiate the rebalance, so you must still call {@link #poll(Duration)}. If a rebalance is already in
     * progress this call will be a no-op. If you wish to force an additional rebalance you must complete the current
     * one by calling poll before retrying this API.
     * <p>
     * You do not need to call this during normal processing, as the consumer group will manage itself
     * automatically and rebalance when necessary. However there may be situations where the application wishes to
     * trigger a rebalance that would otherwise not occur. For example, if some condition external and invisible to
     * the Consumer and its group changes in a way that would affect the userdata encoded in the
     * {@link org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription Subscription}, the Consumer
     * will not be notified and no rebalance will occur. This API can be used to force the group to rebalance so that
     * the assignor can perform a partition reassignment based on the latest userdata. If your assignor does not use
     * this userdata, or you do not use a custom
     * {@link org.apache.kafka.clients.consumer.ConsumerPartitionAssignor ConsumerPartitionAssignor}, you should not
     * use this API.
     *
     * @throws java.lang.IllegalStateException if the consumer does not use group subscription
     */
    void enforceRebalance();
}
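The contract described in the Javadoc (nonblocking, takes effect only on a later poll, no-op while a rebalance is in progress) can be sketched with a toy state machine. This is only an illustrative model, not the real consumer: `ToyConsumer`, its fields, and the assumption that a rebalance takes exactly two polls (one to start, one to complete) are all hypothetical.

```java
/** Toy stand-in for the consumer's rebalance state. NOT the real KafkaConsumer. */
final class ToyConsumer {
    private boolean rejoinRequested = false;      // set by enforceRebalance(), consumed by poll()
    private boolean rebalanceInProgress = false;  // true between the two polls of a toy rebalance
    private int completedRebalances = 0;

    /** Nonblocking: only records the request. A no-op while a rebalance is in progress. */
    void enforceRebalance() {
        if (rebalanceInProgress) {
            return; // no-op: the caller must finish the current rebalance and retry
        }
        rejoinRequested = true;
    }

    /** Assumption of this toy model: one poll starts a rebalance, the next one completes it. */
    void poll() {
        if (rebalanceInProgress) {
            rebalanceInProgress = false;
            completedRebalances++;
        } else if (rejoinRequested) {
            rejoinRequested = false;
            rebalanceInProgress = true;
        }
    }

    boolean rebalanceInProgress() {
        return rebalanceInProgress;
    }

    int completedRebalances() {
        return completedRebalances;
    }

    public static void main(String[] args) {
        ToyConsumer consumer = new ToyConsumer();
        consumer.enforceRebalance();   // nothing happens yet
        consumer.poll();               // rebalance starts
        consumer.enforceRebalance();   // ignored: rebalance already in progress
        consumer.poll();               // rebalance completes
        consumer.poll();               // no second rebalance, the ignored request was dropped
        System.out.println(consumer.completedRebalances());
    }
}
```

In this model a request made during an ongoing rebalance is simply lost, which is why the caller has to decide after the rebalance completes whether a retry is still needed.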
Proposed Changes
...
Clearly, the behavior of this API in the case of an ongoing rebalance is application-dependent. It therefore seems best to leave it up to the user to determine whether or not to retry, based on the results of the completed rebalance, by checking the assignment received.
The consumer is not part of an active group
...
Code Block | ||||
---|---|---|---|---|
| ||||
mainProcessingLoop() {
    if (justCompletedInitialization) {
        needsRebalance = true;
        justCompletedInitialization = false;
    }

    if (needsRebalance) {
        consumer.enforceRebalance();
    }

    records = consumer.poll();

    // check the assignment in case you need to retry, e.g. if a rebalance was already in progress when enforceRebalance was called
    if (receivedFinalAssignment()) {
        needsRebalance = false;
    }

    ... // do something
}
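The loop above relies on `receivedFinalAssignment()`, which this page does not define. One possible way to implement such a check, sketched here under the assumption that the application encodes a monotonically increasing metadata version in its subscription userdata and that the custom assignor copies the version it used into the assignment userdata, is to compare that echoed version with the local one. The class `AssignmentVersionCheck`, its method names, and the 8-byte encoding are hypothetical and chosen only for illustration.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper: version-stamped userdata used to validate an assignment. */
final class AssignmentVersionCheck {

    /** Encode the application's metadata version as userdata (assumed format: a single long). */
    static ByteBuffer encodeVersion(long version) {
        ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
        buffer.putLong(version);
        buffer.flip(); // make the written bytes readable
        return buffer;
    }

    /** Read the version with an absolute get, so the buffer's position is left untouched. */
    static long decodeVersion(ByteBuffer userData) {
        return userData.getLong(userData.position());
    }

    /**
     * Returns true if the assignment was computed from metadata at least as new as
     * localVersion. Missing or too-short userdata is treated as "not final".
     */
    static boolean receivedFinalAssignment(ByteBuffer assignmentUserData, long localVersion) {
        if (assignmentUserData == null || assignmentUserData.remaining() < Long.BYTES) {
            return false;
        }
        return decodeVersion(assignmentUserData) >= localVersion;
    }

    public static void main(String[] args) {
        long localVersion = 4L;
        ByteBuffer staleAssignment = encodeVersion(3L);
        ByteBuffer freshAssignment = encodeVersion(4L);
        System.out.println(receivedFinalAssignment(staleAssignment, localVersion));
        System.out.println(receivedFinalAssignment(freshAssignment, localVersion));
    }
}
```

With a check of this kind, `needsRebalance` stays set after a rebalance that used stale userdata, so the next pass through the loop calls `enforceRebalance()` again.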
Compatibility, Deprecation, and Migration Plan
...