...
In this KIP, we propose to change this behavior as well: we slightly modify the semantics of the listener, such that the callback is used more like a "notification" to users about what gets assigned and what gets revoked, etc., and even if an exception gets thrown, it would not cause any of these assignment / revocation results to be changed. Also, if the ConsumerCoordinator finds that there is no need to notify, since no "new" partitions have been assigned or revoked, the corresponding callback would not be triggered either.
More specifically, we will change the behavior as follows:
- ConsumerCoordinator will check whether the newly assigned / revoked / lost partitions set is empty; if it is empty, we will not trigger the corresponding listener.
- When the listener callback throws an exception, ConsumerCoordinator will log an error, and then re-throw this exception all the way up to KafkaConsumer.poll().
...
- Note that with the new rebalance protocol, onPartitionsAssigned / onPartitionsRevoked may be called sequentially. If one throws an exception, we would still proceed to complete the rest of the callbacks while remembering the "first-thrown exception", and then at the end throw the remembered exception to the user.
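The callback rules above can be sketched as follows. This is a minimal illustration, not the actual ConsumerCoordinator code: the class `RebalanceCallbackSketch`, its methods, and the use of plain integers for partitions are all hypothetical, and the callback order (assigned, then revoked) is only an assumption made for the sketch.

```java
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical helper for illustration; not part of the Kafka code base.
final class RebalanceCallbackSketch {

    /** Invokes the callback only for a non-empty set; returns the exception it threw, or null. */
    static RuntimeException notify(Set<Integer> partitions, Consumer<Set<Integer>> callback) {
        if (partitions.isEmpty()) {
            return null; // nothing new to report, so the callback is skipped
        }
        try {
            callback.accept(partitions);
            return null;
        } catch (RuntimeException e) {
            System.err.println("User callback failed: " + e); // stands in for logging an error
            return e;
        }
    }

    /** Runs both callbacks regardless of failures, then rethrows the first-thrown exception. */
    static void completeRebalance(Set<Integer> added, Set<Integer> revoked,
                                  Consumer<Set<Integer>> onAssigned,
                                  Consumer<Set<Integer>> onRevoked) {
        RuntimeException first = notify(added, onAssigned);
        RuntimeException second = notify(revoked, onRevoked);
        if (first != null) {
            throw first;
        }
        if (second != null) {
            throw second;
        }
    }
}
```

Note that the second callback still runs when the first one fails, and the caller only ever sees the earliest exception.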
Upon capturing the error, users can do the following, depending on their exception handling logic:
- Shut down the consumer gracefully if the exception is considered a fatal error.
- Retry consumer.poll() if they believe the exception is retriable. Be aware that the exception thrown from the callback will be treated as having had no side effects at all: in other words, the callback is used more like a "notification" to users about what gets assigned and what gets revoked, and throwing an exception would not cause any of these results to be changed.
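The two options can be sketched on the user side as a small retry loop. This is only an illustration under stated assumptions: `PollErrorHandlingSketch`, `pollWithRetry`, and the `isRetriable` predicate are hypothetical names, and the `poll` argument stands in for a call to consumer.poll() so that the sketch stays self-contained.

```java
import java.util.function.Predicate;

// Hypothetical user-side helper for illustration; not a Kafka API.
final class PollErrorHandlingSketch {

    /**
     * Calls poll until it succeeds, the error is judged fatal, or attempts run out.
     * Returns true if a poll succeeded, false if the caller should shut down gracefully.
     */
    static boolean pollWithRetry(Runnable poll,
                                 Predicate<RuntimeException> isRetriable,
                                 int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                poll.run(); // stands in for consumer.poll(...)
                return true;
            } catch (RuntimeException e) {
                if (!isRetriable.test(e)) {
                    return false; // fatal: caller closes the consumer
                }
                // retriable: loop and poll again; the assignment itself is unchanged
            }
        }
        return false;
    }
}
```

Which exceptions count as retriable is entirely up to the application; the predicate is where that decision would live.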
To give a concrete example, suppose your currently assigned partitions are {1,2}, and the newly assigned partitions are {2,3}. The consumer will call onPartitionsAssigned(3) and then onPartitionsRevoked(1). Suppose the former fails with an exception: ConsumerCoordinator would still proceed to complete the latter callback (assume the latter callback succeeds), and if users decide to retry, the assignment is still considered as successfully changed to {2,3}, i.e. we consider all of the effects the user indicated in the callback to have taken place.
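The sets passed to the two callbacks in this example are plain set differences between the old and new assignment. A minimal sketch, assuming integer partition ids and a hypothetical `AssignmentDiffSketch` class:

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper for illustration; not part of the Kafka code base.
final class AssignmentDiffSketch {

    /** Returns the elements of a that are not in b, in sorted order. */
    static Set<Integer> difference(Set<Integer> a, Set<Integer> b) {
        Set<Integer> result = new TreeSet<>(a);
        result.removeAll(b);
        return result;
    }

    public static void main(String[] args) {
        Set<Integer> owned = Set.of(1, 2);
        Set<Integer> assigned = Set.of(2, 3);
        System.out.println("added=" + difference(assigned, owned));   // prints added=[3]
        System.out.println("revoked=" + difference(owned, assigned)); // prints revoked=[1]
    }
}
```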
...
From the user's perspective, the upgrade path of leveraging new protocols is just the same as switching to a new assignor. For example, assume the current version of the Kafka consumer is 2.2 and the "range" assignor is specified in the config. The upgrade path would be:
...
The key point behind these two rolling bounces is that we want to avoid the situation where the leader is on old byte-code and only recognizes "eager", but due to compatibility would still be able to deserialize the new protocol data from newer versioned members, and hence just goes ahead and does the assignment while new versioned members did not revoke their partitions before joining the group. Note the difference with KIP-415 here: since on the consumer we do not have the luxury of relying on a list of built-in assignors, because the assignor is user-customizable and hence a black box to the consumer coordinator, we need two rolling bounces instead of one to complete the upgrade, whereas Connect only needs one rolling bounce.
...
For example, this can also help save the "version probing" cost on Streams: suppose we augment the join group schema with `protocol version` in Kafka version 2.3, and then with both brokers and clients being on version 2.3+, on the first rolling bounce where the subscription and assignment schema and / or user metadata has changed, this protocol version will be bumped. On the broker side, when receiving all members' join-group requests, it will choose the member that has the highest protocol version (it also assumes that a higher versioned protocol is always backward compatible, i.e. the coordinator can recognize lower versioned protocols as well) and select it as the leader. Then the leader can decide, based on its received and deserialized subscription information, how to assign partitions and how to encode the assignment accordingly so that everyone can understand it. With this, in Streams for example, no version probing would be needed since we are guaranteed the leader knows everyone's version -- again assuming that a higher versioned protocol is always backward compatible -- and hence can successfully do the assignment in that round.
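The leader selection described here can be sketched as picking the member with the highest protocol version. This is a rough illustration only: `LeaderSelectionSketch`, the member-id-to-version map, and the tie-break on the lowest member id are assumptions made for the sketch, not part of the proposal.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical broker-side helper for illustration; not part of the Kafka code base.
final class LeaderSelectionSketch {

    /**
     * Returns the id of the member with the highest protocol version, or null if
     * there are no members. Ties go to the lexicographically smallest member id
     * (an assumption made here only to keep the result deterministic).
     */
    static String selectLeader(Map<String, Integer> protocolVersionByMember) {
        String leader = null;
        int best = Integer.MIN_VALUE;
        // TreeMap iterates member ids in sorted order, so the first maximum wins ties.
        for (Map.Entry<String, Integer> e : new TreeMap<>(protocolVersionByMember).entrySet()) {
            if (leader == null || e.getValue() > best) {
                leader = e.getKey();
                best = e.getValue();
            }
        }
        return leader;
    }
}
```

Because the chosen leader is assumed to understand every lower version, it can decode all subscriptions in one round, which is what removes the need for version probing.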
Edge Cases Discussion
This proposal depends on users' correct behavior: when upgrading to the new version, everyone is still using the "eager" protocol. But if users make mistakes, we are still not going to fall into undefined behavior, as the assignor mechanism will guarantee that we must form a consensus on the protocol names, and whoever does not support the chosen one will be kicked out of the group, and hence users would be notified about the misconfiguration.
There are a few edge cases to illustrate this:
Changing protocol without changing assignor
Suppose a user mistakenly configures rebalance.protocol to "cooperative" but does not change the assignor. Then if the assignor does not support "cooperative", the consumer will simply fail at startup; if the assignor supports "cooperative", it will still conflict with other members who are still on "eager", i.e. no consensus assignor will be selected and members will be kicked out with a fatal error.
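The consensus check behind this edge case can be sketched as an intersection over the assignor names each member supports. The class `ProtocolConsensusSketch` and the name strings used with it are hypothetical; how the rebalance protocol is actually reflected in the advertised names is an assumption made for this illustration.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper for illustration; not part of the Kafka code base.
final class ProtocolConsensusSketch {

    /** Returns the names supported by every member; an empty result means no consensus. */
    static Set<String> commonAssignors(List<Set<String>> supportedByMember) {
        if (supportedByMember.isEmpty()) {
            return new TreeSet<>();
        }
        Set<String> common = new TreeSet<>(supportedByMember.get(0));
        for (Set<String> supported : supportedByMember) {
            common.retainAll(supported); // keep only names this member also supports
        }
        return common;
    }
}
```

An empty intersection corresponds to the case where no consensus assignor can be selected and the conflicting members are rejected.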
Downgrading and Old-Versioned New Member
If a consumer is downgraded incorrectly after the above upgrade path is complete, i.e. it is just replaced with the old jar without changing any configs, it is treated as first leaving the group and then rejoining the group as a new member with "eager". The same situation arises when a new member with "eager" joins a group (probably mistakenly) whereas everyone else has been switched to "cooperative".
...