Status
Current state: Adopted (3.3.0)
Discussion thread: here
Vote thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
(Sophie Blee-Goldman wrote most of this in the issue description. Since that description is detailed enough, I copied parts of it here.)
Context
In Kafka Streams, when an instance is shut down via the `close()` API, we intentionally skip sending a LeaveGroup request. We decided to do so because the shutdown is often not due to a scale-down event but to some temporary closure, such as during a rolling bounce. In cases where the instance is expected to start up again shortly after, we originally wanted to prevent that member's tasks from being redistributed across the remaining group members, since this would disturb the stable assignment and could cause unnecessary state migration and restoration. We also hoped to limit the disruption to a single rebalance, rather than forcing the group to rebalance once when the member shuts down and again when it comes back up. So it is an optimization for the case in which the shutdown is temporary.
Problem
The above optimization makes sense for temporary closures.
...
This situation is more critical given the recent increase of the default `session.timeout.ms` to 45s, since that is a long time to go without noticing that a consumer has permanently left the group.
Public Interfaces
...
```java
package org.apache.kafka.streams;

public class KafkaStreams implements AutoCloseable {
    public void close() // already exists
    private boolean close(final long timeoutMs) // already exists
    public synchronized boolean close(final Duration timeout) throws IllegalArgumentException // already exists
    public synchronized boolean close(CloseOptions options) throws IllegalArgumentException // *this one will be added

    public static class CloseOptions {
        private Duration timeout = Duration.ofMillis(Long.MAX_VALUE);
        private boolean leaveGroup = false;
        public CloseOptions timeout(Duration timeout)
        public CloseOptions leaveGroup(boolean leaveGroup)
    }
}
```
Proposed Changes
We introduce another form of the `KafkaStreams.close()` method that forces the member to leave the consumer group, to be used in the event of an actual scale-down rather than a transient bounce.
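To illustrate how the two options might combine, here is a minimal, self-contained sketch. It does not use the Kafka Streams library: `CloseOptionsSketch` and `describeClose` are hypothetical names invented for this example, and the nested `CloseOptions` is only a stand-in that mirrors the fields and fluent setters listed under Public Interfaces. The negative-timeout check is an assumption about where the declared `IllegalArgumentException` could come from.

```java
import java.time.Duration;

public class CloseOptionsSketch {

    /** Stand-in mirroring the proposed KafkaStreams.CloseOptions (not the real class). */
    static class CloseOptions {
        private Duration timeout = Duration.ofMillis(Long.MAX_VALUE); // default: wait indefinitely
        private boolean leaveGroup = false;                           // default: keep the existing behavior

        CloseOptions timeout(final Duration timeout) {
            this.timeout = timeout;
            return this; // returning this allows fluent chaining
        }

        CloseOptions leaveGroup(final boolean leaveGroup) {
            this.leaveGroup = leaveGroup;
            return this;
        }
    }

    /** Hypothetical helper: describes what a close call with these options is expected to do. */
    static String describeClose(final CloseOptions options) {
        if (options.timeout.isNegative()) {
            // Assumption: a negative timeout is what would be rejected with IllegalArgumentException.
            throw new IllegalArgumentException("Timeout can't be negative.");
        }
        final String groupAction = options.leaveGroup
            ? "leave the group immediately"
            : "stay in the group until the session times out";
        return "wait up to " + options.timeout.toMillis() + " ms, then " + groupAction;
    }

    public static void main(final String[] args) {
        // Defaults: same behavior as a temporary closure such as a rolling bounce.
        System.out.println(describeClose(new CloseOptions()));
        // Scale-down: bounded wait, and the member leaves the group explicitly.
        System.out.println(describeClose(
            new CloseOptions().timeout(Duration.ofSeconds(30)).leaveGroup(true)));
    }
}
```

Against the API proposed here, the scale-down case would presumably be written as `streams.close(new KafkaStreams.CloseOptions().timeout(Duration.ofSeconds(30)).leaveGroup(true))`, while existing callers of `close()` and `close(Duration)` keep the current behavior.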
Compatibility, Deprecation, and Migration Plan
The proposal is backward-compatible because it only adds a new method and does not change any existing methods.
This is an optimization for some cases of `KafkaStreams` instance closure, and there is no impact until users start using the new method in their applications.
Rejected Alternatives
There was another option to achieve the same purpose: having a member leave the consumer group on every close, including during a quick bounce. As Guozhang Wang pointed out, this approach has unwanted side effects. In short, if we make the bouncing member leave the consumer group, we have to move the tasks on that instance to other members immediately, and those members would then start restoring state. So it is still valuable to let a bouncing member close temporarily without leaving the group: the rebalance then happens only after the instance comes back, the tasks return to the restarted instance, and no task migration is needed.
...