...

In this KIP, we propose to change this behavior as well: we slightly modify the semantics of the listener so that the callback is used more like a "notification" to users about what gets assigned, what gets revoked, and so on. Even if an exception is thrown, it will not cause any of these assignment / revocation results to change. Also, if the ConsumerCoordinator finds that there is no need to notify because no "new" partitions have been assigned or revoked, the corresponding callback will not be triggered either.

More specifically, we will make the following changes:

  1. ConsumerCoordinator will check whether the newly assigned / revoked / lost partition set is empty; if it is empty, we will not trigger the corresponding listener.
  2. When the listener callback throws an exception, ConsumerCoordinator will log an error and then re-throw this exception all the way up to KafkaConsumer.poll().

...

  1. .
    1. Note that with the new rebalance protocol, onPartitionsAssigned / onPartitionsRevoked may be called sequentially. If one throws an exception, we would still proceed to complete the rest of the callbacks while remembering the "first-thrown exception", and then at the end throw the remembered exception to the user.
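
The callback handling described above can be sketched as a small, self-contained model. This is only an illustration under stated assumptions, not the actual ConsumerCoordinator code: the class `ListenerInvocationSketch`, its methods, and the use of plain `Consumer` callbacks in place of the rebalance listener are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Consumer;

// Simplified model of the proposed callback handling. All names are
// hypothetical; this is not the real ConsumerCoordinator.
class ListenerInvocationSketch {

    // Names of the callbacks that were actually triggered, in order.
    static final List<String> invoked = new ArrayList<>();

    // Triggers the callback only for a non-empty partition set. Returns the
    // exception thrown by the callback, or null if it succeeded or was skipped.
    static RuntimeException invoke(String name, Set<Integer> partitions,
                                   Consumer<Set<Integer>> callback) {
        if (partitions.isEmpty()) {
            return null; // nothing to notify, so the listener is not triggered
        }
        invoked.add(name);
        try {
            callback.accept(partitions);
            return null;
        } catch (RuntimeException e) {
            System.err.println("Listener callback " + name + " failed: " + e);
            return e;
        }
    }

    // Runs both callbacks even if the first one fails, then rethrows the
    // first-thrown exception (if any) to the caller.
    static void notifyListener(Set<Integer> assigned, Set<Integer> revoked,
                               Consumer<Set<Integer>> onAssigned,
                               Consumer<Set<Integer>> onRevoked) {
        RuntimeException first = invoke("onPartitionsAssigned", assigned, onAssigned);
        RuntimeException second = invoke("onPartitionsRevoked", revoked, onRevoked);
        if (first == null) {
            first = second;
        }
        if (first != null) {
            throw first;
        }
    }

    public static void main(String[] args) {
        Set<Integer> assigned = new TreeSet<>(Arrays.asList(3));
        Set<Integer> revoked = new TreeSet<>(Arrays.asList(1));
        try {
            notifyListener(assigned, revoked,
                p -> { throw new IllegalStateException("assigned callback failed"); },
                p -> System.out.println("revoked " + p));
        } catch (IllegalStateException e) {
            System.out.println("poll() would surface: " + e.getMessage());
        }
        System.out.println("callbacks triggered: " + invoked);
    }
}
```

In this model the second callback still runs after the first one fails, and only the first exception reaches the caller, which stands in for KafkaConsumer.poll().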

Upon capturing the error, users can do the following, depending on their exception handling logic:

  • Shut down the consumer gracefully if the exception is considered a fatal error.
  • Retry consumer.poll() if they believe the exception is retriable. Be aware that the exception thrown from the callback is treated as having no side effects at all: the callback is used more like a "notification" to users about what gets assigned and what gets revoked, and throwing an exception does not cause any of these results to change.


To give a concrete example, suppose your currently assigned partitions are {1, 2} and the newly assigned partitions are {2, 3}. The consumer will call onPartitionsAssigned(3) and then onPartitionsRevoked(1). Suppose the former fails with an exception: ConsumerCoordinator would still proceed to complete the latter callback (assume the latter callback succeeds). If users then decide to retry, the assignment is still considered to have successfully changed to {2, 3}, i.e. we consider all of the effects the user indicated in the callbacks to have taken place.
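
The arguments passed to the two callbacks in this example are plain set differences between the old and new assignments. A minimal sketch, assuming integer partition ids and a hypothetical helper class `AssignmentDiffSketch`:

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper that computes which partitions are newly assigned and
// which are revoked when moving from one assignment to another.
class AssignmentDiffSketch {

    // Returns the partitions that are in 'to' but not in 'from'.
    static Set<Integer> difference(Set<Integer> to, Set<Integer> from) {
        Set<Integer> result = new TreeSet<>(to);
        result.removeAll(from);
        return result;
    }

    public static void main(String[] args) {
        Set<Integer> current = new TreeSet<>(Arrays.asList(1, 2));
        Set<Integer> target = new TreeSet<>(Arrays.asList(2, 3));
        // Passed to onPartitionsAssigned in the example above.
        System.out.println("newly assigned: " + difference(target, current)); // [3]
        // Passed to onPartitionsRevoked in the example above.
        System.out.println("revoked: " + difference(current, target)); // [1]
    }
}
```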

...


From the user's perspective, the upgrade path for leveraging the new protocol is the same as switching to a new assignor. For example, assuming the current version of the Kafka consumer is 2.2 and the "range" assignor is specified in the config, the upgrade path would be:

...

  • Perform a first rolling bounce to replace the byte code (i.e. swap the jars) and set the assignors to "range, sticky". At this stage, the new-versioned byte code will still choose EAGER as the protocol and send both assignors in its join-group request. Since there is at least one member that has not been bounced yet and therefore only sends "range", the "range" assignor will be selected to assign partitions while everyone follows the EAGER protocol. This rolling bounce is safe.
  • Perform a second rolling bounce to remove the "range" assignor, i.e. leave only the "sticky" assignor in the config. At this stage, members that have been bounced will choose the COOPERATIVE protocol and not revoke partitions, while members not yet bounced will still use EAGER and revoke everything. However, the "sticky" assignor will be chosen, since at least one already-bounced member no longer has "range". The "sticky" assignor works even when some members are on EAGER and some are on COOPERATIVE: it is fine as long as the leader can recognize them and make assignment choices accordingly. EAGER members have revoked everything and hence no longer have any pre-assigned partitions in their subscription information, so it is safe to move those partitions to other members immediately based on the assignor's output.
  • The key point behind these two rolling bounces is that we want to avoid the situation where the leader is on old byte code and only recognizes "eager", but due to compatibility is still able to deserialize the new protocol data from newer-versioned members, and hence goes ahead and does the assignment while new-versioned members did not revoke their partitions before joining the group. Note the difference from KIP-415 here: on the consumer we do not have the luxury of relying on a list of built-in assignors, since the assignor is user-customizable and hence a black box to the consumer coordinator. We therefore need two rolling bounces instead of one to complete the upgrade, whereas Connect only needs one rolling bounce.
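
To illustrate why "range" is selected during the first bounce and "sticky" during the second, the sketch below models assignor selection as picking an assignor that every member supports. It is a simplified, hypothetical model (`AssignorSelectionSketch` and `select` are made-up names): how the coordinator chooses among several common candidates is not modeled, and the first common assignor in the first member's order is used as a stand-in.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical model of picking an assignor supported by every group member.
class AssignorSelectionSketch {

    // Each inner list holds the assignors one member sends in its join-group
    // request. Returns empty when the members share no common assignor.
    static Optional<String> select(List<List<String>> memberAssignors) {
        if (memberAssignors.isEmpty()) {
            return Optional.empty();
        }
        Set<String> common = new LinkedHashSet<>(memberAssignors.get(0));
        for (List<String> supported : memberAssignors) {
            common.retainAll(supported);
        }
        // Simplification: take the first common assignor in the first member's order.
        return common.stream().findFirst();
    }

    public static void main(String[] args) {
        List<String> oldConfig = Arrays.asList("range");
        List<String> firstBounce = Arrays.asList("range", "sticky");
        List<String> secondBounce = Arrays.asList("sticky");

        // During the first bounce, an un-bounced member only sends "range".
        System.out.println(select(Arrays.asList(firstBounce, oldConfig)).orElse("none"));
        // During the second bounce, a bounced member only sends "sticky".
        System.out.println(select(Arrays.asList(secondBounce, firstBounce)).orElse("none"));
        // Skipping the first bounce leaves no common assignor.
        System.out.println(select(Arrays.asList(secondBounce, oldConfig)).orElse("none"));
    }
}
```

The last case, with no common assignor, corresponds to the misconfiguration scenarios where no consensus can be formed.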

    ...

    For example, this can also help save the "version probing" cost in Streams: suppose we augment the join-group schema with a `protocol version` in Kafka version 2.3. Then, with both brokers and clients on version 2.3+, this protocol version will be bumped on the first rolling bounce in which the subscription and assignment schema and / or the user metadata has changed. On the broker side, upon receiving all members' join-group requests, the coordinator will choose the member with the highest protocol version and select it as the leader (this assumes a higher-versioned protocol is always backward compatible, i.e. the leader can recognize lower-versioned protocols as well). The leader can then decide, based on the subscription information it received and deserialized, how to assign partitions and how to encode the assignment so that everyone can understand it. With this, in Streams for example, no version probing would be needed, since the leader is guaranteed to know everyone's version (again assuming that a higher-versioned protocol is always backward compatible) and hence can successfully do the assignment in that round.
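
The leader choice described in this paragraph amounts to taking the member with the maximum protocol version. A minimal sketch, assuming a hypothetical `Member` type with an integer `protocolVersion` field; neither name exists in the join-group schema today, and ties are broken by arrival order purely for illustration:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical model of choosing the group leader by highest protocol version.
class LeaderSelectionSketch {

    static final class Member {
        final String id;
        final int protocolVersion;

        Member(String id, int protocolVersion) {
            this.id = id;
            this.protocolVersion = protocolVersion;
        }
    }

    // Returns the member with the highest protocol version; on ties, the
    // earliest member in the list is kept. Empty for an empty group.
    static Optional<Member> chooseLeader(List<Member> members) {
        Member leader = null;
        for (Member m : members) {
            if (leader == null || m.protocolVersion > leader.protocolVersion) {
                leader = m;
            }
        }
        return Optional.ofNullable(leader);
    }

    public static void main(String[] args) {
        List<Member> group = Arrays.asList(
            new Member("a", 1), new Member("b", 2), new Member("c", 1));
        System.out.println("leader: " + chooseLeader(group).get().id); // b
    }
}
```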


    Edge Cases Discussion

    This proposal depends on users behaving correctly, i.e. that when upgrading to the new version everyone is still using the "eager" protocol. But if users make mistakes, the group will still not fall into undefined behavior: the assignor mechanism guarantees that a consensus must be formed on the protocol names, and whoever does not support the chosen one will be kicked out of the group, so users will be notified about the misconfiguration.

    There are a few edge cases to illustrate this:

    Changing protocol without changing assignor

    Suppose a user mistakenly configures rebalance.protocol to "cooperative" but does not change the assignor. If the assignor does not support "cooperative", the consumer will simply fail at startup. If the assignor supports "cooperative", it will still conflict with other members that are still on "eager", i.e. no consensus assignor will be selected and members will be kicked out with a fatal error.

    Downgrading and Old-Versioned New Member

    If a consumer is downgraded incorrectly after the above upgrade path is complete, i.e. its jar is simply replaced with the old one without changing any configs, it is treated as first leaving the group and then rejoining the group as a new member with "eager". The same situation arises when a new member with "eager" joins a group (probably mistakenly) in which everyone else has been switched to "cooperative".

    ...