Status

Current state: Voting

Discussion thread: here

JIRA: KAFKA-9525

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Consumer group rebalances are generally intended to be abstracted away from the user as much as possible, but more advanced users of the Consumer client may actually want to explicitly trigger a rebalance at times. For example, Streams may do this if it has detected through some other means that one of its members has dropped out of the group and needs to rejoin right away. As of KIP-268, Streams also relies on triggering a rebalance to provide a smoother upgrade path through its version probing protocol. And lastly, in the recently-approved KIP-441 we intend to improve the availability of stateful tasks by letting new instances warm up standbys. To probe for sufficiently caught-up standbys and transfer the active task assignment to them, Streams will once again leverage the ability to trigger a rebalance.

For this design to be as effective as possible, Streams makes three assumptions about an enforced rebalance:

  1. it happens immediately
  2. it happens always
  3. it is quick, cheap, & low overhead

Currently the only way to enforce a rebalance in the absence of metadata/membership changes is by unsubscribing (and resubscribing) the consumer. Unfortunately this can violate assumptions #1 or #2, and always violates assumption #3.

If static membership is used, unsubscribing will not trigger a rebalance as the member may not actually leave the group. If after some timeout the member has not rejoined, a rebalance will be triggered to redistribute its partitions (violating assumption #1). If it does rejoin within the timeout, it will be handed back its old assignment without requiring the entire group to rebalance (violating assumption #2). This makes static membership incompatible with version probing and the upcoming KIP-441. All three are crucial improvements to the operation and usability of Streams, and users should not be forced to make a choice between them.

Even when calling #unsubscribe does reliably trigger a rebalance, it causes all owned partitions to be revoked (or lost). Prior to the improvements in KIP-429 this was the case for every rebalance, so assumption #3 never held regardless of who or what caused the rebalance. But incremental cooperative rebalancing in 2.4 now allows members to hold on to their owned partitions during a rebalance, and starting with 2.5 Streams can continue processing active tasks during a rebalance. KIP-441 was specifically designed with this in mind, and builds on it by trading additional rebalances for high availability. This is a good tradeoff if the rebalances are lightweight, but not if they themselves contribute to overall unavailability, as they would when triggered by calling unsubscribe.

Rebalancing delays and unavailability have long been a major pain point for Streams applications. We hope to finally mitigate this with the combination of KIP-345, KIP-429, and KIP-441, but without the ability to explicitly trigger a rebalance we won't be able to fully realize the high availability potential of Streams.

Public Interfaces

Consumer {
    /**
     * Alert the consumer to trigger a new rebalance by rejoining the group. You should not need to call this during
     * normal processing, as the consumer group will manage itself automatically and rebalance when necessary. However
     * there may be situations where the application wishes to trigger a rebalance that would otherwise not occur. For
     * example, if some condition external and invisible to the Consumer and its group membership changes in a way that
     * has implications for the partition assignment, this method can be used to trigger a reassignment. The kind of
     * system change that would merit calling this method is one that would also be reflected in the Subscription
     * userdata that is passed to the assignor so it can make assignment decisions based on more than group membership
     * and metadata. If your assignor does not use this userdata, or you do not use a custom assignor, you should not
     * use this API.
     * <p>
     * If a rebalance is already in progress, an exception will be thrown so you can choose to retry or not once the
     * current rebalance completes.
     *
     * @param timeout The maximum amount of time to wait for the rebalance
     *
     * @return Whether the rebalance completed within the given timeout.
     * @throws java.lang.IllegalStateException if the consumer does not use group subscription
     * @throws org.apache.kafka.common.errors.RebalanceInProgressException if a rebalance is already in progress
     * @throws org.apache.kafka.common.KafkaException if the rebalance callback throws an exception
     */
    boolean enforceRebalance(Duration timeout);
}

Proposed Changes

This KIP proposes to add an API that will immediately trigger a rebalance (as in Consumer#unsubscribe) without revoking all currently owned partitions. Both static and dynamic members will attempt to rejoin the group. The call will block up to the user-provided timeout, and return whether the rebalance completed within this time. If a "zero" timeout is specified, the consumer will initiate the rejoin and then return. In this or any other case where "false" is returned, the user must call poll to complete the rebalance.
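The zero-timeout contract described above might be used roughly as follows. This is only a sketch: RebalanceCapableConsumer is a hypothetical stand-in for the two Consumer methods involved (it is not the real Kafka interface), rebalancePending() is an illustrative helper that this KIP does not propose, and FakeConsumer is a toy that finishes its rebalance after a fixed number of polls.

```java
import java.time.Duration;

/** Hypothetical stand-in for the Consumer methods this sketch needs; not the real Kafka interface. */
interface RebalanceCapableConsumer {
    /** As proposed in this KIP: true only if the rebalance completed within the timeout. */
    boolean enforceRebalance(Duration timeout);

    /** Drives a pending rebalance forward; record handling is omitted in this sketch. */
    void poll(Duration timeout);

    /** Illustrative helper (not part of the proposal): true while a rejoin is still pending. */
    boolean rebalancePending();
}

/** Toy consumer (assumption for illustration): the rebalance finishes after a fixed number of polls. */
class FakeConsumer implements RebalanceCapableConsumer {
    private final int pollsNeeded;
    private int pollsRemaining = 0;
    int pollCount = 0;

    FakeConsumer(int pollsNeeded) {
        this.pollsNeeded = pollsNeeded;
    }

    @Override
    public boolean enforceRebalance(Duration timeout) {
        if (timeout.isZero()) {
            pollsRemaining = pollsNeeded; // rejoin initiated, to be completed by later polls
        } else {
            pollsRemaining = 0;           // toy simplification: any non-zero timeout is long enough
        }
        return pollsRemaining == 0;
    }

    @Override
    public void poll(Duration timeout) {
        pollCount++;
        if (pollsRemaining > 0) {
            pollsRemaining--;
        }
    }

    @Override
    public boolean rebalancePending() {
        return pollsRemaining > 0;
    }
}

final class RebalanceTrigger {
    /** Triggers a rebalance and, if it did not complete in time, polls up to maxPolls times to finish it. */
    static boolean triggerAndPoll(RebalanceCapableConsumer consumer, Duration timeout, int maxPolls) {
        boolean completed = consumer.enforceRebalance(timeout);
        for (int polls = 0; !completed && polls < maxPolls; polls++) {
            consumer.poll(Duration.ofMillis(100));
            completed = !consumer.rebalancePending();
        }
        return completed;
    }
}
```

An application such as Streams would presumably fold these polls into its normal processing loop rather than looping in a helper; the loop here only makes the "false means keep polling" rule explicit.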

The behavior is fairly straightforward overall, but there are some edge cases to consider:

A rebalance is already in progress

There is always the possibility that a rebalance is already in progress and the consumer is just awaiting its new assignment, or waiting for the remaining members to rejoin. It's also possible that an application may try to trigger a rebalance from multiple consumers in the group at once. The appropriate handling of this case depends on the application-level logic and reason for forcing a rebalance to begin with: for example, the current plan for KIP-441 is for the leader to periodically trigger a rebalance to check for ready standbys and give them an active assignment. If the rebalance is triggered based on an arbitrary time interval we should just allow the current rebalance to complete and move on, since the ongoing rebalance will collect the latest metadata just as the enforced rebalance would have.

But what if the rebalance was triggered not at some arbitrary point in time, but based on some just now satisfied condition? One alternative we considered for KIP-441 was to have each member trigger a rebalance once they finished preparing their standbys. If a rebalance was already in progress the subscription metadata would have been sent before this condition was met, and the resulting assignment would not take into account the readiness of the standby. If we just allowed the current rebalance to complete and did not trigger another one, the ready standby would not get converted to an active task.

Clearly, the behavior of this API in the case of an ongoing rebalance is application-dependent. We should throw a RebalanceInProgressException to alert the user and allow them to choose how to handle this case.
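The two application-level choices discussed in this section could be expressed along these lines. This is a minimal sketch under stated assumptions: the exception class is a local stand-in for org.apache.kafka.common.errors.RebalanceInProgressException so the example compiles on its own, and EnforceableConsumer, awaitOngoingRebalance(), InProgressPolicy and BusyConsumer are all hypothetical names introduced for illustration.

```java
import java.time.Duration;

/** Local stand-in for Kafka's RebalanceInProgressException, so the sketch compiles alone. */
class RebalanceInProgressException extends RuntimeException {
    RebalanceInProgressException(String message) {
        super(message);
    }
}

/** Hypothetical slice of the Consumer API used here; not the real Kafka interface. */
interface EnforceableConsumer {
    boolean enforceRebalance(Duration timeout);

    /** Illustrative helper: block (for example by polling) until the in-flight rebalance is done. */
    void awaitOngoingRebalance();
}

/** What the application wants to do when a rebalance is already under way. */
enum InProgressPolicy {
    SKIP,                // periodic trigger: the ongoing rebalance is good enough
    RETRY_AFTER_CURRENT  // condition-based trigger: the new condition must be seen by the assignor
}

final class RebalanceRequests {
    /** Returns true if a new rebalance was requested, false if we deferred to the ongoing one. */
    static boolean request(EnforceableConsumer consumer, InProgressPolicy policy) {
        try {
            consumer.enforceRebalance(Duration.ZERO);
            return true;
        } catch (RebalanceInProgressException e) {
            if (policy == InProgressPolicy.SKIP) {
                return false;
            }
            consumer.awaitOngoingRebalance();
            // May throw again if yet another rebalance has started in the meantime.
            consumer.enforceRebalance(Duration.ZERO);
            return true;
        }
    }
}

/** Toy consumer (assumption for illustration) that starts out in the middle of a rebalance. */
class BusyConsumer implements EnforceableConsumer {
    boolean rebalanceInProgress = true;
    int requestsAccepted = 0;

    @Override
    public boolean enforceRebalance(Duration timeout) {
        if (rebalanceInProgress) {
            throw new RebalanceInProgressException("a rebalance is already in progress");
        }
        requestsAccepted++;
        return false; // zero timeout: rejoin initiated, completion happens on later polls
    }

    @Override
    public void awaitOngoingRebalance() {
        rebalanceInProgress = false;
    }
}
```

A periodic probe, such as the current KIP-441 plan, would likely use SKIP, while a trigger tied to a newly satisfied condition, such as a standby becoming ready, would need something like RETRY_AFTER_CURRENT.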

The consumer is not part of an active group

If the consumer is not part of an active group, either because it dropped out or because it has not yet joined the group, it should already be attempting to rejoin. In this case we simply continue on with triggering the rebalance.

The coordinator is unknown/unavailable

If we cannot connect to the coordinator within the timeout, this method will just return false. In this case it will not have initiated the join group, but will have marked the consumer as "needsRejoin" so the rebalance will be triggered on the next poll (or on the next call to `enforceRebalance`, if retried).
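The "needsRejoin" behavior can be pictured with a toy state model. This is not Kafka's implementation, only a sketch of the rule stated above: ToyRejoinState and its fields are hypothetical, and the model collapses the whole join into a single counter increment.

```java
import java.time.Duration;

/** Toy model (assumption, not Kafka's implementation) of the "needsRejoin" flag. */
class ToyRejoinState {
    boolean coordinatorReachable;
    boolean needsRejoin = false;
    int joinGroupRequestsSent = 0;

    ToyRejoinState(boolean coordinatorReachable) {
        this.coordinatorReachable = coordinatorReachable;
    }

    boolean enforceRebalance(Duration timeout) {
        needsRejoin = true;        // remembered even if the coordinator cannot be reached
        if (!coordinatorReachable) {
            return false;          // no join group was initiated
        }
        sendJoinGroup();
        return !timeout.isZero();  // toy simplification: any non-zero timeout is long enough
    }

    void poll() {
        // A later poll picks up the pending rejoin once the coordinator is available.
        if (needsRejoin && coordinatorReachable) {
            sendJoinGroup();
        }
    }

    private void sendJoinGroup() {
        joinGroupRequestsSent++;
        needsRejoin = false;
    }
}
```

The point of the flag is that the request is not lost: the caller may retry enforceRebalance, but an ordinary poll is enough to carry out the rejoin once the coordinator is known again.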

Compatibility, Deprecation, and Migration Plan

We will be adding a new method to the Consumer interface, and as such any new implementations must implement it. No default implementation will be provided, so existing Consumer implementations that wish to upgrade must be extended with this method and recompiled.

Rejected Alternatives

Instead of adding this directly to the consumer client, we could have added this to the admin client as in

Admin {
	forceRebalance(String groupId);
}

On the one hand, triggering a rebalance seems like an administrative action and thus more appropriate for the admin client than the consumer. However this API is not intended to be an operational one, but instead a way for consumers to proactively communicate with the group and efficiently channel updates relevant to their assignment. I'm also not sure whether this would even be possible to implement without broker-side changes.

Another rejected idea was to make this a non-blocking call: this is how Streams will most likely want to use the API, so it can continue polling/processing other tasks while the rebalance progresses, but this can still be achieved by just setting the timeout to zero. 

Furthermore, some use cases (such as the current design plan of KIP-441) will involve calling this new API on a regular, periodic basis. For those cases a more straightforward solution might be to simply add a "rebalance.interval" config that allows users to specify some interval at which to automatically trigger periodic rebalances. However this does not fit all use cases: some, such as version probing or a more advanced, heuristic-driven KIP-441 design, require triggering a rebalance in response to specific system changes. The chosen design is intended to allow users to trigger a reassignment based on conditions external/unknown to the consumer (and not, for example, as a way to resolve temporary imbalances due to membership changes).

