Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  1. If the stream application is already on static membership, we could call directly from adminClient.removeMembersFromGroup
  2. If the application is on dynamic membership, we should modify adminClient.removeMembersFromGroup interface to allow deletion based on member.id.

Proposed Changes && Public Interfaces

KIP-569 plan to support the ability to  force removing members in `StreamsResetter`, this basically involves 2 public interfaces changes as below:


Public Interfaces

Client side changes:

...

Code Block
title
org.apache.kafka.clients.admin.

...

Changes includes:

...

  • private String memberId;

...

MemberToRemove

...

...

public

...

Previously the org.apache.kafka.clients.admin.KafkaAdminClient#removeMembersFromConsumerGroup only support to remove members by specifying the groupInstanceId, which means it only support remove static members, so the memberId is added to support removing dynamic members .

...

 class MemberToRemove {

...



    private String memberId;

...

 

   

...

 

...

public MemberToRemove() {

...

}

    public MemberToRemove setGroupInstanceId(String groupInstanceId) {

...

}

    

...

public 

...

MemberToRemove 

...

setMemberId(

...

String 

...

memberId) {

...

2) Add cmdline arg force-remove-member to StreamsResetter

The current StreamsResetter will throw exception when found active members:

Code Block
bin/kafka-streams-application-reset.sh --application-id streams-wordcount --input-topics streams-plaintext-input 

While with the new arg, the StreamsResetter will force removing all active members :

...

}
}


CmdLine API change:

Code Block
titlekafka.tools.StreamsResetter
forceOption = optionParser.accepts("force","Force remove members when long session time out has been configured, please make sure to shutdown all stream applications when this option is specified to avoid unexpected rebalances")


Proposed Changes

KIP-571 plan to support the ability to force removing members in StreamsResetter, this basically involves 2 public interfaces changes as below:

1) Changes in org.apache.kafka.clients.admin.MemberToRemove 

Previously the org.apache.kafka.clients.admin.KafkaAdminClient#removeMembersFromConsumerGroup only support to remove members by specifying the groupInstanceId, which means it only support remove static members, so the memberId is added to support removing dynamic members . Since there are two String fields now, new constructor is added and the old constructor will be deprecated.


2) Add cmdline option --force to StreamsResetter

While with the new option, the StreamsResetter will force removing all active members fetched by KafkaAdminClient#describeConsumerGroups, and all the members info will be logged out if all removals succeed. Otherwise,  exception will be thrown. Further more, users should make sure all the stream applications should be shutdown when run StreamsResetter with --force, otherwise it might trigger unexpected rebalance. 


Server side needs no change, GroupCoordinator#handleLeaveGroup can handle both dynamic and static member removals since 2.3.0.

Compatibility, Deprecation, and Migration Plan

  1. Because no classes/method will be removed but only deprecated, this change will be fully backward compatible
  2. The new added cmdline arg forceoption -remove-member force is also backward compatible because it's new feature, if not specified, the StreamsRestterStreamsResetter's behaviour remains unchanged.

Requirement to use this feature:


Release version requirementExpected behavior if version is too old
broker side >= 2.3org.apache.kafka.common.KafkaException: Unknown group metadata version 3
client sideupdate to this KIPTBD


Rejected Alternatives

None

...