Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Status

Current state:  Under DiscussionAdopted for 2.6

Vote thread[Vote] KIP-571: Add option to force remove members in StreamsResetter

Discussion thread: [Discuss] KIP-571: Add option to force remove members in StreamsResetter

...

Sometimes people want to reset the stream application sooner , but blocked by the left-over members inside the group coordinator, which only expire expires after session timeout. When the user configures a really long session timeout, it could prevent the group from clearing. We should consider adding the support to cleanup clean up members by forcing them to leave the group. To do that

...

, we could enhance KafkaAdminClient#removeMembersFromConsumerGroup to support remove all members(static&dynamic) in a certain group.

Public Interfaces

Client side changes:

kafkaAdminClient related changes:

Code Block
titleorg.apache.kafka.clients.admin.MemberToRemoveRemoveMembersFromConsumerGroupOptions
public class MemberToRemoveRemoveMembersFromConsumerGroupOptions {

	// newly   private String memberId; 

    public MemberToRemoveadded method
	public boolean removeAll() {}

	// newly   public MemberToRemove setGroupInstanceId(String groupInstanceId) {}

    public MemberToRemove setMemberId(String memberIdadded constructor
	public RemoveMembersFromConsumerGroupOptions() {}
}


CmdLine API change:

Code Block
titlekafka.tools.StreamsResetter
forceOption = optionParser.accepts("force", "Force remove members when long session time out has been configured, please make sure to shutdownshut down all stream applications when this option is specified to avoid unexpected rebalances")

...

KIP-571 plan to support the ability to force delete remove members in StreamsResetter, this basically involves 2 public interfaces changes as below:

1) Changes in org.apache.kafka.clients.admin.MemberToRemove Previously the org.apache.kafka.clients.admin.KafkaAdminClient#removeMembersFromConsumerGroup only support to remove members by specifying the groupInstanceId, which means it only support removal of static members, so the memberId is added to support removing dynamic members . Since there are two String fields now, new helpers are added and the old constructor will be deprecatedrelated to kafkaAdminclient

The old constructor RemoveMembersFromConsumerGroupOptions(Collection<MemberToRemove>members) will imply the non removeAll or remove-specific members scenario and it will throw IllegalArgumentException if empty members is provided.

When the newly added constructor RemoveMembersFromConsumerGroupOptions() is used, it implies the removeAll scenario, underlyingly it set field members to an empty set and removeAll() will return true, in this case, KafkaAdminClient#removeMembersFromConsumerGroup will remove all members in the given group, it will first query the members of the given group and then issue a LeaveGroupRequest with all members specified. Accordingly, the RemoveMembersFromConsumerGroupResult also adds a new private method removeAll() to imply different handling in RemoveMembersFromConsumerGroupResult#all,memberResult. .Under the removeAll scenario, RemoveMembersFromConsumerGroupResult#memberInfos should be empty, the memberResult() is not applicable, the all() should check if RemoveMembersFromConsumerGroupResult#future 100% succeed, if not, it will throw the first exception captured.

2) Add cmdline option --force to StreamsResetter

While with the new option, the StreamsResetter will force removing remove all active members fetched by KafkaAdminClient#describeConsumerGroups, and all the members by calling KafkaAdminClient#removeMembersFromConsumerGroup with removeAll specified. All the deleted members' info will be logged out if all removals succeed. Otherwise,  exception the first member removal error will be thrown. Further more, users should make sure all the stream applications should be shutdown when run StreamsResetter with --force, otherwise it might trigger unexpected rebalance.  

The detailed member removal error cases could be found in KIP-345, which introduced static membership and batch removal on the broker side.

Broker side logic has Server side needs no change, GroupCoordinator#handleLeaveGroup can handle both dynamic and static member removals since 2.3.0.4.

Operational steps of the StreamsResetter:

  1. Stop all the consumers (of streams instances), and wait until the shutdown is complete, otherwise, unexpected rebalance will be triggered.
  2. Use StreamsResetter with --force in case the stopped consumers are still registered at the broker side, without --force, it will fail with an exception suggesting using --force. 

Compatibility, Deprecation, and Migration Plan

  1. Because no classes/method will be removed but only deprecated, this change 1) will be backward compatible
  2. The new added cmdline option --force is also backward compatible because it's a new feature, if not specified, the StreamsResetter's behaviour behavior remains unchanged.

Requirement The requirement to use this feature:


Release version requirementExpected
behavior
error if requirement not satisfied
broker side >= 2.
3org.apache.kafka.common.KafkaException: Unknown group metadata version 3
4

UnsupportedVersionException will be thrown because the batch removal feature was introduced since version 2.4

client sideupdate to this KIP

"force is not a recognized option" will be in the client side log


Rejected Alternatives

None

...