...

Code Block
JoinGroupRequest => GroupId SessionTimeout RebalanceTimeout MemberId MemberName ProtocolType GroupProtocols
  GroupId             => String
  SessionTimeout      => int32
  RebalanceTimeout    => int32
  MemberId            => String
  MemberName          => String // new
  ProtocolType        => String
  GroupProtocols      => [Protocol MemberMetadata]
  Protocol            => String
  MemberMetadata      => bytes


SyncGroupRequest => GroupId GroupGenerationId MemberId MemberName GroupState
  GroupId           => String
  GroupGenerationId => int32
  GroupState        => [MemberId MemberState]
  MemberId          => String
  MemberName        => String // new
  MemberState       => bytes
  
OffsetCommitRequest => GroupId GenerationId MemberId MemberName Topics
  GroupId      => String
  GenerationId => int32
  MemberId     => String
  MemberName   => String // new
  Topics       => partition data
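
For illustration, here is a minimal, hypothetical sketch of the coordinator-side bookkeeping that the new MemberName field enables, as discussed in the paragraphs below. The class and method names are invented for this example and are not the actual GroupCoordinator code: every JoinGroupRequest refreshes the member name → member.id mapping, and an OffsetCommitRequest carrying an outdated member.id is refused.

Code Block
languagescala
titleStaticMemberIds.scala (illustrative sketch)
class StaticMemberIds {
  // Hypothetical bookkeeping: member name -> latest member.id.
  private val latestMemberId = scala.collection.mutable.Map.empty[String, String]

  // Refreshed on every JoinGroupRequest: the newest join wins.
  def onJoinGroup(memberName: String, memberId: String): Unit =
    latestMemberId.update(memberName, memberId)

  // An OffsetCommit with an outdated member.id for a known member name is refused.
  def isCommitAllowed(memberName: String, memberId: String): Boolean =
    latestMemberId.get(memberName).contains(memberId)
}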

...

so that when the member name has duplicates, we can refuse commit requests from members with an outdated member.id (since we update the mapping upon each join group request). Normally, hitting NO_STATIC_MEMBER_INFO_SET is potentially because the consumer group is doing a rolling restart, where some members haven't updated their code with the new member name. In an edge case where the client hits a DUPLICATE_STATIC_MEMBER exception in the response, it suggests that some other consumer has taken its spot. The client should immediately fail itself to inform the end user that there is a configuration bug generating duplicate consumers with the same identity. For the first version of this KIP, we just want straightforward handling that exposes the error at an early stage and makes corner cases easy to reproduce.
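
To make the suggested client behavior concrete, here is a minimal sketch. The error names follow the text above; the handler shape is an assumption for illustration, not the actual consumer internals.

Code Block
languagescala
titleStaticMemberErrors.scala (illustrative sketch)
object StaticMemberErrors {
  sealed trait StaticMemberError
  case object NoStaticMemberInfoSet extends StaticMemberError // mixed-version rolling restart
  case object DuplicateStaticMember extends StaticMemberError // another consumer claimed this identity

  def handle(error: StaticMemberError): Unit = error match {
    case NoStaticMemberInfoSet =>
      // Likely retriable: some members have not yet been upgraded to set a member name.
      println("member name not set everywhere yet; keep retrying the join")
    case DuplicateStaticMember =>
      // Fatal: fail fast so the user sees the duplicate-identity configuration bug.
      throw new IllegalStateException(
        "duplicate static member name detected; shutting down to expose the misconfiguration")
  }
}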

When do we rebalance in static membership?

...

Rebalance happens rarely in static membership (unless scaling up/down or the leader rejoins). When receiving an existing member's rejoin request, the broker will return the cached assignment back to the member without triggering a rebalance.
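
A minimal sketch of this broker-side idea, with invented names rather than the real GroupCoordinator code:

Code Block
languagescala
titleStaticGroup.scala (illustrative sketch)
final case class MemberAssignment(bytes: Array[Byte])

class StaticGroup {
  private val cachedAssignments =
    scala.collection.mutable.Map.empty[String, MemberAssignment]

  // A known static member rejoining gets its cached assignment back directly;
  // only an unknown member name is a real membership change that needs a rebalance.
  def onRejoin(memberName: String): Option[MemberAssignment] =
    cachedAssignments.get(memberName)
}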

There are two configs that control the cases where we must do a rebalance: the registration timeout and the expansion timeout.

Scale down

Code Block
languagescala
titleGroupMetadata.scala
val registrationTimeoutMs: Int = 900000 // Default 15 min


Registration timeout is the timeout after which we trigger a rebalance when a member has gone offline for too long. It should usually be set much larger than the session timeout, which is used to detect consumer health. It is monitored through heartbeats the same way as the session timeout, and will replace the session timeout in static membership. The reason we define a separate config is that we would like an easy switch between dynamic membership and static membership without adding mental management burden. By setting it to 15~30 minutes, we loosen the tracking of static member progress and hand member management over to the client application layer, such as K8s. Of course, we should not wait forever for a member to come back online simply for the purpose of reducing rebalances: eventually the member will be kicked out of the group and a final rebalance is triggered. Note that we track the earliest offline member and compare against the registration timeout. Example below with a registration timeout of 15 min:

...

Member B dropped

...

When scaling down the application, it is advised to do it quickly, so that when the registration timeout since the first departed member is reached, we can trigger one single rebalance and bring progress back on track. Note that here we are sacrificing up to x min of liveness (the registration timeout) for the sake of minimizing state shuffling. This is a reasonable trade-off for large-state applications.

A corner case is that A & B could drop off the group at nearly the same time. In static membership, we still need to sync group to keep track of how many existing members are still alive; otherwise an unnecessary rebalance will trigger later.

Removing members is tricky in nature. For the broker, the information about the "target scale down" is very hard to get; for example, if we have 16 members and want to cut the number by half, during the group shrink 16 → 8 it is unknown to the broker coordinator when to trigger the rebalance. An admin API to force a rebalance could be helpful here, but we will make a call once we finish the major implementation.
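
To summarize the scale-down countdown, here is a small illustrative sketch (invented names, assuming the semantics described above, not actual broker code) of tracking the earliest offline member and firing a single rebalance once the registration timeout has elapsed:

Code Block
languagescala
titleRegistrationTracker.scala (illustrative sketch)
class RegistrationTracker(registrationTimeoutMs: Long) {
  // Only the earliest member to go offline starts the countdown.
  private var earliestOfflineAtMs: Option[Long] = None

  def onMemberOffline(nowMs: Long): Unit =
    if (earliestOfflineAtMs.isEmpty) earliestOfflineAtMs = Some(nowMs)

  // One rebalance fires once the timeout elapses since the *first* departure,
  // covering every member that dropped in the meantime (e.g. both A and B).
  def shouldRebalance(nowMs: Long): Boolean =
    earliestOfflineAtMs.exists(t => nowMs - t >= registrationTimeoutMs)
}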

Scale up

Adding new static memberships should be straightforward. Since this operation should happen fast enough (to make sure capacity catches up quickly), we define another config called the expansion timeout.

Code Block
titleGroupMetadata.scala
val expansionTimeoutMs: Int = 300000 // Default 5 min

This is the timeout we count down, starting from the first new member's join request, to trigger exactly one rebalance (i.e., the estimated time to spin up the required number of hosts). It is advised to set it roughly the same as the session timeout to make sure the workload becomes balanced when you 2X or 3X your stream job. Example with an expansion timeout of 5 min:

...

New member B join

...

(not sending heartbeat requests). To make static membership effective, we should increase the default maximum session timeout to 30 min so that end users can configure it freely. Although in static membership we leverage client applications' self-management (for example, K8s), the session timeout can serve as the last line of defense for liveness.


Code Block
languagescala
titleKafkaConfig.scala
val GroupMaxSessionTimeoutMs = 1800000 // 30 min for max cap




We do not plan to solve the scale-up issue holistically within this KIP, since there is a parallel discussion about Incremental Cooperative Rebalancing, in which the "when to rebalance" logic would be encoded at the application level instead of at the protocol level.

Note that the expansion timeout still does not guarantee a single rebalance in all cases. In the example above, we unfortunately trigger two rebalances, because C is too late to catch the first round of the expansion timeout. When C finally joins, it starts a new expansion timeout countdown; after 5 min, another rebalance kicks in and assigns new tasks to C.
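
A small sketch of this countdown behavior (invented names, assuming the semantics described above):

Code Block
languagescala
titleExpansionTracker.scala (illustrative sketch)
class ExpansionTracker(expansionTimeoutMs: Long) {
  private var firstJoinAtMs: Option[Long] = None

  // The countdown starts at the first new member's join request; joins that
  // arrive while a countdown is running do not restart it.
  def onNewMemberJoin(nowMs: Long): Unit =
    if (firstJoinAtMs.isEmpty) firstJoinAtMs = Some(nowMs)

  // Fires exactly one rebalance per countdown. A member like C that joins
  // after the timer fired starts a fresh countdown, yielding a second rebalance.
  def maybeRebalance(nowMs: Long): Boolean = {
    val fire = firstJoinAtMs.exists(t => nowMs - t >= expansionTimeoutMs)
    if (fire) firstJoinAtMs = None
    fire
  }
}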

In KStream, scale-up is difficult since we need to shuffle state to the new hosts. Ideally, we could introduce a new status called "learner", where newly started hosts first catch up with the assigned task progress before triggering the rebalance, so we don't see a sudden dip in progress. However, this builds on top of the success of KIP-345 and could be addressed in a separate KIP specifically for KStream.


With the introduction of static membership, we effectively use the expansion timeout to replace the rebalance timeout (configured by max.poll.interval.ms on the client side) and the registration timeout to replace the session timeout. We also replace the group.initial.rebalance.delay.ms config on the broker side to make sure the behavior is consistent for static membership.
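
For end users, configuring a static member could then look roughly like the following sketch; note that "member.name" is a placeholder for whatever config key this KIP finalizes, not a config that exists today.

Code Block
languagescala
titleStaticConsumerExample.scala (illustrative sketch)
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("group.id", "my-static-group")
// Hypothetical key: a stable identity that survives restarts of this instance.
props.put("member.name", "instance-1")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)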

...