Status

Current state: Adopted for 2.6

Vote thread: [Vote] KIP-571: Add option to force remove members in StreamsResetter

...

Sometimes users want to reset a Streams application sooner but are blocked by left-over members in the group coordinator, which expire only after the session timeout. When the user configures a long session timeout, this can prevent the group from clearing. We should consider adding support for cleaning up members by forcing them to leave the group. To do that, we could enhance KafkaAdminClient#removeMembersFromConsumerGroup to support removing all members (static and dynamic) in a given group.

...

Code Block
title: RemoveMembersFromConsumerGroupOptions
public class RemoveMembersFromConsumerGroupOptions {
	// newly added method
	public boolean removeAll() {}

	// newly added constructor
	public RemoveMembersFromConsumerGroupOptions() {}
}
Code Block
title: RemoveMembersFromConsumerGroupResult
public class RemoveMembersFromConsumerGroupResult {
	// newly added fields
	private final boolean removeAll;
}


CmdLine API change:

Code Block
title: kafka.tools.StreamsResetter
forceOption = optionParser.accepts("force", "Force remove members when a long session timeout has been configured. Please make sure to shut down all stream applications when this option is specified to avoid unexpected rebalances.");

...

1) Changes related to KafkaAdminClient

The old constructor RemoveMembersFromConsumerGroupOptions(Collection<MemberToRemove> members) implies the remove-specific-members (non-removeAll) scenario, and it throws IllegalArgumentException if an empty members collection is provided.

When the newly added constructor RemoveMembersFromConsumerGroupOptions() is used, it implies the removeAll scenario: internally it sets the members field to an empty set, and removeAll() returns true. In this case, KafkaAdminClient#removeMembersFromConsumerGroup removes all members in the given group. It first queries the members of the given group and then issues a LeaveGroupRequest with all members specified. Accordingly, RemoveMembersFromConsumerGroupResult also adds a new private method removeAll() to imply different handling in RemoveMembersFromConsumerGroupResult#all and RemoveMembersFromConsumerGroupResult#memberResult. Under the removeAll scenario, RemoveMembersFromConsumerGroupResult#memberInfos should be empty and memberResult() is not applicable; all() should directly check whether RemoveMembersFromConsumerGroupResult#future fully succeeded, and if not, it throws the first exception captured.
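The two constructor scenarios can be illustrated with a minimal, self-contained sketch. The class names RemoveOptionsSketch and RemoveOptionsDemo, the use of plain String member ids in place of MemberToRemove, and the helper membersToRemove are all hypothetical stand-ins chosen for illustration; this is not the Kafka source, only a model of the behavior described above.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for RemoveMembersFromConsumerGroupOptions (not the real class).
final class RemoveOptionsSketch {
    // Member ids to remove; the real class holds MemberToRemove objects instead.
    private final Set<String> members;

    // Old constructor: remove specific members, rejecting an empty collection.
    RemoveOptionsSketch(Collection<String> members) {
        if (members.isEmpty()) {
            throw new IllegalArgumentException("members must not be empty");
        }
        this.members = new HashSet<>(members);
    }

    // New no-argument constructor: the removeAll scenario, members is an empty set.
    RemoveOptionsSketch() {
        this.members = Collections.emptySet();
    }

    Set<String> members() {
        return members;
    }

    // True only when the no-argument constructor was used.
    boolean removeAll() {
        return members.isEmpty();
    }
}

final class RemoveOptionsDemo {
    // Decides which members a leave-group request would name.
    // groupMembers stands in for the result of querying the group first.
    static Set<String> membersToRemove(RemoveOptionsSketch options, Set<String> groupMembers) {
        return options.removeAll() ? new HashSet<>(groupMembers) : options.members();
    }

    public static void main(String[] args) {
        Set<String> group = new HashSet<>(Arrays.asList("static-1", "dynamic-a", "dynamic-b"));
        // removeAll scenario: every current member of the group is selected.
        System.out.println(membersToRemove(new RemoveOptionsSketch(), group));
        // Specific-members scenario: only the listed member is selected.
        System.out.println(membersToRemove(new RemoveOptionsSketch(Arrays.asList("static-1")), group));
    }
}
```

Deriving removeAll() from the empty member set mirrors the description above, where the no-argument constructor sets members to an empty set and the old constructor can never produce one.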

2) Add cmdline option --force to StreamsResetter

With the new option, the StreamsResetter will force the removal of all active members by calling KafkaAdminClient#removeMembersFromConsumerGroup with removeAll specified. If all removals succeed, the info of every removed member is logged. Otherwise, the first member removal error is thrown.
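The success and failure contract of the forced removal can be sketched as follows. This is only an illustrative model under stated assumptions: ForceRemoveSketch, the method removeAllOrThrow, and the map of per-member outcomes (null meaning success) are hypothetical and do not correspond to actual StreamsResetter or KafkaAdminClient code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the --force outcome handling (not the real StreamsResetter).
final class ForceRemoveSketch {
    // outcomes maps each member id to its removal error, or null if removal succeeded.
    // Iteration order defines which error counts as "first".
    static List<String> removeAllOrThrow(Map<String, RuntimeException> outcomes) {
        List<String> removed = new ArrayList<>();
        for (Map.Entry<String, RuntimeException> entry : outcomes.entrySet()) {
            if (entry.getValue() != null) {
                // Any failure aborts the whole operation with the first error seen.
                throw entry.getValue();
            }
            removed.add(entry.getKey());
        }
        return removed;
    }

    public static void main(String[] args) {
        Map<String, RuntimeException> outcomes = new LinkedHashMap<>();
        outcomes.put("member-1", null);
        outcomes.put("member-2", null);
        // Members are logged only after every removal has succeeded.
        for (String member : removeAllOrThrow(outcomes)) {
            System.out.println("Removed member: " + member);
        }
    }
}
```

Because the method throws before returning anything, no member is reported as removed unless every removal succeeded, which matches the all-or-first-error behavior described above.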

...