Status
Current state: Under Discussion
Discussion thread: [Discuss] KIP-571: Add option to force remove members in StreamsResetter
JIRA: KAFKA-9146
Motivation
Sometimes people want to reset the stream application sooner, but blocked by the left-over members inside group coordinator, which only expire after session timeout. When user configures a really long session timeout, it could prevent the group from clearing. We should consider adding the support to cleanup members by forcing them to leave the group. To do that,
- If the stream application is already on static membership, we could call directly from adminClient.removeMembersFromGroup
If the application is on dynamic membership, we should modify adminClient.removeMembersFromGroup interface to allow deletion based on member.id.
Proposed Changes && Public Interfaces
KIP-569 plan to support the ability to force removing members in `StreamsResetter`, this basically involves 2 public interfaces changes as below:
1) Changes in org.apache.kafka.clients.admin.MemberToRemove
Changes includes:
- New field:
- private String memberId;
- New methods:
- public MemberToRemove() {}
- public MemberToRemove setGroupInstanceId(String groupInstanceId) {}
- public MemberToRemove setMemberId(String memberId) {}
- Deprecated methods:
- public MemberToRemove(String groupInstanceId) {}
Previously the org.apache.kafka.clients.admin.KafkaAdminClient#removeMembersFromConsumerGroup only support to remove members by specifying the groupInstanceId, which means it only support remove static members, so the memberId is added to support removing dynamic members .
public class MemberToRemove {
private String groupInstanceId;
private String memberId;
// new added method
public MemberToRemove() {
this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
this.groupInstanceId = null;
}
/**
* @deprecated use {@link #MemberToRemove()} instead
*/
@Deprecated
public MemberToRemove(String groupInstanceId) {
this.groupInstanceId = groupInstanceId;
this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
}
@Override
public boolean equals(Object o) {
if (o instanceof MemberToRemove) {
MemberToRemove otherMember = (MemberToRemove) o;
return this.groupInstanceId.equals(otherMember.groupInstanceId)
&& this.memberId.equals(otherMember.memberId);
} else {
return false;
}
}
@Override
public int hashCode() {
return Objects.hash(groupInstanceId, memberId);
}
MemberIdentity toMemberIdentity() {
return new MemberIdentity()
.setGroupInstanceId(groupInstanceId)
.setMemberId(memberId);
}
// new added methods...
public MemberToRemove setGroupInstanceId(String groupInstanceId) {
this.groupInstanceId = groupInstanceId;
return this;
}
public MemberToRemove setMemberId(String memberId) {
this.memberId = memberId;
return this;
}
}
2) Add cmdline arg force-remove-member to StreamsResetter
The current StreamsResetter will throw exception when found active members:
bin/kafka-streams-application-reset.sh --application-id streams-wordcount --input-topics streams-plaintext-input
While with the new arg, the StreamsResetter will force removing all active members :
bin/kafka-streams-application-reset.sh --application-id streams-wordcount --input-topics streams-plaintext-input --force-remove-member
Compatibility, Deprecation, and Migration Plan
- Because no classes/method will be removed but only deprecated, this change will be fully backward compatible
- The new added cmdline arg force-remove-member is also backward compatible because it's new feature, if not specified, the StreamsRestter's behaviour remains unchanged.
Rejected Alternatives
None