...

In short, although this KIP may be incompatible, the impact does not seem significant. Additionally, this change will have no impact on the ability of 0.9 and 0.10 consumers to work with future versions of Kafka. When the coordinator receives version 0 of the JoinGroup request, which carries no rebalance timeout, it will use the session timeout as the rebalance timeout, preserving the old behavior.
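The compatibility rule above amounts to a simple fallback on the coordinator side. The following Java sketch is illustrative only: the helper name effectiveRebalanceTimeoutMs is hypothetical, it is not the broker's actual code, and it assumes that every JoinGroup version above 0 carries an explicit rebalance timeout.

```java
public class RebalanceTimeoutSketch {
    /**
     * Hypothetical helper: picks the rebalance timeout the coordinator would
     * apply to a member, given the JoinGroup request version it sent.
     * Version 0 has no rebalance timeout field, so the session timeout is reused.
     */
    static int effectiveRebalanceTimeoutMs(short joinGroupVersion,
                                           int sessionTimeoutMs,
                                           int rebalanceTimeoutMs) {
        if (joinGroupVersion == 0) {
            // Old (0.9/0.10) consumers: keep the pre-KIP behavior.
            return sessionTimeoutMs;
        }
        // Newer consumers: use the timeout they sent explicitly.
        return rebalanceTimeoutMs;
    }

    public static void main(String[] args) {
        // -1 is a placeholder: a v0 request has no rebalance timeout to pass.
        System.out.println(effectiveRebalanceTimeoutMs((short) 0, 10_000, -1));
        System.out.println(effectiveRebalanceTimeoutMs((short) 1, 10_000, 300_000));
    }
}
```

Keeping the fallback in one place means old clients never observe a different timeout than they did before the version bump.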

Test Plan

This KIP will be tested primarily through unit and integration testing. On the client, we need to verify that max.poll.interval.ms is enforced correctly, including during rebalances. On the server, we need to verify that the rebalance timeout passed in the JoinGroup request is enforced, including the case where two members pass conflicting values. Since this KIP bumps the JoinGroup API version, it may also make sense to add a system test that verifies compatibility in groups that mix consumers using the old and new versions.
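One way to make the client-side max.poll.interval.ms check unit-testable is to drive it from an injected clock rather than real time. The sketch below is hypothetical (PollTimer, recordPoll and pollIntervalExpired are illustrative names, not KafkaConsumer internals) and assumes the limit counts as exceeded only when the gap since the last poll() is strictly greater than max.poll.interval.ms.

```java
import java.util.function.LongSupplier;

public class PollIntervalSketch {
    /** Hypothetical tracker for the max.poll.interval.ms check, driven by an injectable clock. */
    static final class PollTimer {
        private final long maxPollIntervalMs;
        private final LongSupplier clockMs;
        private long lastPollMs;

        PollTimer(long maxPollIntervalMs, LongSupplier clockMs) {
            this.maxPollIntervalMs = maxPollIntervalMs;
            this.clockMs = clockMs;
            this.lastPollMs = clockMs.getAsLong();
        }

        /** Call whenever the application invokes poll(). */
        void recordPoll() {
            lastPollMs = clockMs.getAsLong();
        }

        /** True once the time since the last poll() is strictly greater than the limit. */
        boolean pollIntervalExpired() {
            return clockMs.getAsLong() - lastPollMs > maxPollIntervalMs;
        }
    }

    public static void main(String[] args) {
        long[] now = {0L}; // mock clock that the "test" advances by hand
        PollTimer timer = new PollTimer(300_000L, () -> now[0]);

        now[0] = 299_999L;
        System.out.println(timer.pollIntervalExpired()); // false
        now[0] = 300_001L;
        System.out.println(timer.pollIntervalExpired()); // true
        timer.recordPoll();
        System.out.println(timer.pollIntervalExpired()); // false
    }
}
```

With the clock injected, a test can step just below and just above the limit deterministically instead of sleeping for minutes.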

Rejected Alternatives

  1. Add a separate API the user can call to indicate liveness: We considered adding a heartbeat() API which the user could call from their own thread in order to keep the consumer alive. This also solves the problem, but it puts the burden of managing that thread (including shutdown coordination) on the user. Although a separate API has some advantage, since it allows users to develop their own notion of liveness, we feel most users would simply spawn a thread and call heartbeat() in a loop. We leave this as a possible extension for the future if users find they need it.
  2. Is there perhaps no need for a rebalance timeout in the group protocol? If we only introduce the background thread for heartbeating, then the session timeout could continue to be used as both the processing timeout and the rebalance timeout. This still addresses the most significant problem that users are seeing, which is the consumer falling out of the group because of long processing times. The background thread will keep the consumer in the group as long as the group is stable. However, if a rebalance begins while the consumer is processing data, the consumer may still fall out of the group, since it may not be able to finish processing and rejoin the group fast enough. This scenario is actually common in practice, because users often use a processing model where records are collected in memory before being flushed to a remote system in a single batch. In this case, once a rebalance begins, the user must flush the existing batch and then commit offsets before rejoining, and that work can easily take longer than the session timeout.
  3. Perhaps we don't need max.poll.interval.ms? We could enable the background thread through an explicit configuration and let it keep the consumer in the group indefinitely. This feels a bit like a step backwards, since consumer liveness is an important problem which users must face. Additionally, users can get virtually the same behavior by setting max.poll.interval.ms to a very large value, as long as they are willing to accept longer rebalances in the worst case. Users who require both short rebalances and indefinite processing 
  4. Move rebalancing to the background thread instead of heartbeats only? In this proposal, we have intentionally left rebalances in the foreground, both because it greatly simplifies the implementation and for compatibility, since users currently expect the rebalance listener to execute from the same thread as the consumer. Alternatively, we could move all coordinator communication to the background thread, even allowing rebalances to complete asynchronously. The apparent advantage of doing so is that the consumer could finish a rebalance while messages are still being processed, but we're not sure this is desirable: offsets for messages which arrived before the rebalance generally cannot be committed safely after it completes, which usually necessitates reprocessing. The current proposal gives users direct control over this tradeoff. To rebalance faster, users must tune their processing loop to work with smaller chunks of data. To give more time for record processing, users must accept a longer worst-case rebalance time. Finally, this change would basically require rewriting a large part of the consumer, so we've opted for something more incremental.
  5. The rebalance timeout could be configured separately from the processing timeout: It may make sense to expose the rebalance timeout to the user directly instead of using the processing timeout as we've suggested above. This might make sense if users were willing to accept some message reprocessing in order to ensure that rebalances always complete quickly. Unfortunately, the single-threaded model of the consumer means that we would have to move rebalance completion to the background thread, which we have already rejected (see alternative 4). Also, there is no obvious reason why a user would ever want to set a rebalance timeout higher than the processing timeout.
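To make the burden described in alternative 1 concrete, here is a minimal Java sketch of what users would likely have written had a heartbeat() API existed. Everything here is hypothetical: KafkaConsumer has no heartbeat() method, the Heartbeatable interface and runWithHeartbeats helper are illustrative names, and the fake consumer only counts calls.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class HeartbeatThreadSketch {
    /** Hypothetical stand-in for the rejected heartbeat() API. */
    interface Heartbeatable {
        void heartbeat();
    }

    /**
     * Simulates processing that takes processingMs while a user-owned thread
     * calls heartbeat() every intervalMs. Returns how many heartbeats were sent.
     */
    static int runWithHeartbeats(long processingMs, long intervalMs) throws InterruptedException {
        AtomicInteger sent = new AtomicInteger();
        Heartbeatable consumer = sent::incrementAndGet; // fake consumer: just counts calls

        // Burden 1: the user creates and schedules the heartbeat thread.
        ScheduledExecutorService heartbeater = Executors.newSingleThreadScheduledExecutor();
        heartbeater.scheduleAtFixedRate(consumer::heartbeat, 0, intervalMs, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(processingMs); // stand-in for long record processing
        } finally {
            // Burden 2: the user must stop the thread before closing the consumer.
            heartbeater.shutdown();
            if (!heartbeater.awaitTermination(1, TimeUnit.SECONDS)) {
                heartbeater.shutdownNow();
            }
        }
        return sent.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("heartbeats sent: " + runWithHeartbeats(300, 50));
    }
}
```

Even in this small sketch the user owns thread creation and an orderly shutdown; the background thread in the proposal keeps that responsibility inside the consumer instead.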