...
group.instance.id | The unique identifier of the consumer instance, provided by the end user. If set to a non-null string, the consumer is treated as a static member; otherwise, a null id indicates a dynamic member. Default value: null. |
Client Side Changes
The new `group.instance.id` field will be added to the Join/Sync/Heartbeat/OffsetCommit requests, and to the member list of the JoinGroup response.
A list of tuples containing `group.instance.id` and `member.id` will be added to the LeaveGroupRequest, replacing the single `member.id` field.
```
JoinGroupRequest => GroupId SessionTimeout RebalanceTimeout MemberId GroupInstanceId ProtocolType GroupProtocols
  GroupId => String
  SessionTimeout => int32
  RebalanceTimeout => int32
  MemberId => String
  GroupInstanceId => String // new
  ProtocolType => String
  GroupProtocols => [Protocol MemberMetadata]
    Protocol => String
    MemberMetadata => bytes

JoinGroupResponse => ThrottleTime ErrorCode GenerationId ProtocolName LeaderId MemberId Members
  ThrottleTime => int32
  ErrorCode => int16
  GenerationId => int32
  ProtocolName => String
  LeaderId => String
  MemberId => String
  Members => []JoinGroupResponseMember
    MemberId => String
    GroupInstanceId => String // new
    Metadata => bytes

SyncGroupRequest => GroupId GenerationId MemberId GroupInstanceId Assignments
  GroupId => String
  GenerationId => int32
  MemberId => String
  GroupInstanceId => String // new
  Assignments => []SyncGroupRequestAssignment
    MemberId => String
    GroupInstanceId => String // new
    Assignment => bytes

SyncGroupResponse => ThrottleTime ErrorCode Assignment
  ThrottleTime => int32
  ErrorCode => int16
  Assignment => bytes

HeartbeatRequest => GroupId GenerationId MemberId GroupInstanceId
  GroupId => String
  GenerationId => int32
  MemberId => String
  GroupInstanceId => String // new

HeartbeatResponse => ThrottleTime ErrorCode
  ThrottleTime => int32
  ErrorCode => int16

OffsetCommitRequest => GroupId GenerationId MemberId GroupInstanceId Topics
  GroupId => String
  GenerationId => int32
  MemberId => String
  GroupInstanceId => String // new
  Topics => []OffsetCommitRequestTopic
    Name => String
    Partitions => []OffsetCommitRequestPartition
      PartitionIndex => int32
      CommittedOffset => int64
      CommittedLeaderEpoch => int32
      CommitTimestamp => int64
      CommittedMetadata => String

OffsetCommitResponse => ThrottleTime Topics
  ThrottleTime => int32
  Topics => []OffsetCommitResponseTopic
    Name => String
    Partitions => []OffsetCommitResponsePartition
      PartitionIndex => int32
      ErrorCode => int16

LeaveGroupRequest => GroupId MemberIdentityList
  GroupId => String
  MemberId => String // removed
  MemberIdentityList => []MemberIdentity // new
    MemberId => String
    GroupInstanceId => String
```
In the meantime, for better visibility of static members, we are also going to bump the DescribeGroup protocol to include `group.instance.id`:

```
DescribeGroupResponse => ThrottleTime Groups
  ThrottleTime => int32
  Groups => []DescribeGroups
    ErrorCode => int16
    GroupId => String
    GroupState => String
    ProtocolType => String
    ProtocolData => String
    Members => []DescribedGroupMember
      MemberId => String
      GroupInstanceId => String // new
      ClientId => String
      ClientHost => String
      MemberMetadata => bytes
      MemberAssignment => bytes
```
Accordingly, we will bump the Join/Sync/Heartbeat/OffsetCommit/Leave/DescribeGroup request/response versions by 1.
We shall use the new JoinGroupResponseMember struct to replace the current subscription struct:

```java
// before
Map<String, ByteBuffer> allSubscriptions;
// after
List<JoinGroupResponseData.JoinGroupResponseMember> allSubscriptions;
```
We shall also add a new public function to the `Subscription` class in `PartitionAssignor` to get the `group.instance.id`:

```java
class Subscription {
    ...
    public Optional<String> groupInstanceId();
}
```
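To illustrate why an assignor would want `groupInstanceId()`, here is a minimal sketch, assuming a simplified, hypothetical `Subscription` stand-in and a hypothetical `StableKeyAssignor` helper (neither is the real Kafka class). It orders members by `group.instance.id` when present and falls back to `member.id` otherwise, so a static member that rejoins under a fresh `member.id` keeps the same partitions. For brevity it takes a `Map` keyed by `member.id` and a single topic rather than the proposed `List<JoinGroupResponseMember>`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical, simplified stand-in for PartitionAssignor.Subscription with the proposed accessor.
final class Subscription {
    private final List<String> topics;
    private final String groupInstanceId; // null for dynamic members

    Subscription(List<String> topics, String groupInstanceId) {
        this.topics = List.copyOf(topics);
        this.groupInstanceId = groupInstanceId;
    }

    List<String> topics() {
        return topics;
    }

    Optional<String> groupInstanceId() {
        return Optional.ofNullable(groupInstanceId);
    }
}

// Hypothetical helper: orders members by group.instance.id when present, so a static member
// rejoining under a new member.id lands in the same slot and keeps its partitions.
final class StableKeyAssignor {
    static String stableKey(String memberId, Subscription subscription) {
        return subscription.groupInstanceId().orElse(memberId);
    }

    // Round-robin over partitions 0..numPartitions-1 of one topic, in stable-key order.
    // Assumes no dynamic member.id collides with a static group.instance.id.
    static Map<String, List<Integer>> assign(Map<String, Subscription> subscriptions, int numPartitions) {
        Map<String, List<Integer>> assignment = new HashMap<>();
        if (subscriptions.isEmpty()) {
            return assignment;
        }
        Map<String, String> memberByKey = new TreeMap<>(); // sorted by stable key
        for (Map.Entry<String, Subscription> e : subscriptions.entrySet()) {
            memberByKey.put(stableKey(e.getKey(), e.getValue()), e.getKey());
        }
        List<String> ordered = new ArrayList<>(memberByKey.values());
        for (String memberId : ordered) {
            assignment.put(memberId, new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            assignment.get(ordered.get(p % ordered.size())).add(p);
        }
        return assignment;
    }
}
```

With members `m-1` (instance `host-b`) and `m-2` (instance `host-a`) over 4 partitions, `m-2` gets `[0, 2]` and `m-1` gets `[1, 3]`; if `host-b` rejoins as `m-3`, `m-3` again gets `[1, 3]`.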
Similarly, the MemberDescription interface (for describe group) will expose `group.instance.id`:

```java
class MemberDescription {
    ...
    public Optional<String> groupInstanceId();
}
```
We are also introducing a new error type; its handling is explained in the following section.
...
```java
public MembershipChangeResult removeMemberFromConsumerGroup(String groupId, RemoveMemberFromConsumerGroupOptions options);
```

And a separate option class:

```java
public class RemoveMemberFromConsumerGroupOptions extends AbstractOptions<RemoveMemberFromConsumerGroupOptions> {
    ...
    private List<MemberIdentity> members; // members to be removed
}
```
The new admin API will use the latest LeaveGroupRequest to notify the broker that a batch of consumer instances is leaving permanently.
...
- The latest JoinGroup and LeaveGroup request/response versions are supported on both client and broker.
- `group.instance.id` is configured with a non-null string.
Client Behavior Changes
On the client side, we add a new config called `group.instance.id` to ConsumerConfig and DistributedConfig (the config class for Connect). On consumer service init, if the `group.instance.id` config is set, we put it in the initial join group request so the consumer identifies itself as a static member. Note that it is the user's responsibility to assign a unique `group.instance.id` to each consumer. This could be a service discovery hostname, a unique IP address, etc. We also have logic to handle duplicate `group.instance.id` values in case the client configuration contains duplicates.
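A minimal configuration sketch follows, using plain `java.util.Properties` so it runs without the Kafka client on the classpath (in released clients the key is also available as `ConsumerConfig.GROUP_INSTANCE_ID_CONFIG`). The `StaticConsumerConfig` class and its `duplicateInstanceIds` pre-deployment check are hypothetical helpers, not part of this proposal; they only show that leaving the key unset keeps a consumer dynamic, and how an operator might catch duplicate ids before the broker has to fence them.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper for building consumer configs with optional static membership.
final class StaticConsumerConfig {
    static final String GROUP_INSTANCE_ID = "group.instance.id";

    // A null instanceId leaves the key unset, so the consumer joins as a dynamic member.
    static Properties consumerProps(String bootstrapServers, String groupId, String instanceId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        if (instanceId != null) {
            props.setProperty(GROUP_INSTANCE_ID, instanceId); // e.g. a service discovery hostname
        }
        return props;
    }

    static boolean isStaticMember(Properties props) {
        return props.getProperty(GROUP_INSTANCE_ID) != null;
    }

    // Hypothetical pre-deployment check: instance ids that more than one consumer would use.
    static Set<String> duplicateInstanceIds(List<Properties> fleet) {
        Set<String> seen = new HashSet<>();
        Set<String> duplicates = new TreeSet<>();
        for (Properties props : fleet) {
            String id = props.getProperty(GROUP_INSTANCE_ID);
            if (id != null && !seen.add(id)) {
                duplicates.add(id);
            }
        }
        return duplicates;
    }
}
```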
...
- If the `member.id` is UNKNOWN_MEMBER_ID:
  - If the `group.instance.id` is found in the static member map, we generate a new `member.id` and reply to the rejoin request immediately if the group is stable. This guards against duplicate consumers joining with the same `group.instance.id`. We also expect that after KIP-394, all join group requests will require a `member.id` to physically enter the consumer group, so the behavior of static members is consistent with that proposal.
  - Following the above definition, static members can never receive a MEMBER_ID_REQUIRED exception, nor be put in the pending member map.
  - If it is not found, we generate a new `member.id` and add the new key-value pair to the static member map.
- If the `member.id` is known, we require it to match the value stored in the cache; otherwise, we reply with FENCED_INSTANCE_ID. The edge case is that we could have multiple members with the same `group.instance.id` (for example, misconfigured instances that hold a valid `member.id` but pick up an already-used `group.instance.id` at runtime). When `group.instance.id` has duplicates, we can reject join requests from members with an outdated `member.id`, since we update the mapping upon each join group request. If a client receives this error in a response, it means another consumer has taken its spot. The client should immediately fail itself to tell the end user that there is a configuration bug producing duplicate consumers with the same identity. For the first version of this KIP, we want straightforward handling that exposes the error early and makes bug cases easy to reproduce. The exception could be thrown from any client function that depends on Join/Sync/Heartbeat/OffsetCommit request/response.
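The join rules above can be sketched as a small state machine over the static member map. This is a minimal sketch, assuming a hypothetical `StaticMemberRegistry` class and an empty string for UNKNOWN_MEMBER_ID; it ignores rebalance state entirely. Member ids come from a counter only to keep the example deterministic (a real broker would generate unique random ids), and returning UNKNOWN_MEMBER_ID for a known `member.id` whose `group.instance.id` is unmapped is an assumption, since the text does not cover that case.

```java
import java.util.HashMap;
import java.util.Map;

enum JoinError { NONE, FENCED_INSTANCE_ID, UNKNOWN_MEMBER_ID }

final class JoinResult {
    final JoinError error;
    final String memberId;

    JoinResult(JoinError error, String memberId) {
        this.error = error;
        this.memberId = memberId;
    }
}

// Hypothetical broker-side view of the static member map: group.instance.id -> current member.id.
final class StaticMemberRegistry {
    static final String UNKNOWN_MEMBER_ID = "";
    private final Map<String, String> staticMembers = new HashMap<>();
    private int counter = 0;

    // Assumption: a counter keeps ids deterministic for this sketch only.
    private String newMemberId(String groupInstanceId) {
        return groupInstanceId + "-" + counter++;
    }

    JoinResult join(String groupInstanceId, String memberId) {
        if (UNKNOWN_MEMBER_ID.equals(memberId)) {
            // Found or not found, issue a fresh member.id and (re)write the mapping.
            // Overwriting an existing entry fences whoever held the old member.id.
            String fresh = newMemberId(groupInstanceId);
            staticMembers.put(groupInstanceId, fresh);
            return new JoinResult(JoinError.NONE, fresh);
        }
        String current = staticMembers.get(groupInstanceId);
        if (current == null) {
            // Assumption: known member.id but unmapped instance id is treated as unknown.
            return new JoinResult(JoinError.UNKNOWN_MEMBER_ID, memberId);
        }
        if (!current.equals(memberId)) {
            // Outdated member.id: another consumer has taken this group.instance.id.
            return new JoinResult(JoinError.FENCED_INSTANCE_ID, memberId);
        }
        return new JoinResult(JoinError.NONE, memberId);
    }
}
```

A first join of `host-1` yields `host-1-0`; a second unknown-id join yields `host-1-1`, after which a request still carrying `host-1-0` is answered with FENCED_INSTANCE_ID.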
For join group requests under dynamic membership (without `group.instance.id` set), the handling logic remains unchanged. If the broker does not support the latest JoinGroup version (v4), the join group request will be downgraded to v3.
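The downgrade rule can be sketched as a version-selection function. This is a minimal sketch with a hypothetical `JoinGroupVersions` helper, not the client's real negotiation code. Two choices here are assumptions beyond the text: for brokers older than v3 it keeps negotiating down with `Math.min`, and a static member facing a pre-v4 broker fails fast, since `group.instance.id` cannot be carried in v3.

```java
// Hypothetical client-side selection of the JoinGroup request version.
final class JoinGroupVersions {
    static final int LATEST = 4;   // first version carrying GroupInstanceId
    static final int PREVIOUS = 3; // version a dynamic member falls back to

    static int select(int brokerMaxVersion, String groupInstanceId) {
        if (brokerMaxVersion >= LATEST) {
            return LATEST;
        }
        if (groupInstanceId == null) {
            // Dynamic member: downgrade, never sending a version the broker cannot parse.
            return Math.min(brokerMaxVersion, PREVIOUS);
        }
        // Assumption: a static member cannot be expressed before v4, so fail fast.
        throw new UnsupportedOperationException("broker does not support group.instance.id");
    }
}
```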
...
- For static members, `group.instance.id` must be provided. The client can optionally provide a `member.id` when `group.instance.id` is configured with a non-null value. If `member.id` is provided, the member is only removed if the `member.id` matches; otherwise, only the `group.instance.id` is used. The `member.id` serves as validation here. It is currently not used (set to an empty string) but could be useful if we build a fully automated removal process.
- For leave group requests under dynamic membership, the member sends a singleton list with one tuple containing the `member.id` it is currently using and a `group.instance.id` set to null. In this case, we simply remove the given dynamic member the same way as the current leave group logic.
- Expected error cases are:
  - Some (non-null) instance ids are not found, which means the request is not valid (UNKNOWN_MEMBER_ID).
  - In a theoretical case, both `member.id` and `group.instance.id` are empty. We shall log the error on the server. If every entry in the batch request is empty, an UNKNOWN_MEMBER_ID error will be returned.
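The leave rules above, together with the corner case of a `member.id`-only leave that targets a static member, can be sketched as follows. This is a minimal sketch, assuming hypothetical `MemberIdentity`, `LeaveError`, and `LeaveGroupSketch` types. Two behaviors are assumptions: a supplied `member.id` that does not match is reported as FENCED_INSTANCE_ID (the text only says the member is not removed), and an empty entry inside an otherwise valid batch gets a per-entry UNKNOWN_MEMBER_ID plus a server log line. An all-empty batch is modeled as a single UNKNOWN_MEMBER_ID.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for one MemberIdentity entry of LeaveGroupRequest.
final class MemberIdentity {
    final String memberId;        // "" when not supplied
    final String groupInstanceId; // null for dynamic members

    MemberIdentity(String memberId, String groupInstanceId) {
        this.memberId = memberId;
        this.groupInstanceId = groupInstanceId;
    }

    boolean isEmpty() {
        return memberId.isEmpty() && (groupInstanceId == null || groupInstanceId.isEmpty());
    }
}

enum LeaveError { NONE, UNKNOWN_MEMBER_ID, FENCED_INSTANCE_ID }

final class LeaveGroupSketch {
    private final Set<String> memberIds = new HashSet<>();             // every current member.id
    private final Map<String, String> staticMembers = new HashMap<>(); // group.instance.id -> member.id
    final List<String> serverLog = new ArrayList<>();

    void addDynamic(String memberId) {
        memberIds.add(memberId);
    }

    void addStatic(String groupInstanceId, String memberId) {
        staticMembers.put(groupInstanceId, memberId);
        memberIds.add(memberId);
    }

    // One error per entry, or a single UNKNOWN_MEMBER_ID when every entry is empty.
    List<LeaveError> leave(List<MemberIdentity> batch) {
        if (batch.stream().allMatch(MemberIdentity::isEmpty)) {
            return List.of(LeaveError.UNKNOWN_MEMBER_ID);
        }
        List<LeaveError> results = new ArrayList<>();
        for (MemberIdentity m : batch) {
            results.add(leaveOne(m));
        }
        return results;
    }

    private LeaveError leaveOne(MemberIdentity m) {
        if (m.isEmpty()) {
            serverLog.add("leave entry with neither member.id nor group.instance.id");
            return LeaveError.UNKNOWN_MEMBER_ID;
        }
        if (m.groupInstanceId != null && !m.groupInstanceId.isEmpty()) {
            String current = staticMembers.get(m.groupInstanceId);
            if (current == null) {
                return LeaveError.UNKNOWN_MEMBER_ID;
            }
            // member.id is optional validation, enforced only when supplied (error code assumed).
            if (!m.memberId.isEmpty() && !m.memberId.equals(current)) {
                return LeaveError.FENCED_INSTANCE_ID;
            }
            staticMembers.remove(m.groupInstanceId);
            memberIds.remove(current);
            return LeaveError.NONE;
        }
        // Dynamic-style leave by member.id only.
        if (!memberIds.remove(m.memberId)) {
            return LeaveError.UNKNOWN_MEMBER_ID;
        }
        // Corner case: the member.id may belong to a static member; drop its static mapping too.
        staticMembers.values().remove(m.memberId);
        return LeaveError.NONE;
    }

    boolean hasStatic(String groupInstanceId) {
        return staticMembers.containsKey(groupInstanceId);
    }
}
```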
...
A corner case is that although we don't allow static members to send LeaveGroupRequest, the broker could still see a LeaveGroupRequest whose `member.id` points to an existing static member. The straightforward solution is to remove the member metadata altogether, including the static member info, if the corresponding `group.instance.id` was left null. This approach ensures that the downgrade process has no negative impact on normal consumer operation, and it avoids complicating the server-side logic. In the long term, there could be use cases that require static members to send LeaveGroupRequest, so we want to avoid having to change the handling logic later.
...