...

Today, when a user-customized rebalance listener callback throws an exception, it is swallowed and logged as an error unless it is a WakeupException / InterruptedException. Users have complained that this is not an effective way of notifying them when it happens (KAFKA-4600).

In this KIP, we propose to change this behavior as well: when the listener callback throws an exception, ConsumerCoordinator will log an error and then re-throw this exception all the way up to KafkaConsumer.poll(); at the same time, the consumer coordinator's book-kept owned partitions will exclude this partition from its intended behavior, whether that is revocation, assignment, or loss. Note that with the new rebalance protocol, onPartitionsAssigned / onPartitionsRevoked may be called sequentially. If one throws an exception, we would still proceed to complete the rest of the callbacks while remembering the "first-thrown exception", and then throw the remembered exception to the user at the end.

Upon capturing the error, users can do the following, depending on their exception handling logic:

  • Shut down the consumer gracefully if the exception is considered a fatal error.
  • Retry consumer.poll() if they believe the exception is retriable. In other words, the callback is used more like a "notification" to users about what gets assigned and what gets revoked, and throwing an exception would not cause any of these results to be changed.
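
The two options above can be sketched as a small retry loop. This is only an illustration under stated assumptions: `PollStep` is a hypothetical stand-in for a single consumer.poll() call, and `RetriableCallbackException` is a hypothetical application-defined marker for errors the user considers retriable; neither is part of the Kafka client API.

```java
final class PollErrorHandlingSketch {

    // Hypothetical stand-in for one consumer.poll() call (not a Kafka type).
    interface PollStep {
        void poll();
    }

    // Hypothetical application-level marker for listener errors deemed retriable.
    static final class RetriableCallbackException extends RuntimeException {
        RetriableCallbackException(String message) {
            super(message);
        }
    }

    // Retries the poll step on retriable errors, up to maxAttempts.
    // Any other exception propagates so the caller can shut down gracefully.
    // Returns the number of attempts that were needed.
    static int pollWithRetry(PollStep step, int maxAttempts) {
        for (int attempt = 1; ; attempt++) {
            try {
                step.poll();
                return attempt;
            } catch (RetriableCallbackException e) {
                if (attempt >= maxAttempts) {
                    throw e; // give up and let the caller treat it as fatal
                }
                // Retrying is safe: the assignment change itself is unaffected
                // by the exception thrown from the listener.
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        int attempts = pollWithRetry(() -> {
            if (calls[0]++ == 0) {
                throw new RetriableCallbackException("transient listener failure");
            }
        }, 3);
        System.out.println("succeeded after attempts: " + attempts);

        try {
            pollWithRetry(() -> {
                throw new IllegalStateException("unrecoverable listener failure");
            }, 3);
        } catch (IllegalStateException e) {
            // In a real application this is where the consumer would be closed.
            System.out.println("fatal, shutting down: " + e.getMessage());
        }
    }
}
```

Which exceptions count as retriable is an application decision; the sketch only shows where that decision would sit relative to the poll loop.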


To give a concrete example, suppose your current assigned partitions are {1, 2} and the newly assigned partitions are {2, 3}; the consumer will call onPartitionsAssigned(3) and then onPartitionsRevoked(1). Suppose the former fails with an exception: ConsumerCoordinator would still proceed to complete the latter callback (assume the latter callback succeeds), and if users decide to retry, the assignment is still considered as successfully changed to {2, 3}, i.e. we consider all of the effects the user indicated in the exception-thrown callback to have taken place.
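
The following is a minimal model of the "first-thrown exception" behavior for the example above, assuming the coordinator updates its book-kept partitions regardless of callback outcome. `RebalanceCallbackSketch` and its `Callback` interface are hypothetical names for illustration and do not mirror the actual ConsumerCoordinator code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of the proposed behavior, not the real coordinator.
final class RebalanceCallbackSketch {

    // Hypothetical stand-in for one rebalance listener callback.
    interface Callback {
        void accept(Set<Integer> partitions);
    }

    // Partitions the modelled coordinator considers owned.
    private final Set<Integer> owned = new TreeSet<>();

    RebalanceCallbackSketch(Set<Integer> initiallyOwned) {
        owned.addAll(initiallyOwned);
    }

    Set<Integer> owned() {
        return new TreeSet<>(owned);
    }

    // Runs both callbacks in the order used in the example (assigned, then
    // revoked), remembers the first exception, and re-throws it at the end.
    void rebalance(Set<Integer> newAssignment, Callback onAssigned, Callback onRevoked) {
        Set<Integer> added = new TreeSet<>(newAssignment);
        added.removeAll(owned);
        Set<Integer> revoked = new TreeSet<>(owned);
        revoked.removeAll(newAssignment);

        // Assumption: the result of the rebalance does not depend on the callbacks.
        owned.clear();
        owned.addAll(newAssignment);

        RuntimeException firstThrown = null;
        try {
            onAssigned.accept(added);
        } catch (RuntimeException e) {
            firstThrown = e;
        }
        try {
            onRevoked.accept(revoked);
        } catch (RuntimeException e) {
            if (firstThrown == null) {
                firstThrown = e;
            }
        }
        if (firstThrown != null) {
            throw firstThrown;
        }
    }

    public static void main(String[] args) {
        RebalanceCallbackSketch coordinator =
            new RebalanceCallbackSketch(new TreeSet<>(Arrays.asList(1, 2)));
        List<Set<Integer>> revokedSeen = new ArrayList<>();
        try {
            coordinator.rebalance(
                new TreeSet<>(Arrays.asList(2, 3)),
                added -> { throw new IllegalStateException("listener failed on " + added); },
                revokedSeen::add);
        } catch (IllegalStateException e) {
            System.out.println("surfaced to caller: " + e.getMessage());
        }
        System.out.println("revoked callback still ran with: " + revokedSeen);
        System.out.println("owned partitions: " + coordinator.owned()); // [2, 3]
    }
}
```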


Consumer StickyAssignor

Since we've already encoded the assigned partitions at the consumer protocol layer, the consumer's sticky assignor effectively duplicates this data in both the consumer protocol and the assignor's user data. Similarly, we have a StreamsPartitionAssignor which is sticky as well but relies on its own user data to do it. We can bump up their versions while simplifying the user data and leveraging Subscription#ownedPartitions instead (details about the upgrade compatibility below).

...


From the user's perspective, the upgrade path for leveraging the new protocol is just the same as switching to a new assignor. For example, assuming the current version of the Kafka consumer is 2.2 and the "range" assignor is specified in the config, the upgrade path would be:

...

  • Have a first rolling bounce to replace the byte code (i.e. swap the jars), and set the assignors to "range, sticky". At this stage, the new versioned byte code will still choose EAGER as the protocol and send both assignors in the join-group request. Since there is at least one member that has not been bounced yet and therefore only sends "range", the "range" assignor will be selected to assign partitions while everyone follows the EAGER protocol. This rolling bounce is safe.
  • Have a second rolling bounce to remove the "range" assignor, i.e. leave only the "sticky" assignor in the config. At this stage, members that have been bounced will choose the COOPERATIVE protocol and not revoke partitions, while those not yet bounced will still go with EAGER and revoke everything. However, the "sticky" assignor will be chosen, since at least one already-bounced member no longer has "range". The "sticky" assignor works even when some members are in EAGER and some are in COOPERATIVE: it is fine as long as the leader can recognize them and make assignment choices accordingly. EAGER members have revoked everything and hence have no pre-assigned partitions in their subscription information, so it is safe to move those partitions to other members immediately based on the assignor's output.
  • The key point behind these two rolling bounces is that we want to avoid the situation where the leader is on old byte code and only recognizes "eager", but due to compatibility can still deserialize the new protocol data from newer-versioned members, and hence goes ahead and does the assignment while the new-versioned members have not revoked their partitions before joining the group. Note the difference from KIP-415 here: since on the consumer the assignor is user-customizable, and hence a black box to the consumer coordinator, we cannot rely on a list of built-in assignors. We therefore need two rolling bounces to complete the upgrade, whereas Connect only needs one.
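
To see why "range" wins during the first bounce and "sticky" during the second, here is a simplified model of assignor selection. It assumes the group picks among assignors supported by every member, with each member voting for its most-preferred one; `AssignorSelectionSketch` is a hypothetical name, and the real group coordinator may differ in details such as tie-breaking.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified, hypothetical model of how a group settles on one assignor.
final class AssignorSelectionSketch {

    // Each inner list holds one member's assignors in preference order.
    static String select(List<List<String>> memberAssignors) {
        // Candidates are the assignors that every member supports.
        Set<String> candidates = new LinkedHashSet<>(memberAssignors.get(0));
        for (List<String> supported : memberAssignors) {
            candidates.retainAll(supported);
        }
        if (candidates.isEmpty()) {
            throw new IllegalStateException("no assignor is supported by all members");
        }
        // Each member votes for its most-preferred candidate.
        Map<String, Integer> votes = new LinkedHashMap<>();
        for (List<String> supported : memberAssignors) {
            for (String assignor : supported) {
                if (candidates.contains(assignor)) {
                    votes.merge(assignor, 1, Integer::sum);
                    break;
                }
            }
        }
        return Collections.max(votes.entrySet(), Map.Entry.comparingByValue()).getKey();
    }

    public static void main(String[] args) {
        List<String> oldConfig = Arrays.asList("range");
        List<String> firstBounce = Arrays.asList("range", "sticky");
        List<String> secondBounce = Arrays.asList("sticky");

        System.out.println("during first bounce:  " + select(Arrays.asList(firstBounce, oldConfig)));
        System.out.println("after first bounce:   " + select(Arrays.asList(firstBounce, firstBounce)));
        System.out.println("during second bounce: " + select(Arrays.asList(secondBounce, firstBounce)));
        System.out.println("after second bounce:  " + select(Arrays.asList(secondBounce, secondBounce)));
    }
}
```

Under this model the group never selects "sticky" while any member still runs the old byte code, because such a member only advertises "range".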

    ...