Status

Current state: "Accepted"

Discussion thread: here

JIRA: KAFKA-12473

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Now that 3.0 is coming up, we can change the default ConsumerPartitionAssignor to something better than the RangeAssignor. The original plan was to switch over to the StickyAssignor, but now that we have incremental cooperative rebalancing, we should consider using the new CooperativeStickyAssignor instead: this will enable the consumer group to follow the COOPERATIVE protocol, improving the rebalancing experience out of the box.

In Kafka, we currently support the following assignors:


| Assignor | Supported strategy | Feature | Drawback |
|---|---|---|---|
| RangeAssignor | Eager | Current default value. The first assignor we had. | Can generate heavily skewed assignments when the consumer topic subscriptions are not identical |
| RoundRobinAssignor | Eager | Improvement over RangeAssignor that produces a balanced assignment | Doesn't consider the overhead of reassignment |
| StickyAssignor | Eager | Improvement over RoundRobinAssignor/RangeAssignor that preserves existing assignments to reduce some of the overhead of a reassignment | Has a stop-the-world issue when rebalancing |
| CooperativeStickyAssignor | Eager, Cooperative | To be the default value in 3.0, as described in this KIP. Uses multiple rounds of rebalancing to avoid the stop-the-world issue, as described in KIP-429 | |




As the table above shows, we should change the default assignor now to enable cooperative rebalancing by default. However, we must be careful to do so in a backwards-compatible way that does not require the user to take any specific steps to upgrade safely. That is not currently the case for cooperative rebalancing, which requires a double rolling bounce. So in addition to changing the default partition assignor, we must also improve the rebalancing protocol selection mechanism. This will allow the consumer group to remain on the old protocol during the rolling upgrade, and only complete the upgrade to cooperative rebalancing once all members are safely on the new bytecode.

Public Interfaces

With this KIP, the default value of partition.assignment.strategy changes from "RangeAssignor" to "CooperativeStickyAssignor".

  • It won't affect existing consumers
  • New consumers that use the default partition.assignment.strategy will be affected
  • To upgrade an old consumer to the new consumer setting, refer to the upgrade path section below

Proposed Changes

  • Change the default setting of partition.assignment.strategy in implementation and tests
  • When changing the default assignor we need to make sure this is clearly documented in the upgrade guide for 3.0

Compatibility, Upgrade path

This will require users to follow the upgrade path laid out in KIP-429 to safely perform a rolling upgrade.

From the user's perspective, the upgrade path for leveraging the new protocol is similar to switching to a new assignor. For example, assume the current version of the Kafka consumer is 2.2 and the "range" assignor is specified in the config (or no assignor is configured, which is equivalent, since the RangeAssignor is the default before 3.0). The upgrade path would be:

  1. The first rolling bounce replaces the byte code (i.e. swaps the jars) and introduces the cooperative assignor: set the assignors to "cooperative-sticky, range" (or round-robin/sticky/etc. if you are already using a different assignor). At this stage, the new byte code sends both assignors in its join-group request, but will still choose EAGER as the protocol, since it is still configured with the "range" assignor and the selected rebalancing protocol must be supported by all assignors in the list. The "range" assignor will be selected to assign partitions while everyone is following the EAGER protocol. This rolling bounce is safe.
  2. The second rolling bounce removes the "range" (or round-robin/sticky/etc.) assignor, i.e. leaves only the "cooperative-sticky" assignor in the config. At this stage, members that have been bounced will choose the COOPERATIVE protocol and not revoke partitions, while members not yet bounced will still use EAGER and revoke everything. However, the "cooperative-sticky" assignor will be chosen, since at least one member that has already been bounced no longer has "range". The "cooperative-sticky" assignor works even when some members are on EAGER and some are on COOPERATIVE: this is fine as long as the leader can recognize them and make assignment choices accordingly. The EAGER members have revoked everything and therefore no longer list any pre-assigned partitions in their subscription information, so it is safe to move those partitions to other members immediately based on the assignor's output.
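The two bounces above can be sketched as two consumer configurations. This is a minimal illustration, not part of the KIP: the class and method names (`TwoBounceUpgradeConfig`, `firstBounce`, `secondBounce`) are hypothetical, and the block uses plain `java.util.Properties` so it runs without the Kafka client on the classpath. Note that `partition.assignment.strategy` takes assignor class names, whereas "cooperative-sticky" and "range" in the text are the assignors' protocol names.

```java
import java.util.Properties;

// Hypothetical helper illustrating the consumer config at each rolling bounce.
final class TwoBounceUpgradeConfig {
    static final String ASSIGNMENT_STRATEGY = "partition.assignment.strategy";
    static final String COOPERATIVE_STICKY =
            "org.apache.kafka.clients.consumer.CooperativeStickyAssignor";
    static final String RANGE =
            "org.apache.kafka.clients.consumer.RangeAssignor";

    // First bounce: new jars, both assignors listed, cooperative-sticky preferred.
    // The group stays on EAGER because RangeAssignor only supports EAGER.
    static Properties firstBounce() {
        Properties props = new Properties();
        props.setProperty(ASSIGNMENT_STRATEGY, COOPERATIVE_STICKY + "," + RANGE);
        return props;
    }

    // Second bounce: drop the EAGER-only assignor so COOPERATIVE can be used.
    static Properties secondBounce() {
        Properties props = new Properties();
        props.setProperty(ASSIGNMENT_STRATEGY, COOPERATIVE_STICKY);
        return props;
    }

    public static void main(String[] args) {
        System.out.println("bounce 1: " + firstBounce().getProperty(ASSIGNMENT_STRATEGY));
        System.out.println("bounce 2: " + secondBounce().getProperty(ASSIGNMENT_STRATEGY));
    }
}
```

In a real application these properties would be merged with the rest of the consumer configuration before constructing the consumer.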

The key point behind these two rolling bounces is that we want to avoid the situation where the leader is on old byte code and only recognizes "eager", but, due to compatibility, is still able to deserialize the new protocol data from newer members. Such a leader would go ahead and do the assignment even though the newer members did not revoke their partitions before joining the group. Note the difference from KIP-415 here: on the consumer we do not have the luxury of relying on a list of built-in assignors, since the assignor is user-customizable and hence a black box to the consumer coordinator. We therefore need two rolling bounces to complete the upgrade, whereas Connect only needs one.

Rejected Alternatives

...

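With this KIP, the default value of partition.assignment.strategy changes from "RangeAssignor" to "CooperativeStickyAssignor, RangeAssignor". Since the assignor list is ordered by preference, this means that the "CooperativeStickyAssignor" will become the new default partition assignor (once all members have been upgraded and use the new default).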

However, this in itself would not affect the rebalancing protocol, as this is chosen during startup as "the highest protocol which is commonly supported by all assignors". So in addition to changing the default assignor, this static rebalance protocol selection will be replaced by a dynamic one that depends on the actual assignor selected for use in the rebalance. This means that cooperative rebalancing will also become enabled by default.
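To make the static rule concrete, here is a minimal sketch of "the highest protocol which is commonly supported by all assignors". It is a standalone model, not the consumer's actual code: `StaticProtocolSelection`, its `Protocol` enum, and `highestCommonProtocol` are hypothetical names, and the supported-protocol sets are taken from the table of assignors in the Motivation section.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of static rebalance protocol selection at consumer startup.
final class StaticProtocolSelection {
    // Declaration order doubles as the ranking: COOPERATIVE is "higher" than EAGER.
    enum Protocol { EAGER, COOPERATIVE }

    // Intersect the protocols supported by every configured assignor,
    // then pick the highest-ranked protocol that remains.
    static Protocol highestCommonProtocol(List<? extends Set<Protocol>> supportedPerAssignor) {
        EnumSet<Protocol> common = EnumSet.allOf(Protocol.class);
        for (Set<Protocol> supported : supportedPerAssignor) {
            common.retainAll(supported);
        }
        if (common.isEmpty()) {
            throw new IllegalArgumentException("assignors share no rebalance protocol");
        }
        return Collections.max(common); // enums compare by declaration order
    }

    public static void main(String[] args) {
        Set<Protocol> range = EnumSet.of(Protocol.EAGER);
        Set<Protocol> cooperativeSticky = EnumSet.of(Protocol.EAGER, Protocol.COOPERATIVE);

        // "cooperative-sticky, range": range only supports EAGER, so EAGER is chosen.
        System.out.println(highestCommonProtocol(Arrays.asList(cooperativeSticky, range)));
        // "cooperative-sticky" alone: COOPERATIVE is chosen.
        System.out.println(highestCommonProtocol(Arrays.asList(cooperativeSticky)));
    }
}
```

Under this rule, merely listing the CooperativeStickyAssignor first does not change the protocol while the RangeAssignor remains in the list, which is why the selection has to become dynamic.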

Note that this change will also automatically be inherited by sink connectors, like any other application that uses Kafka consumers, as long as a consumer assignor is not explicitly defined in their configuration.

Proposed Changes:

Consumers that rely on the default value of partition.assignment.strategy will enable cooperative rebalancing automatically: immediately for new applications, and once all members of the group have been upgraded for existing ones. During the upgrade, the group will continue to follow the EAGER protocol with the RangeAssignor. Once all members support the CooperativeStickyAssignor, it will be selected for use by the group coordinator and reported back to each member. At this point the consumer is allowed to enable the COOPERATIVE protocol and hold onto its owned partitions during a rebalance.
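The behaviour during a rolling upgrade can be illustrated with a simplified model of how a common assignor is picked for the group. This is a sketch under stated assumptions, not the group coordinator's implementation: `AssignorSelectionSketch` and `selectAssignor` are hypothetical names, and the voting rule (each member votes for its most-preferred assignor among those every member supports) is a simplification.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of picking one assignor for the whole group.
final class AssignorSelectionSketch {
    // Each inner list is one member's assignors, ordered by preference.
    static String selectAssignor(List<? extends List<String>> memberPreferences) {
        if (memberPreferences.isEmpty()) {
            throw new IllegalArgumentException("group has no members");
        }
        // Candidates are the assignors that every member supports.
        Set<String> candidates = new LinkedHashSet<>(memberPreferences.get(0));
        for (List<String> prefs : memberPreferences) {
            candidates.retainAll(prefs);
        }
        if (candidates.isEmpty()) {
            throw new IllegalStateException("members share no common assignor");
        }
        // Each member votes for its most-preferred candidate.
        Map<String, Integer> votes = new LinkedHashMap<>();
        for (List<String> prefs : memberPreferences) {
            for (String name : prefs) {
                if (candidates.contains(name)) {
                    votes.merge(name, 1, Integer::sum);
                    break;
                }
            }
        }
        return Collections.max(votes.entrySet(), Map.Entry.comparingByValue()).getKey();
    }

    public static void main(String[] args) {
        List<String> oldMember = Arrays.asList("range");
        List<String> newMember = Arrays.asList("cooperative-sticky", "range");

        // Mid-upgrade: only "range" is supported by everyone.
        System.out.println(selectAssignor(Arrays.asList(oldMember, newMember, newMember)));
        // Fully upgraded: every member prefers "cooperative-sticky".
        System.out.println(selectAssignor(Arrays.asList(newMember, newMember, newMember)));
    }
}
```

In this model, a single member that still lists only "range" keeps the whole group on the RangeAssignor, matching the description of the group staying on EAGER until the upgrade completes.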

Still, there are scenarios in which some members may be on COOPERATIVE while others in the group are not: for example, if some members on the old bytecode missed a rebalance, if the operator accidentally started a consumer that only supports the RangeAssignor, or during a downgrade. The danger in mixing EAGER and COOPERATIVE rebalancing, and the reason for the original double rolling bounce upgrade strategy, is that some partitions may be claimed by two consumers at once. This can occur if some members are using COOPERATIVE and don't revoke their partitions prior to the rebalance, while an EAGER-only assignor is chosen, which will freely reassign those un-revoked partitions. At the heart of the dynamic protocol mechanism is the insight that the danger in this situation comes from both consumers committing offsets for that partition, not from both believing themselves to be the owner at the same time. In fact, the latter may already occur if a member has dropped out of the group. We therefore take a similar approach here: members that have upgraded to COOPERATIVE but see an EAGER assignor selected will invoke the onPartitionsLost callback and disable committing until they have rejoined the group.
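The fallback described above can be sketched as a small state machine for a single member. Everything here is hypothetical and only mirrors the prose: `DynamicProtocolFallback`, `Member`, `onAssignorSelected`, and `onRejoined` are illustrative names, and recording partitions in `lostPartitions` stands in for invoking the real onPartitionsLost callback. The actual implementation is tracked in KAFKA-12477.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of a member that assumed COOPERATIVE and kept its partitions.
final class DynamicProtocolFallback {
    static final class Member {
        final Set<Integer> ownedPartitions = new TreeSet<>();
        final List<Integer> lostPartitions = new ArrayList<>();
        boolean commitsEnabled = true;

        // Called when the member learns which assignor the group selected.
        void onAssignorSelected(boolean selectedAssignorSupportsCooperative) {
            if (!selectedAssignorSupportsCooperative) {
                // An EAGER-only assignor may have handed our un-revoked partitions
                // to someone else: treat them as lost (stand-in for onPartitionsLost)
                // and stop committing offsets until we have rejoined.
                lostPartitions.addAll(ownedPartitions);
                ownedPartitions.clear();
                commitsEnabled = false;
            }
        }

        // Called after the member has rejoined and received a fresh assignment.
        void onRejoined(Collection<Integer> newAssignment) {
            ownedPartitions.clear();
            ownedPartitions.addAll(newAssignment);
            commitsEnabled = true;
        }
    }

    public static void main(String[] args) {
        Member member = new Member();
        member.ownedPartitions.add(0);
        member.ownedPartitions.add(1);

        member.onAssignorSelected(false); // e.g. "range" was selected
        System.out.println("lost=" + member.lostPartitions
                + " commitsEnabled=" + member.commitsEnabled);

        member.onRejoined(List.of(1));
        System.out.println("owned=" + member.ownedPartitions
                + " commitsEnabled=" + member.commitsEnabled);
    }
}
```

The point of the sketch is the ordering: commits are disabled before another consumer can also commit for the same partition, and are re-enabled only after the member holds a fresh assignment.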

With this proposal

  • New applications will enable cooperative rebalancing by default

  • Existing applications which don’t set an assignor can safely upgrade using a normal rolling bounce, and will automatically transition to cooperative rebalancing

  • Existing applications which do set an assignor that uses EAGER can likewise upgrade their applications to COOPERATIVE with a single rolling bounce (by adding the "cooperative-sticky" assignor to the beginning of the assignors list)

  • Once on the new version (e.g. 3.0), applications can safely go back and forth between EAGER and COOPERATIVE, based on the configured assignors

  • Applications can safely downgrade

Detailed implementation can be found in KAFKA-12477.

Compatibility, Upgrade path

No special upgrade path is necessary, and this change should be transparent to the user – they will just see that cooperative rebalancing is enabled.

Rejected Alternatives

  1. Some cooperative-sticky related defects might not be fixed before 3.0.
    → We've marked important defects as blockers for 3.0, e.g. KAFKA-12896. Please raise any important defect you find.
  2. The cooperative-sticky assignor is also very new for C/C++ users of librdkafka, so not many in that community have tried incremental cooperative rebalancing yet, and bugs are still being worked out there too.
    → I checked this library and found only 1 cooperative-sticky related bug currently open, which is good (10 bugs have been fixed). In any case, clients can always switch to another assignor if there are still bugs in the library.