...
The new `group.instance.id` config will be added to the Join/Sync/Heartbeat/OffsetCommit requests and responses.
A list of tuples containing `group.instance.id` and `member.id` will be added to the LeaveGroupRequest, while removing the single `member.id` field.
```
JoinGroupRequest => GroupId SessionTimeout RebalanceTimeout MemberId GroupInstanceId ProtocolType GroupProtocols
  GroupId => String
  SessionTimeout => int32
  RebalanceTimeout => int32
  MemberId => String
  GroupInstanceId => String // new
  ProtocolType => String
  GroupProtocols => [Protocol MemberMetadata]
    Protocol => String
    MemberMetadata => bytes

JoinGroupResponse => ThrottleTime ErrorCode GenerationId ProtocolName LeaderId MemberId Members
  ThrottleTime => int16
  ErrorCode => int16
  GenerationId => int32
  ProtocolName => String
  LeaderId => String
  MemberId => String
  Members => []JoinGroupResponseMember
    MemberId => String
    GroupInstanceId => String // new
    Metadata => bytes

SyncGroupRequest => GroupId GenerationId MemberId GroupInstanceId Assignments
  GroupId => String
  GenerationId => int32
  MemberId => String
  GroupInstanceId => String // new
  Assignments => []SyncGroupRequestAssignment
    MemberId => String
    GroupInstanceId => String // new
    Assignment => bytes

SyncGroupResponse => ThrottleTime ErrorCode Assignment
  ThrottleTime => int16
  ErrorCode => int16
  Assignment => bytes

HeartbeatRequest => GroupId GenerationId MemberId GroupInstanceId
  GroupId => String
  GenerationId => int32
  MemberId => String
  GroupInstanceId => String // new

HeartbeatResponse => ThrottleTime ErrorCode
  ThrottleTime => int16
  ErrorCode => int16

OffsetCommitRequest => GroupId GenerationId MemberId GroupInstanceId Topics
  GroupId => String
  GenerationId => int32
  MemberId => String
  GroupInstanceId => String // new
  Topics => []OffsetCommitRequestTopic
    Name => String
    Partitions => []OffsetCommitRequestPartition
      PartitionIndex => int32
      CommittedOffset => int64
      CommittedLeaderEpoch => int32
      CommitTimestamp => int64
      CommittedMetadata => String

OffsetCommitResponse => ThrottleTime Topics
  ThrottleTime => int16
  Topics => []OffsetCommitResponseTopic
    Name => String
    Partitions => []OffsetCommitResponsePartition
      PartitionIndex => int32
      ErrorCode => int16

LeaveGroupRequest => GroupId MemberIdentityList
  GroupId => String
  MemberId => String // removed
  MemberIdentityList => []MemberIdentity // new
    MemberId => String
    GroupInstanceId => String
```
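To illustrate the new LeaveGroupRequest shape, here is a minimal sketch of a request carrying a list of member identities instead of a single `member.id`. The class names `LeaveGroupSketch`, `MemberIdentity`, and `LeaveGroupRequest` below are hypothetical plain Java holders written for this example; they are not the actual Kafka request classes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical holder classes mirroring the schema above (not Kafka's real classes).
final class LeaveGroupSketch {

    // One (member.id, group.instance.id) tuple from MemberIdentityList.
    static final class MemberIdentity {
        final String memberId;
        final String groupInstanceId;

        MemberIdentity(String memberId, String groupInstanceId) {
            this.memberId = memberId;
            this.groupInstanceId = groupInstanceId;
        }
    }

    // The request now names a list of members rather than one member.id.
    static final class LeaveGroupRequest {
        final String groupId;
        final List<MemberIdentity> members;

        LeaveGroupRequest(String groupId, List<MemberIdentity> members) {
            this.groupId = groupId;
            // Defensive copy so the request is immutable after construction.
            this.members = Collections.unmodifiableList(new ArrayList<>(members));
        }
    }

    // Example: a single request that removes two members at once.
    static LeaveGroupRequest exampleRequest() {
        List<MemberIdentity> members = new ArrayList<>();
        members.add(new MemberIdentity("member-a", "instance-1"));
        members.add(new MemberIdentity("member-b", "instance-2"));
        return new LeaveGroupRequest("example-group", members);
    }
}
```

The member and instance names are placeholders. The point is only that one request can now identify several members, each by both identifiers.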
In the meantime, we bump the join and leave group request/response versions to v4 and v3, respectively.
```java
public static Schema[] schemaVersions() {
    return new Schema[] {JOIN_GROUP_REQUEST_V0, JOIN_GROUP_REQUEST_V1, JOIN_GROUP_REQUEST_V2, JOIN_GROUP_REQUEST_V3, JOIN_GROUP_REQUEST_V4};
}
```
```java
public static Schema[] schemaVersions() {
    return new Schema[] {LEAVE_GROUP_REQUEST_V0, LEAVE_GROUP_REQUEST_V1, LEAVE_GROUP_REQUEST_V2, LEAVE_GROUP_REQUEST_V3};
}
```
```java
public static Schema[] schemaVersions() {
    return new Schema[] {JOIN_GROUP_RESPONSE_V0, JOIN_GROUP_RESPONSE_V1, JOIN_GROUP_RESPONSE_V2, JOIN_GROUP_RESPONSE_V3, JOIN_GROUP_RESPONSE_V4};
}
```
```java
public static Schema[] schemaVersions() {
    return new Schema[] {LEAVE_GROUP_RESPONSE_V0, LEAVE_GROUP_RESPONSE_V1, LEAVE_GROUP_RESPONSE_V2, LEAVE_GROUP_RESPONSE_V3};
}
```
In the meantime, we bump the Join/Sync/Heartbeat/OffsetCommit/Leave group request/response versions by 1.
We shall use the new JoinGroupResponseMember struct to replace the current subscription struct.
...
On the client side, we add a new config called `group.instance.id` in ConsumerConfig and DistributedConfig (the config class for Connect). On consumer service init, if the `group.instance.id` config is set, we will put it in the initial join group request so the consumer identifies itself as a static member. Note that it is the user's responsibility to assign a unique `group.instance.id` to each consumer. This could be a service discovery hostname, a unique IP address, etc. We also have logic to handle duplicate `group.instance.id` values in case the client configuration contains duplicates.
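As a rough illustration of the client-side setup, the sketch below builds consumer properties that include `group.instance.id`. It uses only `java.util.Properties` with string keys. The helper name `staticMemberProps`, the bootstrap address, and the example group and instance names are assumptions made for this example.

```java
import java.util.Properties;

// Illustrative only: assembles consumer properties for a static member.
final class StaticMemberConfigSketch {

    // Config key proposed by this KIP.
    static final String GROUP_INSTANCE_ID_CONFIG = "group.instance.id";

    // instanceId must be unique per consumer, e.g. a service discovery hostname.
    static Properties staticMemberProps(String groupId, String instanceId) {
        if (instanceId == null || instanceId.isEmpty()) {
            throw new IllegalArgumentException("group.instance.id must be non-empty for a static member");
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        props.setProperty("group.id", groupId);
        props.setProperty(GROUP_INSTANCE_ID_CONFIG, instanceId);
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical group and host names.
        Properties props = staticMemberProps("payments", "payments-host-1");
        System.out.println(props.getProperty(GROUP_INSTANCE_ID_CONFIG));
    }
}
```

Leaving the key unset keeps the consumer on dynamic membership, so existing deployments would not need to change their configuration.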
...
- If the `member.id` is UNKNOWN_MEMBER_ID:
  - If `group.instance.id` is found in the static map, we shall generate a `member.id` and reply to the member's rejoin request immediately when the group is stable. This guards against duplicate consumers joining with the same `group.instance.id`. We also expect that after KIP-394, all join group requests will require a `member.id` to physically enter the consumer group, so the behavior of static members is consistent with that proposal.
    - Following the above definition, a static member can never receive a MEMBER_ID_REQUIRED exception, nor be put in the pending member map.
  - If it is not found, we shall generate a new `member.id` and add the new key-value pair to the static member map.
- If the `member.id` is not unknown, we require it to match the value stored in the cache; otherwise we reply with FENCED_INSTANCE_ID. The edge case here is that several members could share the same `group.instance.id` (for example, a misconfigured instance with a valid `member.id` that picks up an already-used `group.instance.id` at runtime). When `group.instance.id` has duplicates, we can refuse join requests from members with an outdated `member.id`, since we update the mapping upon each join group request. If a client hits this exception in a response, it suggests that some other consumer has taken its spot. The client should immediately fail itself to inform the end user that a configuration bug is generating duplicate consumers with the same identity. For the first version of this KIP, we just want straightforward handling that exposes the error at an early stage and makes bug cases easy to reproduce. The exception could be thrown from any client function that depends on a Join/Sync/Heartbeat/OffsetCommit request/response.
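The join handling rules above can be sketched as a small in-memory model. This is a simplified illustration under stated assumptions, not the coordinator implementation: the class `StaticMembershipSketch`, its `handleJoin` method, and the counter-based `member.id` generation are all hypothetical, and rebalance state, generations, and timeouts are ignored entirely.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of static member join handling (hypothetical, not broker code).
final class StaticMembershipSketch {

    // Assumption: the unknown member id is represented as the empty string.
    static final String UNKNOWN_MEMBER_ID = "";

    enum Error { NONE, FENCED_INSTANCE_ID }

    static final class JoinResult {
        final Error error;
        final String memberId;

        JoinResult(Error error, String memberId) {
            this.error = error;
            this.memberId = memberId;
        }
    }

    // group.instance.id -> most recently assigned member.id
    private final Map<String, String> staticMembers = new HashMap<>();
    private int counter = 0; // assumption: a counter stands in for real id generation

    JoinResult handleJoin(String groupInstanceId, String memberId) {
        if (UNKNOWN_MEMBER_ID.equals(memberId)) {
            // Found or not, a fresh member.id is generated and the mapping is updated.
            // Overwriting an existing entry is what fences the previous holder.
            String newId = groupInstanceId + "-" + (++counter);
            staticMembers.put(groupInstanceId, newId);
            return new JoinResult(Error.NONE, newId);
        }
        // A known member.id must match the cached value for this instance id.
        String cached = staticMembers.get(groupInstanceId);
        if (!memberId.equals(cached)) {
            return new JoinResult(Error.FENCED_INSTANCE_ID, memberId);
        }
        return new JoinResult(Error.NONE, memberId);
    }
}
```

In this model, a second consumer that joins with a duplicate `group.instance.id` takes over the mapping, and the first consumer is fenced the next time it presents its now outdated `member.id`.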
For join group requests under dynamic membership (without `group.instance.id` set), the handling logic will remain unchanged. If the broker version is not the latest (< v4), the join group request shall be downgraded to v3.
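The downgrade rule can be pictured with a short sketch. It assumes the client simply picks the highest version both sides support, and that `group.instance.id` is only carried from v4 onward because the field is introduced in that version. The class and method names are hypothetical.

```java
// Hypothetical version selection for the join group request.
final class JoinGroupVersionSketch {

    // First join group version that carries group.instance.id, per this proposal.
    static final short FIRST_STATIC_MEMBERSHIP_VERSION = 4;

    // Assumption: use the highest version supported by both client and broker.
    static short chooseVersion(short clientMaxVersion, short brokerMaxVersion) {
        return (short) Math.min(clientMaxVersion, brokerMaxVersion);
    }

    // Older versions have no field for the instance id, so it cannot be sent.
    static boolean carriesGroupInstanceId(short version) {
        return version >= FIRST_STATIC_MEMBERSHIP_VERSION;
    }
}
```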
...