...
Code Block |
---|
JoinGroupRequest => GroupId SessionTimeout RebalanceTimeout MemberId GroupInstanceId ProtocolType GroupProtocols
  GroupId => String
  SessionTimeout => int32
  RebalanceTimeout => int32
  MemberId => String
  GroupInstanceId => String // new
  ProtocolType => String
  GroupProtocols => [Protocol MemberMetadata]
    Protocol => String
    MemberMetadata => bytes
JoinGroupResponse => ThrottleTime ErrorCode GenerationId ProtocolName LeaderId MemberId Members
  ThrottleTime => int16
  ErrorCode => int16
  GenerationId => int32
  ProtocolName => String
  LeaderId => String
  MemberId => String
  Members => []JoinGroupResponseMember
    MemberId => String
    GroupInstanceId => String // new
    Metadata => bytes
SyncGroupRequest => GroupId GenerationId MemberId GroupInstanceId Assignments
  GroupId => String
  GenerationId => int32
  MemberId => String
  GroupInstanceId => String // new
  Assignments => []SyncGroupRequestAssignment
    MemberId => String
    Assignment => bytes
SyncGroupResponse => ThrottleTime ErrorCode Assignment
  ThrottleTime => int16
  ErrorCode => int16
  Assignment => bytes
HeartbeatRequest => GroupId GenerationId MemberId GroupInstanceId
  GroupId => String
  GenerationId => int32
  MemberId => String
  GroupInstanceId => String // new
HeartbeatResponse => ThrottleTime ErrorCode
  ThrottleTime => int16
  ErrorCode => int16
OffsetCommitRequest => GroupId GenerationId MemberId GroupInstanceId Topics
  GroupId => String
  GenerationId => int32
  MemberId => String
  GroupInstanceId => String // new
  Topics => []OffsetCommitRequestTopic
    Name => String
    Partitions => []OffsetCommitRequestPartition
      PartitionIndex => int32
      CommittedOffset => int64
      CommittedLeaderEpoch => int32
      CommitTimestamp => int64
      CommittedMetadata => String
OffsetCommitResponse => ThrottleTime Topics
  ThrottleTime => int16
  Topics => []OffsetCommitResponseTopic
    Name => String
    Partitions => []OffsetCommitResponsePartition
      PartitionIndex => int32
      ErrorCode => int16
LeaveGroupRequest => GroupId MemberIdentityList
  GroupId => String
  MemberId => String // removed
  MemberIdentityList => []MemberIdentity // new
    MemberId => String
    GroupInstanceId => String
|
In the meantime, for better visibility of static members, we are also going to bump the DescribeGroup request/response protocol to include `group.instance.id`:
Code Block |
---|
DescribeGroupResponse => ThrottleTime Groups
  ThrottleTime => int16
  Groups => []DescribeGroups
    ErrorCode => int16
    GroupId => String
    GroupState => String
    ProtocolType => String
    ProtocolData => int16
    Members => []DescribedGroupMember
      MemberId => String
      GroupInstanceId => String // new
      ClientId => String
      ClientHost => String
      MemberMetadata => bytes
      MemberAssignment => bytes |
Accordingly, we bump the Join/Sync/Heartbeat/OffsetCommit/Leave/Describe group request/response versions by 1.
We shall use the new JoinGroupResponseMember struct to replace the current subscription struct.
Code Block |
---|
language | java |
---|
title | ConsumerCoordinator.java |
---|
|
Map<String, ByteBuffer> allSubscriptions -> List<JoinGroupResponseData.JoinGroupResponseMember> allSubscriptions; |
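To illustrate what the type change means for code that previously iterated the map, here is a minimal sketch. It assumes a hypothetical stand-in for `JoinGroupResponseMember` that models only the three fields listed in the schema above (`MemberId`, `GroupInstanceId`, `Metadata`), and it assumes a dynamic member carries a null `groupInstanceId`. The class and method names are illustrative and not part of the proposal.

```java
import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SubscriptionListSketch {

    // Hypothetical stand-in for JoinGroupResponseData.JoinGroupResponseMember.
    // Only the fields named in the JoinGroupResponse schema are modeled.
    static final class JoinGroupResponseMember {
        final String memberId;
        final String groupInstanceId; // assumption: null for a dynamic member
        final byte[] metadata;

        JoinGroupResponseMember(String memberId, String groupInstanceId, byte[] metadata) {
            this.memberId = memberId;
            this.groupInstanceId = groupInstanceId;
            this.metadata = metadata;
        }
    }

    // Rebuilds the old memberId -> metadata view from the new list form,
    // preserving the order in which members appear in the list.
    static Map<String, ByteBuffer> metadataByMemberId(List<JoinGroupResponseMember> allSubscriptions) {
        Map<String, ByteBuffer> result = new LinkedHashMap<>();
        for (JoinGroupResponseMember member : allSubscriptions) {
            result.put(member.memberId, ByteBuffer.wrap(member.metadata));
        }
        return result;
    }
}
```

The list form keeps `group.instance.id` next to each member's metadata, which the old map keyed only by member id could not carry.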
We shall also add a new public function to the `Subscription` class in `PartitionAssignor` to get `group.instance.id`:
Code Block |
---|
language | java |
---|
title | PartitionAssignor.java |
---|
|
class Subscription {
...
public Optional<String> groupInstanceId();
} |
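As a rough illustration of how an assignor might consume this accessor, the sketch below orders members so that static members come first, sorted by `group.instance.id`, followed by dynamic members sorted by member id. The `Subscription` class here is a hypothetical stand-in that models only `groupInstanceId()`, and the ordering policy itself is an assumption made for the example, not something this proposal prescribes.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;

class StaticMemberOrdering {

    // Hypothetical stand-in for PartitionAssignor.Subscription.
    // Only the new accessor described in the text is modeled.
    static final class Subscription {
        private final Optional<String> groupInstanceId;

        Subscription(Optional<String> groupInstanceId) {
            this.groupInstanceId = groupInstanceId;
        }

        public Optional<String> groupInstanceId() {
            return groupInstanceId;
        }
    }

    // Returns member ids with static members first (ordered by group.instance.id),
    // then dynamic members (ordered by member id).
    static List<String> stableOrder(Map<String, Subscription> subscriptions) {
        List<Map.Entry<String, Subscription>> entries = new ArrayList<>(subscriptions.entrySet());
        entries.sort(Comparator
            // false sorts before true, so members with an instance id come first
            .comparing((Map.Entry<String, Subscription> e) -> !e.getValue().groupInstanceId().isPresent())
            .thenComparing(e -> e.getValue().groupInstanceId().orElse(e.getKey())));

        List<String> ordered = new ArrayList<>();
        for (Map.Entry<String, Subscription> entry : entries) {
            ordered.add(entry.getKey());
        }
        return ordered;
    }
}
```

Because a static member's `group.instance.id` survives restarts while its member id may not, sorting on it gives an order that stays stable across rejoins.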
Similarly for `MemberDescription` (used for describe group):
Code Block |
---|
language | java |
---|
title | MemberDescription.java |
---|
|
class MemberDescription {
...
public Optional<String> groupInstanceId();
} |
We are also introducing a new error type; its handling is explained in the following section.
...
Code Block |
---|
|
public static MembershipChangeResult removeMemberFromGroup(String groupId, List<String> groupInstanceIdsToRemove, RemoveMemberFromGroupOptions options); |
And a separate option class:
Code Block |
---|
title | RemoveMemberFromGroupOptions.java |
---|
|
public class RemoveMemberFromGroupOptions extends AbstractOptions<RemoveMemberFromGroupOptions> {
...
private List<MemberIdentity> members; // members to be removed
} |
This call will use the latest LeaveGroupRequest API to inform the broker that a batch of consumer instances is leaving permanently.
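A minimal sketch of how the list of `group.instance.id` values could be turned into the `MemberIdentityList` carried by the new LeaveGroupRequest is shown below. The `MemberIdentity` class is a hypothetical stand-in with the two fields from the schema, and the use of an empty string as the member id for entries identified only by instance id is an assumption of this example.

```java
import java.util.ArrayList;
import java.util.List;

class LeaveGroupSketch {

    // Assumption: an empty member id means "not known to the caller";
    // the broker would then resolve the member by group.instance.id.
    static final String UNKNOWN_MEMBER_ID = "";

    // Hypothetical stand-in for the MemberIdentity entry of LeaveGroupRequest.
    static final class MemberIdentity {
        final String memberId;
        final String groupInstanceId;

        MemberIdentity(String memberId, String groupInstanceId) {
            this.memberId = memberId;
            this.groupInstanceId = groupInstanceId;
        }
    }

    // Builds one MemberIdentity per instance id, in the order given.
    static List<MemberIdentity> toMemberIdentities(List<String> groupInstanceIdsToRemove) {
        List<MemberIdentity> members = new ArrayList<>();
        for (String groupInstanceId : groupInstanceIdsToRemove) {
            members.add(new MemberIdentity(UNKNOWN_MEMBER_ID, groupInstanceId));
        }
        return members;
    }
}
```

Batching the identities into a single request lets an operator scale down several static members with one call instead of waiting for each session timeout.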
...