...

Code Block
languagejava
public interface Consumer<K, V> {
    /**
     * Alert the consumer to trigger a new rebalance by rejoining the group. You should not need to call this during
     * normal processing, as the consumer group will manage itself automatically and rebalance when necessary. However,
     * there may be situations where the application wishes to trigger a rebalance that would otherwise not occur. For
     * example, if some condition that is external and invisible to the Consumer and its group membership changes in a
     * way that has implications for the partition assignment, this method can be used to trigger a reassignment. The
     * kind of system change that would merit calling this method is one that would also be reflected in the
     * Subscription userdata that is passed to the assignor so it can make assignment decisions based on more than
     * group membership and metadata. If your assignor does not use this userdata, or you do not use a custom
     * assignor, you should not use this API.
     * <p>
     * If a rebalance is already in progress, an exception will be thrown so you can choose whether to retry once the
     * current rebalance completes.
     *
     * @param timeout The maximum amount of time to wait for the rebalance to complete
     *
     * @return Whether the rebalance completed within the given timeout.
     * @throws java.lang.IllegalStateException if the consumer does not use group subscription
     * @throws org.apache.kafka.common.errors.RebalanceInProgressException if a rebalance is already in progress
     * @throws org.apache.kafka.common.KafkaException if the rebalance callback throws an exception
     */
    boolean enforceRebalance(Duration timeout);
}
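
The retry behaviour described in the Javadoc above could be handled by a caller roughly as follows. This is only a sketch: `RebalanceCapable` and the local `RebalanceInProgressException` are hypothetical stand-ins for the proposed Consumer method and the Kafka exception, defined here so the example compiles without the Kafka client library. The helper `tryEnforce` and its `maxAttempts` parameter are likewise assumptions made for illustration.

```java
import java.time.Duration;

// Hypothetical stand-in for the proposed Consumer method; not the real Kafka interface.
interface RebalanceCapable {
    boolean enforceRebalance(Duration timeout);
}

// Hypothetical stand-in for org.apache.kafka.common.errors.RebalanceInProgressException.
class RebalanceInProgressException extends RuntimeException {
    RebalanceInProgressException(String message) {
        super(message);
    }
}

final class RebalanceRetry {
    /**
     * Tries to trigger a rebalance, retrying up to maxAttempts times if another
     * rebalance is already in progress. Returns true only if a triggered rebalance
     * completed within the timeout.
     */
    static boolean tryEnforce(RebalanceCapable consumer, Duration timeout, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return consumer.enforceRebalance(timeout);
            } catch (RebalanceInProgressException e) {
                // Another rebalance is under way. A real application would keep
                // polling here so that rebalance can finish before the next attempt.
            }
        }
        return false;
    }
}

class RebalanceRetryDemo {
    public static void main(String[] args) {
        int[] calls = {0};
        // Fake consumer: the first call collides with an ongoing rebalance, the second succeeds.
        RebalanceCapable fake = timeout -> {
            if (++calls[0] == 1) {
                throw new RebalanceInProgressException("rebalance in progress");
            }
            return true;
        };
        System.out.println(RebalanceRetry.tryEnforce(fake, Duration.ofSeconds(5), 3));
    }
}
```

Whether retrying is appropriate at all depends on the application; the proposal only guarantees that the exception gives the caller the choice.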

...

Compatibility, Deprecation, and Migration Plan

We will be adding a new method to the Consumer interface, so every implementation of the interface must implement it. No default implementation will be provided, so existing Consumer implementations that wish to upgrade must be extended to implement the new method and then recompiled.

Rejected Alternatives

Instead of adding this directly to the consumer client, we could have added this to the admin client as in

...

Another rejected idea was to make this a non-blocking call. This is how Streams will most likely want to use the API, so that it can continue polling and processing other tasks while the rebalance progresses. However, the same behavior can still be achieved with the blocking version by setting the timeout to zero.
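
The zero-timeout usage mentioned above might look like the sketch below. `RebalanceCapable` is a hypothetical stand-in for the proposed Consumer method, and `triggerThenProcess` and the task names are invented for illustration. The sketch assumes that a zero timeout makes the call return without waiting, as the paragraph above suggests.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the proposed Consumer method; not the real Kafka interface.
interface RebalanceCapable {
    boolean enforceRebalance(Duration timeout);
}

final class NonBlockingTrigger {
    /**
     * Requests a rebalance with a zero timeout, then processes every pending task
     * instead of waiting. Returns the tasks handled after the request was made.
     */
    static List<String> triggerThenProcess(RebalanceCapable consumer, List<String> pendingTasks) {
        // Assumption: with Duration.ZERO the call does not wait, and a false result
        // only means the rebalance had not completed by the time the call returned.
        boolean completed = consumer.enforceRebalance(Duration.ZERO);
        System.out.println("rebalance completed immediately: " + completed);

        List<String> processed = new ArrayList<>();
        for (String task : pendingTasks) {
            processed.add(task); // placeholder for real per-task work
        }
        return processed;
    }
}

class NonBlockingTriggerDemo {
    public static void main(String[] args) {
        RebalanceCapable fake = timeout -> false; // fake consumer: rebalance still in flight
        System.out.println(NonBlockingTrigger.triggerThenProcess(fake, List.of("task-0", "task-1")));
    }
}
```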

Furthermore, some use cases (such as the current design plan of KIP-441) will involve calling this new API on a regular, periodic basis. For those cases, a more straightforward solution might be to add a "rebalance.interval" config that lets users specify an interval at which rebalances are triggered automatically. However, this does not fit all use cases: some, such as version probing or a more advanced, heuristic-driven KIP-441 design, require triggering a rebalance in response to specific system changes. The chosen design is intended to allow users to trigger a reassignment based on conditions that are external or unknown to the consumer (and not, for example, as a way to resolve temporary imbalances due to membership changes).
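
To illustrate the difference between the two triggering styles discussed above, the following sketch shows application-side logic that decides when to request a rebalance. All names here (`TriggerReason`, `RebalanceTriggerPolicy`, `check`) are hypothetical, and the userdata is modeled as a plain string for simplicity. An application would presumably call the new Consumer method from its polling thread whenever `check` returns something other than `NONE`.

```java
// Hypothetical reasons an application might have for requesting a rebalance.
enum TriggerReason { NONE, INTERVAL_ELAPSED, EXTERNAL_CHANGE }

final class RebalanceTriggerPolicy {
    private final long intervalMs;   // plays the role of the rejected "rebalance.interval" idea
    private long lastTriggerMs;      // time of the last requested rebalance
    private String lastUserdata;     // external state as of the last requested rebalance

    RebalanceTriggerPolicy(long intervalMs, long startMs, String initialUserdata) {
        this.intervalMs = intervalMs;
        this.lastTriggerMs = startMs;
        this.lastUserdata = initialUserdata;
    }

    /**
     * Returns why a rebalance should be requested now, or NONE. A change in the
     * externally observed userdata takes priority over the periodic interval.
     */
    TriggerReason check(long nowMs, String currentUserdata) {
        TriggerReason reason = TriggerReason.NONE;
        if (!currentUserdata.equals(lastUserdata)) {
            reason = TriggerReason.EXTERNAL_CHANGE;
        } else if (nowMs - lastTriggerMs >= intervalMs) {
            reason = TriggerReason.INTERVAL_ELAPSED;
        }
        if (reason != TriggerReason.NONE) {
            // Remember the state that the requested rebalance will reflect.
            lastTriggerMs = nowMs;
            lastUserdata = currentUserdata;
        }
        return reason;
    }
}

class RebalanceTriggerPolicyDemo {
    public static void main(String[] args) {
        RebalanceTriggerPolicy policy = new RebalanceTriggerPolicy(1000L, 0L, "v1");
        System.out.println(policy.check(500L, "v1"));
        System.out.println(policy.check(600L, "v2"));
        System.out.println(policy.check(1600L, "v2"));
    }
}
```

A pure interval config could only express the `INTERVAL_ELAPSED` branch, which is why an explicit API was preferred for use cases that react to system changes.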