...

The new `group.instance.id` config will be added to the Join/Sync/Heartbeat/OffsetCommit request/responses.

A list of tuples containing `group.instance.id` and `member.id` will be added to the LeaveGroupRequest, replacing the single `member.id` field.

JoinGroupRequest => GroupId SessionTimeout RebalanceTimeout MemberId GroupInstanceId ProtocolType GroupProtocols
  GroupId             => String
  SessionTimeout      => int32
  RebalanceTimeout    => int32
  MemberId            => String
  GroupInstanceId     => String // new
  ProtocolType        => String
  GroupProtocols      => [Protocol MemberMetadata]
  Protocol            => String
  MemberMetadata      => bytes

JoinGroupResponse => ThrottleTime ErrorCode GenerationId ProtocolName LeaderId MemberId Members
  ThrottleTime        => int32
  ErrorCode           => int16
  GenerationId        => int32
  ProtocolName        => String
  LeaderId            => String
  MemberId            => String
  Members             => []JoinGroupResponseMember
                           MemberId         => String
                           GroupInstanceId  => String // new
                           Metadata         => bytes

SyncGroupRequest => GroupId GenerationId MemberId GroupInstanceId Assignments
  GroupId             => String
  GenerationId        => int32
  MemberId            => String
  GroupInstanceId     => String // new
  Assignments         => []SyncGroupRequestAssignment
                           MemberId         => String
                           GroupInstanceId  => String // new
                           Assignment       => bytes

SyncGroupResponse => ThrottleTime ErrorCode Assignment
  ThrottleTime        => int32
  ErrorCode           => int16
  Assignment          => bytes

HeartbeatRequest => GroupId GenerationId MemberId GroupInstanceId
  GroupId             => String
  GenerationId        => int32
  MemberId            => String
  GroupInstanceId     => String // new

HeartbeatResponse => ThrottleTime ErrorCode
  ThrottleTime        => int32
  ErrorCode           => int16

OffsetCommitRequest => GroupId GenerationId MemberId GroupInstanceId Topics
  GroupId             => String
  GenerationId        => int32
  MemberId            => String
  GroupInstanceId     => String // new
  Topics              => []OffsetCommitRequestTopic
                           Name         => String
                           Partitions   => []OffsetCommitRequestPartition
                                             PartitionIndex        => int32
                                             CommittedOffset       => int64
                                             CommittedLeaderEpoch  => int32
                                             CommitTimestamp       => int64
                                             CommittedMetadata     => String

OffsetCommitResponse => ThrottleTime Topics
  ThrottleTime        => int32
  Topics              => []OffsetCommitResponseTopic
                           Name         => String
                           Partitions   => []OffsetCommitResponsePartition
                                             PartitionIndex        => int32
                                             ErrorCode             => int16

LeaveGroupRequest => GroupId MemberIdentityList
  GroupId             => String
  MemberId            => String // removed
  MemberIdentityList  => []MemberIdentity // new
                           MemberId         => String
                           GroupInstanceId  => String
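
To make the LeaveGroupRequest change concrete, here is a minimal in-memory sketch of the new shape, where one request carries several (`member.id`, `group.instance.id`) pairs. The class names `LeaveGroupSketch`, `MemberIdentity`, and `LeaveGroupRequest`, as well as the sample ids, are hypothetical and only mirror the schema above; they are not Kafka's actual request classes and no wire serialization is shown.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical in-memory model of the proposed LeaveGroupRequest; not Kafka's real classes.
final class LeaveGroupSketch {

    // One entry of MemberIdentityList: a (member.id, group.instance.id) pair.
    static final class MemberIdentity {
        final String memberId;
        final String groupInstanceId;

        MemberIdentity(String memberId, String groupInstanceId) {
            this.memberId = memberId;
            this.groupInstanceId = groupInstanceId;
        }
    }

    // Request body: GroupId plus the list that replaces the single MemberId field.
    static final class LeaveGroupRequest {
        final String groupId;
        final List<MemberIdentity> members;

        LeaveGroupRequest(String groupId, List<MemberIdentity> members) {
            this.groupId = groupId;
            this.members = Collections.unmodifiableList(new ArrayList<>(members));
        }
    }

    public static void main(String[] args) {
        // Two static members leave the group in a single request (sample values).
        List<MemberIdentity> leaving = new ArrayList<>();
        leaving.add(new MemberIdentity("member-a", "instance-1"));
        leaving.add(new MemberIdentity("member-b", "instance-2"));
        LeaveGroupRequest request = new LeaveGroupRequest("my-group", leaving);
        for (MemberIdentity m : request.members) {
            System.out.println(request.groupId + ": " + m.groupInstanceId + " / " + m.memberId);
        }
    }
}
```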

In the meantime, we bump the join/leave group request/response versions to v4 and v3, respectively.

JoinGroupRequest.java:
public static Schema[] schemaVersions() {
    return new Schema[] {JOIN_GROUP_REQUEST_V0, JOIN_GROUP_REQUEST_V1, JOIN_GROUP_REQUEST_V2, JOIN_GROUP_REQUEST_V3, JOIN_GROUP_REQUEST_V4};
}

LeaveGroupRequest.java:
public static Schema[] schemaVersions() {
    return new Schema[] {LEAVE_GROUP_REQUEST_V0, LEAVE_GROUP_REQUEST_V1, LEAVE_GROUP_REQUEST_V2, LEAVE_GROUP_REQUEST_V3};
}

JoinGroupResponse.java:
public static Schema[] schemaVersions() {
    return new Schema[] {JOIN_GROUP_RESPONSE_V0, JOIN_GROUP_RESPONSE_V1, JOIN_GROUP_RESPONSE_V2, JOIN_GROUP_RESPONSE_V3, JOIN_GROUP_RESPONSE_V4};
}

LeaveGroupResponse.java:
public static Schema[] schemaVersions() {
    return new Schema[] {LEAVE_GROUP_RESPONSE_V0, LEAVE_GROUP_RESPONSE_V1, LEAVE_GROUP_RESPONSE_V2, LEAVE_GROUP_RESPONSE_V3};
}

In the meantime, we bump the Join/Sync/Heartbeat/OffsetCommit/Leave group request/response versions by 1.


We shall use a new JoinGroupResponseMember struct to replace the current subscription struct.

...

On the client side, we add a new config called `group.instance.id` in ConsumerConfig and DistributedConfig (the config class for Connect). On consumer service init, if the `group.instance.id` config is set, we will put it in the initial join group request so that the consumer identifies itself as a static member. Note that it is the user's responsibility to assign a unique `group.instance.id` to each consumer. This could be a service discovery hostname, a unique IP address, etc. We also have logic to handle duplicate `group.instance.id` values in case the client configuration contains duplicates.
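
As an illustration of the client-side setup, the sketch below builds consumer properties with plain `java.util.Properties` and string keys, so it compiles without the Kafka client on the classpath. The helper name `consumerProps` and the sample values (`broker:9092`, `orders`, `orders-host-01`) are hypothetical; treating a null or empty id as "dynamic member" is an assumption of this sketch, not something the proposal specifies.

```java
import java.util.Properties;

// Hypothetical helper that prepares consumer settings for a static member.
final class StaticMemberConfigSketch {

    static Properties consumerProps(String bootstrapServers, String groupId, String instanceId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Assumption: leaving group.instance.id unset keeps the consumer a dynamic member.
        if (instanceId != null && !instanceId.isEmpty()) {
            // Must be unique per consumer, e.g. a hostname or IP address.
            props.setProperty("group.instance.id", instanceId);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps("broker:9092", "orders", "orders-host-01");
        System.out.println(props.getProperty("group.instance.id"));
    }
}
```

The resulting `Properties` object would then be passed to the consumer constructor along with the usual deserializer settings.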

...

  • If the `member.id` is UNKNOWN_MEMBER_ID:
    • If the `group.instance.id` is found in the static member map, we shall generate a new `member.id` and reply to the member's rejoin request immediately when the group is stable. This guards against duplicate consumers joining with the same `group.instance.id`. We also expect that after KIP-394, all join group requests will require a `member.id` to physically enter the consumer group, so the behavior of static members is consistent with that proposal.
    • Following the above definition, a static member can never receive a MEMBER_ID_REQUIRED exception, nor be put in the pending member map.
    • If it is not found, we shall generate a new `member.id` and add the new key-value pair to the static member map.
  • We require the `member.id` (if not unknown) to match the value stored in the cache; otherwise we reply with FENCED_INSTANCE_ID. The edge case is that several members could share the same `group.instance.id` (for example, a misconfigured instance with a valid `member.id` that picks up an already-used `group.instance.id` at runtime). When `group.instance.id` has duplicates, we can refuse join requests from members with an outdated `member.id`, since we update the mapping upon each join group request. When the client hits this exception in the response, it indicates that some other consumer has taken its spot. The client should fail immediately to inform the end user of a configuration bug that is generating duplicate consumers with the same identity. For the first version of this KIP, we just want straightforward handling that exposes the error early and makes bug cases easy to reproduce. The exception could be thrown from any client function that depends on a Join/Sync/Heartbeat/OffsetCommit request/response.
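
The rules above can be sketched as a small in-memory model of the static member map. This is only an illustration under stated assumptions, not the broker implementation: the class and method names are hypothetical, UNKNOWN_MEMBER_ID is assumed to be the empty string, the generated id format is made up, a non-unknown `member.id` with no cached entry is simply treated as a mismatch, and rebalance state (for example, replying only when the group is stable) is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical model of the static member map; not the broker's actual code.
final class StaticMembershipSketch {
    static final String UNKNOWN_MEMBER_ID = ""; // assumption for this sketch

    enum Error { NONE, FENCED_INSTANCE_ID }

    static final class JoinResult {
        final Error error;
        final String memberId;

        JoinResult(Error error, String memberId) {
            this.error = error;
            this.memberId = memberId;
        }
    }

    // group.instance.id -> most recently assigned member.id
    private final Map<String, String> staticMembers = new HashMap<>();

    JoinResult handleStaticJoin(String groupInstanceId, String memberId) {
        if (UNKNOWN_MEMBER_ID.equals(memberId)) {
            // Known or new instance: generate a fresh member.id and update the mapping.
            String newId = groupInstanceId + "-" + UUID.randomUUID(); // hypothetical format
            staticMembers.put(groupInstanceId, newId);
            return new JoinResult(Error.NONE, newId);
        }
        // A known member.id must match the cached value, otherwise the caller is fenced.
        // Assumption: a missing cache entry is handled as a mismatch here.
        String current = staticMembers.get(groupInstanceId);
        if (!memberId.equals(current)) {
            return new JoinResult(Error.FENCED_INSTANCE_ID, memberId);
        }
        return new JoinResult(Error.NONE, memberId);
    }

    public static void main(String[] args) {
        StaticMembershipSketch group = new StaticMembershipSketch();
        JoinResult first = group.handleStaticJoin("instance-1", UNKNOWN_MEMBER_ID);
        // A duplicate consumer with the same group.instance.id takes over the mapping.
        group.handleStaticJoin("instance-1", UNKNOWN_MEMBER_ID);
        // The original consumer now carries an outdated member.id and is fenced.
        System.out.println(group.handleStaticJoin("instance-1", first.memberId).error);
    }
}
```

In this model the fenced consumer is the one holding the older `member.id`, which matches the intent that a misconfigured duplicate is surfaced to the user quickly.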

For join group requests under dynamic membership (without `group.instance.id` set), the handling logic will remain unchanged. If the broker version is not the latest (< v4), the join group request shall be downgraded to v3.
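
The downgrade rule for dynamic members can be written as a one-line version choice. This is a minimal sketch: the class name, the constants, and the `brokerMaxVersion` parameter are hypothetical, and how the client actually learns the broker's supported version is not modeled.

```java
// Hypothetical version selection for a dynamic member's join group request.
final class JoinGroupVersionSketch {
    static final short LATEST_VERSION = 4;    // version that carries group.instance.id
    static final short DOWNGRADED_VERSION = 3; // version used against older brokers

    // Pick v4 when the broker supports it, otherwise fall back to v3 as described above.
    static short versionToSend(short brokerMaxVersion) {
        return brokerMaxVersion >= LATEST_VERSION ? LATEST_VERSION : DOWNGRADED_VERSION;
    }

    public static void main(String[] args) {
        System.out.println(versionToSend((short) 3));
        System.out.println(versionToSend((short) 4));
    }
}
```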

...