...
Code Block |
---|
JoinGroupRequest => GroupId SessionTimeout RebalanceTimeout MemberId GroupInstanceId ProtocolType GroupProtocols
  GroupId => String
  SessionTimeout => int32
  RebalanceTimeout => int32
  MemberId => String
  GroupInstanceId => String // new
  ProtocolType => String
  GroupProtocols => [Protocol MemberMetadata]
    Protocol => String
    MemberMetadata => bytes
JoinGroupResponse => ThrottleTime ErrorCode GenerationId ProtocolName LeaderId MemberId Members
  ThrottleTime => int16
  ErrorCode => int16
  GenerationId => int32
  ProtocolName => String
  LeaderId => String
  MemberId => String
  Members => []JoinGroupResponseMember
    MemberId => String
    GroupInstanceId => String // new
    Metadata => bytes
SyncGroupRequest => GroupId GenerationId MemberId GroupInstanceId Assignments
  GroupId => String
  GenerationId => int32
  MemberId => String
  GroupInstanceId => String // new
  Assignments => []SyncGroupRequestAssignment
    MemberId => String
    Assignment => bytes
SyncGroupResponse => ThrottleTime ErrorCode Assignment
  ThrottleTime => int16
  ErrorCode => int16
  Assignment => bytes
HeartbeatRequest => GroupId GenerationId MemberId GroupInstanceId
  GroupId => String
  GenerationId => int32
  MemberId => String
  GroupInstanceId => String // new
HeartbeatResponse => ThrottleTime ErrorCode Assignment
  ThrottleTime => int16
  ErrorCode => int16
OffsetCommitRequest => GroupId GenerationId MemberId GroupInstanceId Topics
  GroupId => String
  GenerationId => int32
  MemberId => String
  GroupInstanceId => String // new
  Topics => []OffsetCommitRequestTopic
    Name => String
    Partitions => []OffsetCommitRequestPartition
      PartitionIndex => int32
      CommittedOffset => int64
      CommittedLeaderEpoch => int32
      CommitTimestamp => int64
      CommittedMetadata => String
OffsetCommitResponse => ThrottleTime Topics
  ThrottleTime => int16
  Topics => []OffsetCommitResponseTopic
    Name => String
    Partitions => []OffsetCommitResponsePartition
      PartitionIndex => int32
      ErrorCode => int16
LeaveGroupRequest => GroupId MemberIdentityList
  GroupId => String
  MemberId => String // removed
  MemberIdentityList => []MemberIdentity // new
    MemberId => String
    GroupInstanceId => String |
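As a rough illustration of the LeaveGroupRequest change above, the sketch below models a request that carries a list of member identities instead of a single `MemberId`. The class names `MemberIdentitySketch` and `LeaveGroupRequestSketch` are hypothetical and are not Kafka's generated request classes; representing an absent `GroupInstanceId` as an empty `Optional` is also an assumption made for readability.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical model of one MemberIdentity entry (MemberId + GroupInstanceId).
final class MemberIdentitySketch {
    final String memberId;
    // Assumption: empty means the member did not set group.instance.id.
    final Optional<String> groupInstanceId;

    MemberIdentitySketch(String memberId, Optional<String> groupInstanceId) {
        this.memberId = memberId;
        this.groupInstanceId = groupInstanceId;
    }
}

// Hypothetical model of the new request shape: GroupId + MemberIdentityList.
final class LeaveGroupRequestSketch {
    final String groupId;
    final List<MemberIdentitySketch> memberIdentityList;

    LeaveGroupRequestSketch(String groupId, List<MemberIdentitySketch> memberIdentityList) {
        this.groupId = groupId;
        this.memberIdentityList = new ArrayList<>(memberIdentityList);
    }

    // Collects the instance ids of the entries that carry a GroupInstanceId.
    List<String> staticInstanceIds() {
        List<String> ids = new ArrayList<>();
        for (MemberIdentitySketch m : memberIdentityList) {
            m.groupInstanceId.ifPresent(ids::add);
        }
        return ids;
    }
}
```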
In addition, to give better visibility into static members, we will also bump the DescribeGroup request/response protocol to include `group.instance.id`:
Code Block |
---|
DescribeGroupResponse => ThrottleTime Groups
  ThrottleTime => int16
  Groups => []DescribeGroups
    ErrorCode => int16
    GroupId => String
    GroupState => String
    ProtocolType => String
    ProtocolData => int16
    Members => []DescribedGroupMember
      MemberId => String
      GroupInstanceId => String // new
      ClientId => String
      ClientHost => String
      MemberMetadata => bytes
      MemberAssignment => bytes |
Accordingly, we will bump the JoinGroup/SyncGroup/Heartbeat/OffsetCommit/LeaveGroup/DescribeGroup request/response versions by 1.
...
Code Block | ||||
---|---|---|---|---|
| ||||
Map<String, ByteBuffer> allSubscriptions -> List<JoinGroupResponseData.JoinGroupResponseMember> allSubscriptions; |
We will also add a new public method to the `Subscription` class in `PartitionAssignor` that returns the `group.instance.id`:
Code Block | ||||
---|---|---|---|---|
| ||||
class Subscription {
  ...
  public Optional<String> groupInstanceId();
} |
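To show how an assignor might consume the new accessor, here is a minimal sketch. Only `groupInstanceId()` returning `Optional<String>` comes from the proposal; `SubscriptionSketch`, `StaticMemberOrdering`, and `stableOrder` are hypothetical names, and sorting static members ahead of dynamic ones is just one possible use, not something the proposal prescribes.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for Subscription; only groupInstanceId() mirrors the proposal.
final class SubscriptionSketch {
    private final String memberId;
    private final Optional<String> groupInstanceId;

    SubscriptionSketch(String memberId, Optional<String> groupInstanceId) {
        this.memberId = memberId;
        this.groupInstanceId = groupInstanceId;
    }

    String memberId() { return memberId; }

    // Empty for dynamic members, present for static members.
    Optional<String> groupInstanceId() { return groupInstanceId; }
}

final class StaticMemberOrdering {
    // Static members first (ordered by instance id), then dynamic members
    // (ordered by member id). Returns the key used for each member.
    static List<String> stableOrder(List<SubscriptionSketch> subscriptions) {
        List<SubscriptionSketch> sorted = new ArrayList<>(subscriptions);
        sorted.sort(Comparator
            .comparing((SubscriptionSketch s) -> !s.groupInstanceId().isPresent())
            .thenComparing(s -> s.groupInstanceId().orElse(s.memberId())));
        List<String> keys = new ArrayList<>();
        for (SubscriptionSketch s : sorted) {
            keys.add(s.groupInstanceId().orElse(s.memberId()));
        }
        return keys;
    }
}
```

Because the instance id is an `Optional`, dynamic members need no special casing beyond falling back to their member id.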
We are also introducing a new error type; its handling is explained in the following section.
...