...

Code Block
languagejava
Consumer {
   /**
     * Alert the consumer to trigger a new rebalance by rejoining the group. This is a nonblocking call that forces
     * the consumer to trigger a new rebalance on the next poll call. If a rebalance is already in progress, the
     * method will return false to indicate that it was a no-op. The user can then decide to just complete the
     * current rebalance, or retry this to trigger a rebalance with the most up-to-date subscription metadata. Note
     * that this API does not itself initiate the rebalance, so you must still call poll.
	 * <p>
	 * You should not need to call this during normal processing, as the consumer group will manage itself
	 * automatically and rebalance when necessary. However there may be situations where the application wishes to
	 * trigger a rebalance that would otherwise not occur. For example, if some condition external and invisible to
	 * the Consumer and its group membership changes in a way that has implications for the partition assignment,
	 * this method can be used to trigger a reassignment. The kind of system change that would merit calling this
 	 * method is one that would also be reflected in the Subscription userdata that is passed to the assignor so it
	 * can make assignment decisions based on more than group membership and metadata. If your assignor does not use
	 * this userdata, or you do not use a custom assignor, you should not use this API.
     *
     * @return false if a rebalance is already in progress
     * @throws java.lang.IllegalStateException if the consumer does not use group subscription
     */
    boolean enforceRebalance();
}

Proposed Changes

This KIP proposes to add an API that will request a rebalance without revoking all currently owned partitions. Both static and dynamic members will attempt to rejoin the group. It is a nonblocking call that does not itself initiate the rebalance (as in consumer.unsubscribe()) but instead just marks the consumer as needing to rejoin. A rebalance will then be triggered on the next poll() call.
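
The flag-based behavior described above can be pictured with a toy model. This is a minimal sketch under stated assumptions, not the actual consumer implementation: the class `RejoinFlagSketch`, its fields, and the rule that an in-flight rebalance completes on the following poll are all hypothetical and exist only for illustration.

```java
// Toy model of the proposed semantics. This is NOT the real KafkaConsumer;
// every name below is hypothetical and exists only for illustration.
class RejoinFlagSketch {
    private boolean needsRejoin = false;
    private boolean rebalanceInProgress = false;
    private int rebalancesStarted = 0;

    // Nonblocking: never contacts the coordinator, only sets a flag.
    boolean enforceRebalance() {
        if (rebalanceInProgress) {
            return false; // no-op; the caller may retry once the rebalance completes
        }
        needsRejoin = true;
        return true;
    }

    // All actual rebalancing stays inside poll.
    void poll() {
        if (rebalanceInProgress) {
            // Assumption for this sketch: an in-flight rebalance finishes on the following poll.
            rebalanceInProgress = false;
        } else if (needsRejoin) {
            needsRejoin = false;
            rebalanceInProgress = true; // stands in for sending the JoinGroup request
            rebalancesStarted++;
        }
    }

    int rebalancesStarted() {
        return rebalancesStarted;
    }

    public static void main(String[] args) {
        RejoinFlagSketch consumer = new RejoinFlagSketch();
        System.out.println("marked for rejoin: " + consumer.enforceRebalance());
        System.out.println("rebalances before poll: " + consumer.rebalancesStarted());
        consumer.poll();
        System.out.println("rebalances after poll: " + consumer.rebalancesStarted());
        System.out.println("second request while in progress: " + consumer.enforceRebalance());
    }
}
```

In this model, calling `enforceRebalance()` alone never starts a rebalance; the counter only advances once `poll()` runs, which mirrors the requirement that the user must still call poll.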

The behavior is fairly straightforward overall, but there are some edge cases to consider:

...

Clearly, the behavior of this API in the case of an ongoing rebalance is application-dependent. We should return "false" to alert the user and allow them to choose how to handle this case.

...

If the consumer is not part of an active group, either because it dropped out or it has not yet joined the group, we should already be attempting to rejoin. Continue on with triggering the rebalance.

The coordinator is unknown/unavailable

But if we are not yet in the REBALANCING state (above case), then we have not yet sent out the metadata and so the next rebalance should include whatever update caused the app to want to force a rebalance. In this case we just return "true".

example usage:

Let's say your app has some heavy initialization to do before it is ready to start processing partitions from certain topics (call it topic A), while other topics (topic B) can be processed right away. You want to avoid assigning any of the topic A partitions to a new member until it's ready to work on them, so your assignor will need to include whether the member is initialized or not in the userdata. This way the  assignor can make sure a member that has just joined will only receive partitions from topic B, allowing other members of the group to continue making progress on the partitions of topic A until the newer member is ready for them. In the example processing loop below, each member will check some system condition to determine whether it is ready to receive partitions from topic A and trigger a rebalance if so. By using this new API, in combination with a cooperative assignor, the app can actually continue to poll and process records while the rebalance goes on in the background.

Code Block
languagejava
titleMyApp.java
mainProcessingLoop() {
	if (justCompletedInitialization) {
		needsRebalance = true;
		justCompletedInitialization = false;
	}

	if (needsRebalance) {
		if (consumer.enforceRebalance()) {
			// successfully triggered a new rebalance, so the assignor will receive the latest metadata
			needsRebalance = false;
		} else {
			// must complete the current rebalance, then retry to trigger a rebalance with the latest metadata
		}
	}
	
	records = consumer.poll();
	...
	// do something
}
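
For the assignor in this example to know whether a member has finished initializing, that flag has to travel in the subscription userdata. The following is a minimal sketch of one possible encoding, assuming a single-byte payload. The class `InitStateUserData` and its methods are hypothetical; in a real custom assignor the encoded buffer would be what `ConsumerPartitionAssignor#subscriptionUserData` returns, and the assignor would decode it from each member's subscription.

```java
import java.nio.ByteBuffer;

// Hypothetical helper for illustration only; not part of the Kafka API.
class InitStateUserData {

    // Encode the "initialized" flag as a single byte: 1 = ready for topic A, 0 = not ready.
    static ByteBuffer encode(boolean initialized) {
        ByteBuffer buffer = ByteBuffer.allocate(1);
        buffer.put((byte) (initialized ? 1 : 0));
        buffer.flip(); // make the written byte readable
        return buffer;
    }

    // Decode the flag; a missing or empty payload is treated as "not initialized".
    static boolean decode(ByteBuffer userData) {
        if (userData == null || !userData.hasRemaining()) {
            return false;
        }
        // Absolute read, so the buffer's position is left untouched.
        return userData.get(userData.position()) == 1;
    }

    public static void main(String[] args) {
        System.out.println(decode(encode(true)));
        System.out.println(decode(encode(false)));
    }
}
```

Treating an absent payload as "not initialized" is a conservative choice for this sketch: a member that sends no userdata would only be given topic B partitions.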

Compatibility, Deprecation, and Migration Plan

...

On the one hand, triggering a rebalance seems like an administrative action and thus more appropriate for the admin client than the consumer. However this API is not intended to be an operational one, but instead a way for consumers to proactively communicate with the group and efficiently channel updates relevant to their assignment. I'm also not sure whether this would even be possible to implement without broker-side changes.

Another rejected idea was to make this a blocking call, where the consumer will actually initiate the rebalance (i.e. send out a JoinGroup) and then potentially wait up to some provided timeout for it to complete. On the surface this seems to offer the simpler approach: the rebalance will be triggered by this single API, and users can choose to wait on the rebalance to complete or fall back to a nonblocking call by setting the timeout to zero. However, the same thing can be achieved by just calling poll afterwards (with whatever timeout). This makes the enforceRebalance API much cleaner and easier to reason about, as all the edge cases (e.g. coordinator unavailable/unknown) and error handling (for example if the rebalance callback throws an error) can remain part of poll, and the user need only worry about whether they want to retry in the case a rebalance was already ongoing. This also keeps the implementation clean, by keeping all actual rebalancing within the scope of poll and just setting a flag to rejoin.

Furthermore, some use cases (such as the current design plan of KIP-441) will involve calling this new API on a regular, periodic basis. For those cases a more straightforward solution might be to simply add a "rebalance.interval" config that allows users to specify some interval at which to automatically trigger periodic rebalances. However, this does not fit all use cases: some, such as version probing or a more advanced, heuristic-driven KIP-441 design, require triggering a rebalance in response to specific system changes. The chosen design is intended to allow users to trigger a reassignment based on conditions external/unknown to the consumer (and not, for example, as a way to resolve temporary imbalances due to membership changes).
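
For the periodic use case, an application can keep the interval logic on its own side and call the new API when the interval elapses. The following is a minimal sketch of such a timer, assuming millisecond timestamps supplied by the caller; `PeriodicRebalanceTimer` and its methods are hypothetical names, not part of the consumer API or of this proposal.

```java
// Hypothetical application-side helper; illustrates periodic triggering without a new config.
class PeriodicRebalanceTimer {
    private final long intervalMs;
    private long nextDueMs;

    PeriodicRebalanceTimer(long intervalMs, long nowMs) {
        this.intervalMs = intervalMs;
        this.nextDueMs = nowMs + intervalMs;
    }

    // True once the configured interval has elapsed since construction or the last reset.
    boolean isDue(long nowMs) {
        return nowMs >= nextDueMs;
    }

    // Call after a rebalance was successfully requested to start the next interval.
    void reset(long nowMs) {
        nextDueMs = nowMs + intervalMs;
    }

    public static void main(String[] args) {
        PeriodicRebalanceTimer timer = new PeriodicRebalanceTimer(1000L, 0L);
        System.out.println(timer.isDue(500L));
        System.out.println(timer.isDue(1000L));
    }
}
```

In a processing loop this might be used as `if (timer.isDue(now) && consumer.enforceRebalance()) timer.reset(now);`, so that a request rejected because a rebalance is already in progress is simply retried on the next iteration.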

...