...

JIRA: KAFKA-6995 (ASF JIRA)

JIRA: KAFKA-8224 (ASF JIRA)

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

group.instance.id

The unique identifier of the consumer instance provided by the end user. If set to a non-null string, the consumer is treated as a static member; otherwise a null id indicates a dynamic member.

Default value: null.
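
For illustration, here is a minimal sketch of opting a consumer into static membership; the bootstrap server, group id, and instance id are placeholders, and the `GROUP_INSTANCE_ID_CONFIG` constant is assumed to be the ConsumerConfig key added by this KIP:

Code Block
languagejava
titleStaticMemberExample.java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class StaticMemberExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        // A non-null group.instance.id marks this consumer as a static member.
        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "consumer-host-1");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.close();
    }
}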

Client Side Changes

The new `group.instance.id` config will be added to the Join/Sync/Heartbeat/OffsetCommit request/responses. A list of tuples containing `group.instance.id` and `member.id` will be added to the LeaveGroupRequest, replacing the single `member.id` field.

Code Block
JoinGroupRequest => GroupId SessionTimeout RebalanceTimeout MemberId GroupInstanceId ProtocolType GroupProtocols
  GroupId             => String
  SessionTimeout      => int32
  RebalanceTimeout	  => int32
  MemberId            => String
  GroupInstanceId     => String // new
  ProtocolType        => String
  GroupProtocols      => [Protocol MemberMetadata]
  Protocol            => String
  MemberMetadata      => bytes

JoinGroupResponse => ThrottleTime ErrorCode GenerationId ProtocolName LeaderId MemberId Members
  ThrottleTime 		  => int16
  ErrorCode			  => int16
  GenerationId		  => int32
  ProtocolName		  => String
  LeaderId			  => String
  MemberId			  => String
  Members	     	  => []JoinGroupResponseMember
						   MemberId 		=> String
						   GroupInstanceId  => String // new	
						   Metadata			=> bytes

SyncGroupRequest => GroupId GenerationId MemberId GroupInstanceId Assignments
  GroupId             => String
  GenerationId        => int32
  MemberId            => String
  GroupInstanceId     => String // new
  Assignments         => []SyncGroupRequestAssignment
                           MemberId         => String
                           Assignment       => bytes

SyncGroupResponse => ThrottleTime ErrorCode Assignment
  ThrottleTime 		  => int16
  ErrorCode			  => int16
  Assignment		  => bytes

HeartbeatRequest => GroupId GenerationId MemberId GroupInstanceId
  GroupId             => String
  GenerationId        => int32
  MemberId            => String
  GroupInstanceId     => String // new

HeartbeatResponse => ThrottleTime ErrorCode
  ThrottleTime 		  => int16
  ErrorCode			  => int16

OffsetCommitRequest => GroupId GenerationId MemberId GroupInstanceId Topics
  GroupId             => String
  GenerationId        => int32
  MemberId            => String
  GroupInstanceId     => String // new
  Topics	          => []OffsetCommitRequestTopic
						   Name         => String
						   Partitions   => []OffsetCommitRequestPartition
											 PartitionIndex        => int32	
											 CommittedOffset       => int64	
											 CommittedLeaderEpoch  => int32	
											 CommitTimestamp       => int64
											 CommittedMetadata	   => String

OffsetCommitResponse => ThrottleTime Topics
  ThrottleTime 		  => int16
  Topics	          => []OffsetCommitResponseTopic
						   Name         => String
						   Partitions   => []OffsetCommitResponsePartition
											 PartitionIndex        => int32	
											 ErrorCode             => int16

LeaveGroupRequest => GroupId MemberIdentityList
  GroupId             => String
  MemberId            => String // removed
  MemberIdentityList  => []MemberIdentity // new
						   MemberId         => String
						   GroupInstanceId  => String		

In the meantime, for better visibility for static members, we are also going to bump DescribeGroup request/response protocol to include `group.instance.id`:

Code Block
DescribeGroupResponse => ThrottleTime Groups
  ThrottleTime           => int16
  Groups                 => []DescribeGroups
							  ErrorCode        => int16
							  GroupId          => String
							  GroupState       => String
							  ProtocolType     => String
							  ProtocolData     => int16
							  Members          => []DescribedGroupMember
									    			MemberId   => String
										    		GroupInstanceId  => String // new
											    	ClientId         => String							
										    		ClientHost       => String
											    	MemberMetadata   => bytes
    												MemberAssignment => bytes

Of course, we would bump the Join/Sync/Heartbeat/OffsetCommit/Leave/Describe group request/response versions by 1.

We shall use the new JoinGroupResponseMember struct to replace the current subscription struct.

Code Block
languagejava
titleConsumerCoordinator.java
Map<String, ByteBuffer> allSubscriptions -> List<JoinGroupResponseData.JoinGroupResponseMember> allSubscriptions;

We shall also add a new public function to the `Subscription` class in `PartitionAssignor` to get the `group.instance.id`:

Code Block
languagejava
titlePartitionAssignor.java
class Subscription {
	...
	public Optional<String> groupInstanceId();
}
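
As a hedged usage sketch (the `stableKeys` helper is illustrative and not part of the proposal; it only assumes the `Subscription` accessor above), an assignor could derive a stable per-member key, falling back to the ephemeral `member.id` for dynamic members:

Code Block
languagejava
titleStableKeySketch.java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription;

// Illustrative helper only: map each member.id to a key that survives restarts
// whenever the member supplied a group.instance.id.
static Map<String, String> stableKeys(Map<String, Subscription> subscriptions) {
    Map<String, String> keys = new HashMap<>();
    for (Map.Entry<String, Subscription> e : subscriptions.entrySet()) {
        String memberId = e.getKey();
        // Dynamic members have no instance id, so fall back to the member.id.
        keys.put(memberId, e.getValue().groupInstanceId().orElse(memberId));
    }
    return keys;
}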

Similar to the MemberDescription interface (for describe group):

Code Block
languagejava
titleMemberDescription.java
class MemberDescription {
	...
	public Optional<String> groupInstanceId();
}
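
As a hedged example of how an operator might read the new field once it is exposed (the group name and bootstrap address are placeholders; `groupInstanceId()` is the accessor proposed above):

Code Block
languagejava
titleDescribeStaticMembers.java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;

public class DescribeStaticMembers {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            ConsumerGroupDescription group = admin
                .describeConsumerGroups(Collections.singleton("my-group"))
                .all().get().get("my-group");
            // Static members report their group.instance.id; dynamic members report empty.
            group.members().forEach(member ->
                System.out.println(member.consumerId() + " -> " + member.groupInstanceId()));
        }
    }
}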

We are also introducing a new error type; the handling will be explained in the following section.

Code Block
languagejava
titleErrors.java
FENCED_INSTANCE_ID(78, "This implies that some group.instance.id is already in the consumer group, however the corresponding member.id did not match the record on the coordinator", FencedInstanceIdException::new)

Stream Side Change

On the Kafka Streams side, we plan to expose the list of `group.instance.id`s for easy management. This will be done in KIP-414, which exposes the main consumer client ids that are equivalent to the `group.instance.id`s.

Server Side Changes

We shall increase the session timeout cap to 30 minutes to relax liveness tracking for static membership.

Code Block
languagescala
titleKafkaConfig.scala
val GroupMaxSessionTimeoutMs = 1800000 // 30 min for max cap

For fault tolerance, we also include `group.instance.id` in the member metadata that is backed up in the __consumer_offsets topic.

Code Block
languagescala
titleGroupMetadataManager
private val MEMBER_METADATA_V3 = new Schema(
  new Field(MEMBER_ID_KEY, STRING),
  new Field(GROUP_INSTANCE_ID_KEY, STRING), // new
  new Field(CLIENT_ID_KEY, STRING),
  new Field(CLIENT_HOST_KEY, STRING),
  new Field(REBALANCE_TIMEOUT_KEY, INT32),
  new Field(SESSION_TIMEOUT_KEY, INT32),
  new Field(SUBSCRIPTION_KEY, BYTES),
  new Field(ASSIGNMENT_KEY, BYTES))

Command Line API and Scripts

We will define one command line API to help us better manage consumer groups:

Code Block
titleAdminClient.java
public static MembershipChangeResult removeMemberFromGroup(String groupId, List<String> groupInstanceIdsToRemove, RemoveMemberFromGroupOptions options);

And a separate option class:

Code Block
titleRemoveMemberFromGroupOptions.java
public class RemoveMemberFromGroupOptions extends AbstractOptions<RemoveMemberFromGroupOptions> {
	...
	private List<MemberIdentity> members; // members to be removed
}

which will use the latest LeaveGroupRequest API to inform the broker of the permanent departure of a batch of consumer instances.
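
A hedged usage sketch of the proposed call (names follow the signatures above; whether the method ends up static or instance-level is not settled here, the no-arg options constructor is assumed, and `adminClient`, the group id, and the instance ids are placeholders):

Code Block
languagejava
titleRemoveStaticMembersExample.java
import java.util.Arrays;
import java.util.List;

// Sketch only: batch-remove two static members by their group.instance.id.
// The broker turns this into a single batched LeaveGroupRequest and rebalances once.
List<String> instanceIdsToRemove = Arrays.asList("id_1", "id_2");
MembershipChangeResult result = adminClient.removeMemberFromGroup(
    "group-1",
    instanceIdsToRemove,
    new RemoveMemberFromGroupOptions());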

...

  1. Latest JoinGroupReq/Res and LeaveGroupReq/Res are supported on both client and broker.
  2. `group.instance.id` is configured with a non-null string.

Client Behavior Changes

On the client side, we add a new config called `group.instance.id` in ConsumerConfig and DistributedConfig (the config class for Connect). On consumer service init, if the `group.instance.id` config is set, we will put it in the initial join group request to identify the consumer as a static member. Note that it is the user's responsibility to assign a unique `group.instance.id` to each consumer. This could come from a service discovery hostname, a unique IP address, etc. We also have logic to handle duplicate `group.instance.id`s in case the client configuration contains duplicates.
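
For example, a deployment could derive the instance id from the host name; this is just one possible scheme (the helper and the HOSTNAME fallback are illustrative), and anything unique within the group and stable across restarts works:

Code Block
languagejava
titleInstanceIdFromHostname.java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Illustrative only: one way to derive a stable, unique group.instance.id per host.
static String deriveGroupInstanceId() {
    try {
        return "consumer-" + InetAddress.getLocalHost().getHostName();
    } catch (UnknownHostException e) {
        // Fallback: an environment-provided identity (e.g. a container/pod name) also works.
        return "consumer-" + System.getenv().getOrDefault("HOSTNAME", "unknown");
    }
}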

...

Code Block
Suppose we have three consumers in the group with static instance ids: A, B, and C. 
Assume we have a stable group and the respective memberIds are 1, 2, and 3. 
So inside the group coordinator, we have the following state:
members: {A=1, B=2, C=3}
generation: 5

In fact, the consumer leader of the group is not aware of the instance ids of the members. 
So it sees the membership as:
members: {1, 2, 3}.
generation: 5

Now suppose that A does a rolling restart. After restarting, 
the coordinator will assign a new memberId to A and let it continue using the previous assignment. 
So we now have the following state:
members: {A=4, B=2, C=3}
generation: 5


The leader on the other hand still sees the members in the group as {1, 2, 3} 
because it does not know that member A restarted and was given a new memberId. 
Suppose that eventually something causes the group to rebalance (e.g. maybe a new topic was created).
When the leader attempts its assignment, it will see the members {2, 3, 4}. 

However, appending group.instance.id to the join group response provides some benefit 
even for the simple partition assignors. Consider the default range assignor, for example. 
Basically it works by sorting the members in the group and 
then assigning partition ranges to achieve balance. Suppose we have a topic with 9 partitions. 
If the membership were {1, 2, 3}, then the assignment would be the following:
memberId: 1, assignment: {0, 1, 2}
memberId: 2, assignment: {3, 4, 5}
memberId: 3, assignment: {6, 7, 8}

Now when the membership changes to {2, 3, 4}, then all the assignments change as well:
memberId: 2, assignment: {0, 1, 2}
memberId: 3, assignment: {3, 4, 5}
memberId: 4, assignment: {6, 7, 8}

So basically all of the assignments change even though it's the same static members. 
However, if we could consider the instanceId as the first sort key, 
then we can compute the assignment consistently even across restarts:
instanceId: A, memberId: 1, assignment: {0, 1, 2}
instanceId: B, memberId: 2, assignment: {3, 4, 5}
instanceId: C, memberId: 3, assignment: {6, 7, 8}

And after the restart:
instanceId: A, memberId: 4, assignment: {0, 1, 2}
instanceId: B, memberId: 2, assignment: {3, 4, 5}
instanceId: C, memberId: 3, assignment: {6, 7, 8}


The full benefit of static assignment can only be realized 
if the assignor knows the instance ids of the members in the group. 
It shouldn't be necessary to do anything fancy with additional metadata.
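
A minimal sketch of the sort-key idea above; the Member record and the hard-coded ids are illustrative stand-ins, not the real assignor types:

Code Block
languagejava
titleInstanceIdSortSketch.java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

public class InstanceIdSortSketch {
    // Illustrative stand-in for a group member as seen by the leader.
    record Member(String memberId, Optional<String> instanceId) {}

    public static void main(String[] args) {
        // The state after A's rolling restart: member ids changed, instance ids did not.
        List<Member> members = new ArrayList<>(List.of(
                new Member("4", Optional.of("A")),
                new Member("2", Optional.of("B")),
                new Member("3", Optional.of("C"))));
        // Use the static instance id as the primary sort key, falling back to the
        // ephemeral member id, so the ordering (and hence the range assignment)
        // stays the same across restarts.
        members.sort(Comparator.comparing((Member m) -> m.instanceId().orElse(m.memberId())));
        members.forEach(m -> System.out.println(m.instanceId().orElse(m.memberId()) + " -> " + m.memberId()));
    }
}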

...

KStream uses the stream thread as its consumer unit. For a stream instance configured with `num.threads` = 16, there would be 16 main consumers running on that single instance. If the user specifies a client id, the stream consumer client id will look like: user client id + "-StreamThread-" + thread id + "-consumer". If the user client id is not set, we will use the process id instead. Our plan is to reuse the consumer client id to define `group.instance.id`, so effectively the KStream instance will be able to use static membership if the end user defines a unique `client.id` per stream instance.

For easy operation, we define a new field in StreamsMetadata to expose all the `group.instance.id`s given on each stream instance (a sketch of this field follows the list below), so that the user could

  1. Use REST API to get list of `group.instance.id` on stream instances user wants to remove
  2. Shutdown targeting stream instances
  3. Use command line API to batch remove offline consumers
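
A hedged sketch of what the new StreamsMetadata field might look like; the method name `groupInstanceIds()` is hypothetical and only meant to illustrate the shape of the accessor:

Code Block
languagejava
titleStreamsMetadata.java
class StreamsMetadata {
	...
	// Hypothetical accessor: the group.instance.id of every main consumer
	// running on this stream instance (one per stream thread).
	public Set<String> groupInstanceIds();
}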

****Update 04/25****

We are going to let the stream user directly configure `group.instance.id`, to avoid surprising triggering of static membership. On a per-thread basis, we will pass in (user-configured group.instance.id) + "-thread-" + thread id to make sure each main consumer uses a unique instance id within one Kafka Streams instance.
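
A minimal sketch of that per-thread derivation (the variable names and exact separator string are illustrative):

Code Block
languagejava
titlePerThreadInstanceId.java
// Each StreamThread's main consumer gets a distinct instance id derived from the
// user-configured group.instance.id plus the thread index.
String threadInstanceId = userGroupInstanceId + "-thread-" + threadIdx;
consumerConfigs.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, threadInstanceId);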

...

Server Behavior Changes

Join Group Logic Change

...

  • If the `member.id` uses UNKNOWN_MEMBER_ID,
    • if `group.instance.id` was found in the static map, we shall generate a new member.id and reply to the member's rejoin request immediately when the group is stable. This is to guard against duplicate consumers joining with the same `group.instance.id`. We also expect that after KIP-394, all join group requests will require a `member.id` to physically enter the consumer group, so the behavior of static members stays consistent with that proposal.
    • Following the above definition, it would never be possible for a static member to receive a MEMBER_ID_REQUIRED exception, nor to be put in the pending member map.
    • if not found, we shall generate a new member id and add the new key-value pair to the static member map.
  • We require the `member.id` (if not unknown) to match the value stored in the cache; otherwise we reply FENCED_INSTANCE_ID. The edge case is that we could have members with the same `group.instance.id` (for example, mis-configured instances with a valid `member.id` but a reused `group.instance.id` added at runtime). When `group.instance.id` has duplicates, we could refuse the join request from members with an outdated `member.id`, since we update the mapping upon each join group request. When a client hits this exception in the response, it suggests that some other consumer has taken its spot. The client should immediately fail itself to inform the end user that there is a configuration bug generating duplicate consumers with the same identity. For the first version of this KIP, we just want straightforward handling that exposes the error at an early stage and makes bug cases easy to reproduce. The exception could be thrown from any client function depending on the Join/Sync/Heartbeat/OffsetCommit request/response (see the sketch below).
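
To summarize the branching above, here is an illustrative, non-normative sketch of how the coordinator could resolve a static member on join; the map, the helper names, and the UNKNOWN_MEMBER_ID constant are stand-ins for readability, not the actual GroupCoordinator code:

Code Block
languagejava
titleStaticJoinSketch.java
import java.util.Map;

// Illustrative only: staticMembers maps group.instance.id -> member.id.
static String resolveStaticMemberId(String groupInstanceId,
                                    String memberId,
                                    Map<String, String> staticMembers) {
    if (UNKNOWN_MEMBER_ID.equals(memberId)) {
        // Known or brand-new instance id: hand out a fresh member.id and record it.
        // When the group is stable, the coordinator can reply immediately with the
        // existing assignment instead of triggering a rebalance (not shown here).
        String newMemberId = generateMemberId();
        staticMembers.put(groupInstanceId, newMemberId);
        return newMemberId;
    }
    if (!memberId.equals(staticMembers.get(groupInstanceId))) {
        // Another consumer already claimed this instance id: fence the caller.
        throw new FencedInstanceIdException("group.instance.id " + groupInstanceId + " is fenced");
    }
    return memberId;
}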

For join group requests under dynamic membership (without `group.instance.id` set), the handling logic will remain unchanged. If the broker version is not the latest (< v4), the join group request shall be downgraded to v3.

...

  1. For a static member, `group.instance.id` must be provided. The client could optionally provide a `member.id` when `group.instance.id` is configured non-null. If a `member.id` is provided, the member will only be removed if the `member.id` matches; otherwise, only the `group.instance.id` is used. The `member.id` serves as a validation here, which currently will not be used (set to empty string) but could be useful if we build a fully automated removal process.
  2. For leave group requests under dynamic membership, the member will send a singleton list with one tuple containing the `member.id` it is currently using and a `group.instance.id` set to null. In this case, we shall just remove the given dynamic member the same way as the current leave group logic.
  3. Error cases expected are:
    1. Some instance ids (non-null) are not found, which means the request is not valid (UNKNOWN_MEMBER_ID)
    2. A theoretical case would be that both `member.id` and `group.instance.id` are set to empty string. We shall expose the error in the server log. If the entire batch request is configured with empty strings, an UNKNOWN_MEMBER_ID error will be returned.

...

  1. Upgrade your broker to include this KIP.
  2. Upgrade your client to include this KIP.
  3. Set `group.instance.id` to be unique for each consumer (or stream instance) and, if necessary, set `session.timeout.ms` to a reasonable value (see the sketch after this list).
  4. Rolling bounce the consumer group.
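
A minimal example of the client-side settings for step 3; the id and timeout values are placeholders to adapt to your deployment:

Code Block
languagejava
titleUpgradeConfigExample.java
import java.util.Properties;

Properties props = new Properties();
// Unique per consumer (or stream) instance and stable across restarts.
props.put("group.instance.id", "payment-service-instance-7");
// Optionally relax the session timeout so that restarts finishing within this
// window do not trigger a rebalance.
props.put("session.timeout.ms", "120000");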

...

A corner case is that although we don't allow static members to send a LeaveGroupRequest, the broker could still see a scenario where the LeaveGroupRequest `member.id` points to an existing static member. The straightforward solution would be to remove the member metadata altogether, including the static member info, if the corresponding `group.instance.id` was left null. This approach ensures that the downgrade process has no negative impact on normal consumer operation, and avoids complicating the server side logic. In the long term, there could be a potential use case that requires static members to send a LeaveGroupRequest, so we want to avoid changing the handling logic later.

...

  1. Pre-registration (proposed by Jason). The client user could provide a list of hard-coded `group.instance.id`s so that the server could react to scaling operations more intelligently. For example, when we scale up the fleet by defining 4 new client instance ids, the server shall wait until all 4 new members have joined the group before kicking off the rebalance; the same applies to scale down.
  2. Add hot standby hosts by defining `target.group.size` (proposed by Mayuresh). We shall keep some idle consumers within the group, and when one of the active members goes offline, we shall trigger a hot swap because the current group size is smaller than `target.group.size`. With this change we might not even need to extend the session timeout, since we could easily use the spare consumer to start working.
  3. Add a script called kafka-remove-member-from-group.sh for the end user to easily manipulate the consumer group (proposed by Boyang). ./bin/kafka-remove-member-from-group.sh --zookeeper localhost:2181 --broker 1 --group-id group-1 --group-instance-ids id_1,id_2 (comma-separated id list) will immediately trigger a consumer group rebalance by transitioning the group state to PREPARE_REBALANCE, while removing all the static members in the given list.

  4. Leverage `group.instance.id` for better generic sticky assignment (proposed by Jason). As we have discussed in the client side changes, for assignments relying on the natural order of `member.id`s (range/round robin/hash), `group.instance.id` is a better indicator than `member.id` because it persists through rolling bounces. The leader will choose to use `group.instance.id` over `member.id` if static membership is enabled.