...

Code Block
JoinGroupRequest => 
 GROUP_ID => STRING
 SESSION_TIMEOUT => INT32
 REBALANCE_TIMEOUT => INT32
 STATIC_REGISTRATION_TIMEOUT => INT32 // NEW
 STATIC_EXPANSION_TIMEOUT => INT32  // NEW
 MEMBER_NAME => STRING // NEW
 PROTOCOL_TYPE => STRING
 GROUP_PROTOCOLS => ARRAY

OffsetCommitRequest => 
 GROUP_ID => STRING
 GENERATION_ID => INT32
 MEMBER_ID => STRING
 MEMBER_NAME => STRING // NEW
 TOPICS => PARTITION DATA
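To make the new JoinGroupRequest fields concrete, here is a minimal Java sketch that models them as an in-memory record (Java 16+). It is not the wire format: the record name `JoinGroupRequestV4`, the `isStaticMember()` helper, and the simplification of GROUP_PROTOCOLS to a list of protocol names are all hypothetical. It assumes MEMBER_NAME is the string value of member.name, and that a request without it is handled as a normal dynamic member.

```java
import java.util.List;
import java.util.Objects;

/**
 * Hypothetical in-memory model of the proposed JoinGroupRequest fields.
 * Field names mirror the schema above; the record itself is illustrative only.
 */
record JoinGroupRequestV4(
        String groupId,
        int sessionTimeoutMs,
        int rebalanceTimeoutMs,
        int staticRegistrationTimeoutMs, // NEW
        int staticExpansionTimeoutMs,    // NEW
        String memberName,               // NEW, null when member.name is not configured
        String protocolType,
        List<String> groupProtocols) {   // simplified: protocol names only

    JoinGroupRequestV4 {
        Objects.requireNonNull(groupId, "groupId");
        Objects.requireNonNull(protocolType, "protocolType");
        groupProtocols = List.copyOf(groupProtocols); // defensive, immutable copy
    }

    /** A request carrying a non-empty member name asks for static membership. */
    boolean isStaticMember() {
        return memberName != null && !memberName.isEmpty();
    }
}
```

A consumer restarted with the same member.name sends the same `memberName`, which is what lets the broker recognize it; a client deployed without member.name produces a request for which `isStaticMember()` is false.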

...

Code Block
ConsumerConfig.java
public static final String REGISTRATION_TIMEOUT_MS = "registration.timeout.ms"; // Default: 30 min

Registration timeout is the timeout after which we trigger a rebalance when a member has been offline for too long. It should usually be set much larger than the session timeout, which is used to detect consumer health. It is monitored through heartbeats in the same way as the session timeout, and it replaces the session timeout under static membership. We define a separate config because we want an easy switch between dynamic and static membership (see details here). By setting it to 15 to 30 minutes, we loosen the tracking of static member progress and transfer member management to the client application, such as Kubernetes (K8s). Of course, we should not wait forever for a member to come back online simply to reduce rebalances. Eventually the member will be kicked out of the group and a final rebalance is triggered. Note that we track the earliest member to go offline and compare how long it has been offline against the registration timeout. Example below with registration timeout 15 min:

...
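The registration-timeout rule (heartbeat-based, checked against the member that went offline earliest) can be sketched as follows. This is a simplified, hypothetical model rather than the coordinator's actual code: the class `RegistrationTimeoutTracker` and its methods are illustrative names, members are keyed by member name, and "offline" is approximated as "has not heartbeated since the recorded timestamp".

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical tracker for the registration timeout of static members.
 * The member with the oldest last heartbeat is the one that went offline
 * earliest, so it is the only one compared against the timeout.
 */
class RegistrationTimeoutTracker {
    private final long registrationTimeoutMs;
    // member name -> timestamp (ms) of its most recent heartbeat
    private final Map<String, Long> lastHeartbeatMs = new HashMap<>();

    RegistrationTimeoutTracker(long registrationTimeoutMs) {
        this.registrationTimeoutMs = registrationTimeoutMs;
    }

    void onHeartbeat(String memberName, long nowMs) {
        lastHeartbeatMs.put(memberName, nowMs);
    }

    /**
     * If the earliest offline member has exceeded the registration timeout,
     * removes it and returns its name (the caller would then trigger one
     * rebalance); otherwise returns empty.
     */
    Optional<String> checkExpired(long nowMs) {
        Optional<Map.Entry<String, Long>> earliest = lastHeartbeatMs.entrySet().stream()
                .min(Map.Entry.comparingByValue());
        if (earliest.isPresent() && nowMs - earliest.get().getValue() > registrationTimeoutMs) {
            String name = earliest.get().getKey();
            lastHeartbeatMs.remove(name);
            return Optional.of(name);
        }
        return Optional.empty();
    }
}
```

With a 15 min timeout, a member whose last heartbeat was at t = 0 is evicted only once a check runs after t = 15 min; a brief restart that resumes heartbeating before then resets its timestamp and causes no rebalance.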

Code Block
ConsumerConfig.java
public static final String EXPANSION_TIMEOUT_MS = "expansion.timeout.ms"; // Default: 5 min

This is the timeout we count down, starting from the first newly joined member's request, before triggering exactly one rebalance (i.e., the estimated time needed to spin up the new hosts). We advise setting it roughly equal to the session timeout so that the workload becomes balanced when you scale your stream job to 2x or 3x. Example with expansion timeout 5 min:

...
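A minimal sketch of the expansion-timeout countdown described above: the first new member's join request starts the clock, later joins are buffered, and exactly one rebalance admits all of them once the timeout elapses. The class `ExpansionWindow` and its methods are hypothetical names chosen for illustration, and the returned list merely stands in for starting a real rebalance.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of the expansion timeout: one countdown per scale-up,
 * started by the first joiner, ending in a single rebalance.
 */
class ExpansionWindow {
    private final long expansionTimeoutMs;
    private final List<String> pendingMembers = new ArrayList<>();
    private long windowStartMs = -1; // -1 means no countdown in progress

    ExpansionWindow(long expansionTimeoutMs) {
        this.expansionTimeoutMs = expansionTimeoutMs;
    }

    void onNewMemberJoin(String memberName, long nowMs) {
        if (windowStartMs < 0) {
            windowStartMs = nowMs; // the first new joiner starts the countdown
        }
        pendingMembers.add(memberName);
    }

    /** Returns the members admitted by one rebalance, or an empty list if none is due yet. */
    List<String> maybeRebalance(long nowMs) {
        if (windowStartMs >= 0 && nowMs - windowStartMs >= expansionTimeoutMs) {
            List<String> admitted = List.copyOf(pendingMembers);
            pendingMembers.clear();
            windowStartMs = -1; // ready for the next scale-up
            return admitted;
        }
        return List.of();
    }
}
```

The design trade-off is visible in the sketch: a longer timeout batches more new hosts into the same rebalance, but every joiner waits until the window closes before receiving work.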

For the very first version, we hope to keep the membership transfer logic simple. As mentioned, the goal of static membership is to reduce multiple rebalances caused by intermittent failures. There should be an easy way to fall back to dynamic membership when the user prefers to let the broker handle task assignment, for example when liveness becomes a more important factor once state tuning has made significant progress. The fallback is simple: when the membership becomes stable, the first join group request decides the membership protocol for the next round. For example, while running stably with static membership, deploying a change to the client app without member.name set could invalidate the current hashmap on the broker, after which all v4 requests containing member.name are treated as normal dynamic members and get the usual sticky assignment in a dynamic context. Note that we intentionally separate session.timeout and registration.timeout, even though they serve the same purpose in different protocols, so that users don't have to change any setting except the member name.

To keep the transition logic and human handling simple enough, we will define three admin APIs to help us better manage the groups:

Code Block
AdminClient.java
public static MembershipChangeResult enableStaticMembership(String groupId, int registrationTimeout, int expansionTimeout)
public static MembershipChangeResult enableDynamicMembership(String groupId)
public static MembershipChangeResult forceRebalance(String groupId)

where enableStaticMembership switches the consumer group to static membership and sets the registration timeout and expansion timeout. After that, all joining members are required to set the member name field. An error is returned if the broker is on an old version or in other potential failure cases. Note that clients should already include the member name field at this point.

enableDynamicMembership, by contrast, simply changes the membership back to dynamic mode. An error is returned if the broker is on an old version, the group is already on dynamic membership, and so on.

forceRebalance triggers one rebalance immediately under static membership. An error is returned if the broker is on an old version, the group is on dynamic membership, and so on.
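The admin calls and their error cases described above can be modeled as a small state machine on the broker side. This is a hedged sketch, not the proposed implementation: `GroupMembershipState`, `Membership`, `MembershipError`, and `validateJoin` are hypothetical names, the error codes are illustrative (the text only says "an error is returned"), a simple enum stands in for `MembershipChangeResult`, and an old broker is modeled by a `brokerSupportsStatic` flag.

```java
/** Membership mode of a group (hypothetical model). */
enum Membership { DYNAMIC, STATIC }

/** Illustrative error codes; the proposal does not name them. */
enum MembershipError { NONE, UNSUPPORTED_VERSION, ALREADY_DYNAMIC, GROUP_IS_DYNAMIC, MEMBER_NAME_REQUIRED }

/**
 * Hypothetical per-group state showing how the three admin calls and the
 * member-name requirement could interact.
 */
class GroupMembershipState {
    private final boolean brokerSupportsStatic; // false models a broker on an old version
    private Membership membership = Membership.DYNAMIC;
    private int registrationTimeoutMs;
    private int expansionTimeoutMs;
    private int rebalancesTriggered;

    GroupMembershipState(boolean brokerSupportsStatic) {
        this.brokerSupportsStatic = brokerSupportsStatic;
    }

    MembershipError enableStaticMembership(int registrationTimeoutMs, int expansionTimeoutMs) {
        if (!brokerSupportsStatic) return MembershipError.UNSUPPORTED_VERSION;
        this.membership = Membership.STATIC;
        this.registrationTimeoutMs = registrationTimeoutMs;
        this.expansionTimeoutMs = expansionTimeoutMs;
        return MembershipError.NONE;
    }

    MembershipError enableDynamicMembership() {
        if (!brokerSupportsStatic) return MembershipError.UNSUPPORTED_VERSION;
        if (membership == Membership.DYNAMIC) return MembershipError.ALREADY_DYNAMIC;
        membership = Membership.DYNAMIC;
        return MembershipError.NONE;
    }

    MembershipError forceRebalance() {
        if (!brokerSupportsStatic) return MembershipError.UNSUPPORTED_VERSION;
        if (membership == Membership.DYNAMIC) return MembershipError.GROUP_IS_DYNAMIC;
        rebalancesTriggered++; // stands in for actually starting a rebalance
        return MembershipError.NONE;
    }

    /** Under static membership, every joining member must carry a member name. */
    MembershipError validateJoin(String memberName) {
        boolean hasName = memberName != null && !memberName.isEmpty();
        if (membership == Membership.STATIC && !hasName) return MembershipError.MEMBER_NAME_REQUIRED;
        return MembershipError.NONE;
    }

    Membership membership() { return membership; }
    int registrationTimeoutMs() { return registrationTimeoutMs; }
    int expansionTimeoutMs() { return expansionTimeoutMs; }
    int rebalancesTriggered() { return rebalancesTriggered; }
}
```

Checking the broker version first in every call mirrors the text, where "old version" is the first error listed for all three APIs.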

Compatibility, Deprecation, and Migration Plan

...