Table of Contents |
---|
Status
Current state: Under Discussion Adopted
Discussion thread: http://markmail.org/message/oeg63goh3ed3qdap
JIRA:
Jira | ||||||
---|---|---|---|---|---|---|
|
Released: 0.10.1.0
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
Decoupling the processing timeout: We propose to introduce a separate locally enforced timeout for record processing and a background thread to keep the session active until this timeout expires. The We call this new timeout will be exposed in the as the "process timeout" and expose it in the consumer's configuration as process.timeoutas max.poll.interval.ms. This config sets the maximum delay between client calls to poll()
. When the timeout expires, the consumer will stop sending heartbeats and send an explicit LeaveGroup request. As soon as the consumer resumes processing with another call to poll()
, the consumer will rejoin the group. This is equivalent to the current processing model except that it allows the user to set a higher timeout when processing while also using a lower session timeout for faster crash detection.
...
The question then is where the value for this timeout should come from. Since we give the client as much as process.timeoutas max.poll.interval.ms to handle a batch of records, this is also the maximum time before a consumer can be expected to rejoin the group in the worst case. We therefore propose to set the rebalance timeout in the Java client to the same value configured with processmax.poll.timeoutinterval.ms
. When a rebalance begins, the background thread will continue sending heartbeats. The consumer will not rejoin the group until processing completes and the user calls poll()
. From the coordinator's perspective, the consumer will not be removed from the group until either 1) their session timeout expires without receiving a heartbeat, or 2) the rebalance timeout expires.
...
Public Interfaces
This KIP adds the processmax.poll.timeoutinterval.ms
configuration to the consumer configuration as described above. With the decoupled processing timeout, users will be able to set the session timeout significantly lower to detect process crashes faster (the only reason we've set it to 30 seconds up to now is to give users some initial leeway for processing overhead). To avoid the need for most users to tune these settings manually, we suggest the following default values for the three relevant configurations which affect the poll loop behavior:
session.timeout.ms
: 10sprocessmax.poll.timeoutinterval.ms
: 60s5minmax.poll.records
: 500
We've reduced the default session timeout, but actually increased the amount of time given to consumers for message processing to 60 seconds5 minutes. We've also set a fairly conservative max.poll.records
to give users a more reasonable default batch size to avoid the need for many users to tune it in the first place (the current default is Integer.MAX_VALUE).
...
In short, although this KIP may be incompatible, the impact does not seem significant. Additionally, this change will have no impact on 0.9 and 0.10 consumers ability to work with future versions of Kafka. When receiving version 0 of the JoinGroup request, the coordinator will use the session timeout as the rebalance timeout which preserves the old behavior.
Rejected Alternatives
Test Plan
This KIP will be tested primarily through unit and integration testing. On the client, we need to verify max.poll.interval.ms is enforced correctly, including during rebalances. On the server, we need to verify that the rebalance timeout passed in the JoinGroup is enforced, including the case when two members use conflicting values. Since this KIP bumps the JoinGroup API version, it may also make sense to add a system test which verifies compatibility in groups with consumers using the new version and the old version.
Rejected Alternatives
- Add a separate Add a separate API the user can call to indicate liveness: We considered adding a
heartbeat()
API which the user could use from their own thread in order to keep the consumer alive. This also solves the problem, but it puts the burden of managing that thread (including shutdown coordination) on the user. Although there is some advantage to having a separate API since it allows users to develop their own notion of liveness, we feel must users would simply spawn a thread and callheartbeat()
in a loop. We leave this as a possible extension for the future if users find they need it. - Maybe no need for a rebalance timeout in the group protocol? If we only introduce the background thread for heartbeating, then the session timeout could continue to be used as both the processing timeout and the rebalance timeout. This still addresses the most significant problem that users are seeing, which is the consumer falling out of the group because of long processing times. The background thread will keep the consumer in the group as long as the group is stable. However, if a rebalance begins while the consumer is processing data, then there is still the possibility of the consumer falling out of the group since it may not be able to finish processing and join the group fast enough. This scenario is actually common in practice since users often use a processing model where records are collected in memory prior to being flushed to a remote system in a single batch. In this case, once a rebalance begins, the user must flush the existing batch and then commit offsets.
- Perhaps we don't need processmax.poll.timeoutinterval.ms? We could enable the background thread through an explicit configuration and let it keep the consumer in the group indefinitely. This feels a bit like a step backwards since consumer liveness is actually an important problem which users must face. Additionally, users can get virtually the same behavior by setting the timeout to a very large value as long as they are willing to accept longer rebalances in the worst case. Users who require both short rebalances and indefinite processing
- Move rebalancing to the background thread instead of heartbeats only? In this proposal, we have intentionally left rebalances in the foreground because it greatly simplifies the implementation, and also for compatibility, since users currently expect the rebalance listener to execute from the same thread as the consumer. Alternatively, we could move all coordinator communication to the background thread, even allowing rebalances to complete asynchronously. The apparent advantage of doing so is that it would allow the consumer to finish a rebalance while messages are still being processed, but we're not sure this is desirable since offsets for messages which arrived before the rebalance cannot generally be committed safely after it completes (which usually necessitates reprocessing). The current proposal gives users direct control over this tradeoff. To rebalance faster, users must tune their processing loop to work with smaller chunks of data. To give more time for record processing, users must accept a longer worst-case rebalance time. Finally, this change would basically require a rewrite of a huge piece of the consumer, so we've opted for something more incremental.
- The rebalance timeout could be configured separately from the process timeout: It may make sense to expose the rebalance timeout to the user directly instead of using the process timeout as we've suggested above. This might make sense if users were willing to accept some message reprocessing in order to ensure that rebalances always complete quickly. Unfortunately, the single-threaded model of the consumer means that we would have to move the rebalance completion to the background thread, which we already rejected above (see above). Also, there is no obvious reason why a user would ever want to set a rebalance timeout higher than the process timeout.