Status

Current state: Voting

Vote thread: [Vote] KIP-571: Add option to force remove members in StreamsResetter

Discussion thread: [Discuss] KIP-571: Add option to force remove members in StreamsResetter

JIRA: KAFKA-9146


Motivation

Users sometimes want to reset a Streams application promptly but are blocked by left-over members in the group coordinator, which expire only after the session timeout. When a long session timeout is configured, this can keep the group from clearing for a long time. We should add support for cleaning up these members by forcing them to leave the group. To do that:

  1. If the stream application already uses static membership, we can call adminClient.removeMembersFromConsumerGroup directly.
  2. If the application uses dynamic membership, we should modify the adminClient.removeMembersFromConsumerGroup interface to allow removal based on member.id.

Public Interfaces

Client side changes:

org.apache.kafka.clients.admin.MemberToRemove
public class MemberToRemove {

    // newly added field, identifies a dynamic member
    private String memberId;

    // deprecated constructor
    @Deprecated
    public MemberToRemove(String groupInstanceId) {}

    // newly added constructor and helper methods
    public MemberToRemove() {}

    public MemberToRemove withGroupInstanceId(String groupInstanceId) {}

    public MemberToRemove withMemberId(String memberId) {}
}
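A minimal sketch of how the proposed helpers might be chained, assuming each `with...` method returns `this`. `MemberToRemoveSketch` and its accessors are hypothetical stand-ins that mirror the signatures above so the example compiles without kafka-clients; it is not the actual Kafka class.

```java
import java.util.List;

// Demo entry point; everything in this file is a stand-in, not Kafka code.
class MemberToRemoveDemo {
    public static void main(String[] args) {
        // Static member: identified by its group.instance.id.
        MemberToRemoveSketch staticMember =
            new MemberToRemoveSketch().withGroupInstanceId("instance-1");
        // Dynamic member: identified by the coordinator-assigned member.id.
        MemberToRemoveSketch dynamicMember =
            new MemberToRemoveSketch().withMemberId("consumer-1-abc");

        for (MemberToRemoveSketch m : List.of(staticMember, dynamicMember)) {
            System.out.println(m);
        }
    }
}

// Hypothetical stand-in mirroring the proposed MemberToRemove signatures.
final class MemberToRemoveSketch {
    private String groupInstanceId; // set for static members
    private String memberId;        // set for dynamic members

    MemberToRemoveSketch withGroupInstanceId(String groupInstanceId) {
        this.groupInstanceId = groupInstanceId;
        return this; // assumption: helpers return this for chaining
    }

    MemberToRemoveSketch withMemberId(String memberId) {
        this.memberId = memberId;
        return this;
    }

    String groupInstanceId() { return groupInstanceId; }

    String memberId() { return memberId; }

    @Override
    public String toString() {
        return "MemberToRemove(groupInstanceId=" + groupInstanceId
            + ", memberId=" + memberId + ")";
    }
}
```

Keeping a no-argument constructor plus one helper per identifier avoids two ambiguous single-String constructors, which is presumably why the old constructor is deprecated rather than overloaded.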

KafkaAdminClient related changes:

RemoveMembersFromConsumerGroupOptions
public class RemoveMembersFromConsumerGroupOptions {
    // newly added field
    private boolean removeAll;

    // newly added constructor
    public RemoveMembersFromConsumerGroupOptions(Collection<MemberToRemove> members, Boolean removeAll) {}
}

RemoveMembersFromConsumerGroupResult
public class RemoveMembersFromConsumerGroupResult {
    // newly added field
    private final boolean removeAll;
}

CmdLine API change:

kafka.tools.StreamsResetter
forceOption = optionParser.accepts("force", "Force remove members when long session time out has been configured, please make sure to shut down all stream applications when this option is specified to avoid unexpected rebalances")


Proposed Changes

KIP-571 plans to support force-removing members in StreamsResetter. This involves the following public interface changes:

1) Changes in org.apache.kafka.clients.admin.MemberToRemove 

Previously, KafkaAdminClient#removeMembersFromConsumerGroup only supported removing members by groupInstanceId, which means it could only remove static members. The memberId field is added to support removing dynamic members. Since there are now two String fields, new helper methods are added and the old constructor is deprecated.

2) Changes related to KafkaAdminClient

With the new removeAll field in RemoveMembersFromConsumerGroupOptions, KafkaAdminClient#removeMembersFromConsumerGroup will support removing all members of the specified group. When removeAll is specified, the implementation of removeMembersFromConsumerGroup should first query the members of the given group and then issue a LeaveGroupRequest that lists all of them. Accordingly, RemoveMembersFromConsumerGroupResult also gains a removeAll field so that RemoveMembersFromConsumerGroupResult#all can handle this case differently: in the removeAll scenario, RemoveMembersFromConsumerGroupResult#memberInfos should be empty, and all() should directly check whether RemoveMembersFromConsumerGroupResult#future fully succeeded.
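The second step of the removeAll flow, turning the queried members into the entries of one batched leave-group request, could look roughly like the sketch below. All types here (`GroupMemberSketch`, `LeaveIdentitySketch`, `RemoveAllSketch`) are hypothetical stand-ins rather than Kafka classes, and the rule that static members are addressed by instance id while dynamic members are addressed by member id is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Demo entry point; all types in this file are stand-ins, not Kafka classes.
class RemoveAllDemo {
    public static void main(String[] args) {
        List<GroupMemberSketch> members = List.of(
            new GroupMemberSketch("consumer-1-abc", null),          // dynamic member
            new GroupMemberSketch("consumer-2-def", "instance-2")); // static member
        for (LeaveIdentitySketch id : RemoveAllSketch.toLeaveIdentities(members)) {
            System.out.println(id);
        }
    }
}

// Hypothetical view of one member returned by the group query.
final class GroupMemberSketch {
    final String memberId;
    final String groupInstanceId; // null for dynamic members

    GroupMemberSketch(String memberId, String groupInstanceId) {
        this.memberId = memberId;
        this.groupInstanceId = groupInstanceId;
    }
}

// Hypothetical entry in the member list of a single leave-group request.
final class LeaveIdentitySketch {
    final String memberId;        // null when removing by instance id
    final String groupInstanceId; // null when removing by member id

    LeaveIdentitySketch(String memberId, String groupInstanceId) {
        this.memberId = memberId;
        this.groupInstanceId = groupInstanceId;
    }

    @Override
    public String toString() {
        return "LeaveIdentity(memberId=" + memberId
            + ", groupInstanceId=" + groupInstanceId + ")";
    }
}

final class RemoveAllSketch {
    // Maps every queried member to exactly one entry of the batched request.
    static List<LeaveIdentitySketch> toLeaveIdentities(List<GroupMemberSketch> members) {
        List<LeaveIdentitySketch> identities = new ArrayList<>();
        for (GroupMemberSketch m : members) {
            if (m.groupInstanceId != null) {
                // Assumption: static members are removed by group.instance.id.
                identities.add(new LeaveIdentitySketch(null, m.groupInstanceId));
            } else {
                // Assumption: dynamic members are removed by member.id.
                identities.add(new LeaveIdentitySketch(m.memberId, null));
            }
        }
        return identities;
    }
}
```

Because the caller never names individual members in this mode, there is nothing to key per-member results on, which matches memberInfos being empty and all() relying on the overall future.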

3) Add cmdline option --force to StreamsResetter

With the new option, StreamsResetter will force remove all active members by calling KafkaAdminClient#removeMembersFromConsumerGroup with removeAll specified. If all removals succeed, the info of every removed member is logged. Otherwise, the first member removal error is thrown.

The detailed member removal error cases can be found in KIP-345, which introduced static membership and batch removal on the broker side.

Broker-side logic is unchanged: GroupCoordinator#handleLeaveGroup has been able to handle both dynamic and static member removals since 2.4.

Operational steps of the StreamsResetter:

  1. Stop all the consumers (of the Streams instances) and wait until the shutdown is complete; otherwise, unexpected rebalances will be triggered.
  2. Use StreamsResetter with --force in case the stopped consumers are still registered on the broker side. Without --force, it will fail with an exception suggesting the use of --force.
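The decision the resetter makes in step 2 can be sketched as follows. This is an illustrative model only: `ForceResetSketch`, its `Action` enum, and the error message wording are hypothetical and are not taken from the StreamsResetter source.

```java
import java.util.List;

// Demo entry point; a model of the --force decision, not StreamsResetter code.
class ForceResetDemo {
    public static void main(String[] args) {
        List<String> leftover = List.of("consumer-1-abc");
        // With --force, left-over members are removed before the reset.
        System.out.println(ForceResetSketch.decide(leftover, true));
        // Without --force, the reset is refused while members remain.
        try {
            ForceResetSketch.decide(leftover, false);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}

final class ForceResetSketch {
    enum Action { PROCEED, REMOVE_ALL_THEN_PROCEED }

    // activeMemberIds: members still registered with the group coordinator.
    // force: whether the (proposed) --force option was given.
    static Action decide(List<String> activeMemberIds, boolean force) {
        if (activeMemberIds.isEmpty()) {
            return Action.PROCEED; // nothing blocks the reset
        }
        if (!force) {
            // Hypothetical message; the KIP only says the error suggests --force.
            throw new IllegalStateException("Group still has "
                + activeMemberIds.size()
                + " active member(s); stop all instances or rerun with --force");
        }
        return Action.REMOVE_ALL_THEN_PROCEED;
    }
}
```

Failing by default keeps the existing behavior for users who do not pass --force, while the error points them at the new option.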

Compatibility, Deprecation, and Migration Plan

  1. Because no classes or methods will be removed, only deprecated, changes 1) and 2) are backward compatible.
  2. The new cmdline option --force is also backward compatible because it is a new feature; if it is not specified, StreamsResetter's behavior remains unchanged.

The requirement to use this feature:


| Release | Version requirement | Expected error if requirement not satisfied |
|---|---|---|
| broker side | >= 2.4 | UnsupportedVersionException will be thrown because the batch removal feature was introduced in version 2.4 |
| client side | update to this KIP | "force is not a recognized option" will be in the client side log |


Rejected Alternatives

None

