...
In addition, as we promote incremental rebalancing such as KIP-415, we eventually hope to support stateful consumers such as Kafka Streams groups. In such a group, a newly joined member would take only standby tasks at first and be given time to replay its state. Such a new follower needs to signal a change of status once it has finished replaying the state and is ready to take active tasks. Without a JoinReason, brokers cannot distinguish the joiner's purpose: is the member requesting an incremental rebalance, or is it simply rejoining after a restart?
Furthermore, the JoinReason could serve as a helpful hint when debugging consumer rebalances after the fact. The broker only needs to record, per group.instance.id, the JoinReason that caused each transition to PrepareRebalance. From these records we can reconstruct a past timeline and, for example, cross-validate the delay between a topic metadata change and the group leader noticing it. There is plenty of room to improve on this feature, and it deserves a wider discussion.
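As a rough illustration of the debugging idea above, the sketch below keeps a bounded, per-group history of the joins that moved a group to PrepareRebalance, recording the group.instance.id and JoinReason of each. The names `RebalanceCause`, `RebalanceHistory`, `record`, and `timelineFor`, and the fixed capacity, are hypothetical and not part of the proposed interface.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Objects;

// Hypothetical: mirrors the JoinReason enum proposed in this KIP.
enum JoinReason { RESTART, TOPIC_METADATA_CHANGE, UPGRADE }

// Hypothetical record of one transition to PrepareRebalance.
final class RebalanceCause {
    final long timestampMs;
    final String groupInstanceId; // may be null for dynamic members
    final JoinReason reason;

    RebalanceCause(long timestampMs, String groupInstanceId, JoinReason reason) {
        this.timestampMs = timestampMs;
        this.groupInstanceId = groupInstanceId;
        this.reason = reason;
    }
}

// Hypothetical bounded history: only the most recent causes are kept, so memory stays fixed.
final class RebalanceHistory {
    private final int capacity;
    private final Deque<RebalanceCause> causes = new ArrayDeque<>();

    RebalanceHistory(int capacity) {
        this.capacity = capacity;
    }

    // Called when the coordinator moves the group to PrepareRebalance.
    void record(long timestampMs, String groupInstanceId, JoinReason reason) {
        if (causes.size() == capacity) {
            causes.removeFirst(); // evict the oldest entry
        }
        causes.addLast(new RebalanceCause(timestampMs, groupInstanceId, reason));
    }

    // Returns the recorded causes for one member, oldest first.
    List<RebalanceCause> timelineFor(String groupInstanceId) {
        List<RebalanceCause> result = new ArrayList<>();
        for (RebalanceCause cause : causes) {
            if (Objects.equals(cause.groupInstanceId, groupInstanceId)) {
                result.add(cause);
            }
        }
        return result;
    }
}
```

Bounding the history is a design choice in this sketch: the coordinator keeps state for every group, so an unbounded log would grow with the number of rebalances.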
In conclusion, using JoinReason to decide whether a rebalance is actually necessary could greatly simplify the implementation logic. It avoids unnecessary iterations of the rebalance protocol, reduces unintended state shuffling, and keeps low-level client details out of the broker's decision on whether to move the group to PrepareRebalance.
Public Interfaces
We will add a new enum field to the JoinGroupRequest interface, and bump the protocol version to v6:
```
JoinGroupRequest => GroupId SessionTimeout RebalanceTimeout MemberId GroupInstanceId ProtocolType GroupProtocols JoinReason
  GroupId => String
  SessionTimeout => int32
  RebalanceTimeout => int32
  MemberId => String
  GroupInstanceId => String
  ProtocolType => String
  GroupProtocols => [Protocol MemberMetadata]
    Protocol => String
    MemberMetadata => bytes
  JoinReason => Enum // new
```
```java
public static Schema[] schemaVersions() {
    // The array index is the request version; V6 adds the JoinReason field.
    return new Schema[] {JOIN_GROUP_REQUEST_V0, JOIN_GROUP_REQUEST_V1, JOIN_GROUP_REQUEST_V2,
        JOIN_GROUP_REQUEST_V3, JOIN_GROUP_REQUEST_V4, JOIN_GROUP_REQUEST_V5, JOIN_GROUP_REQUEST_V6};
}
```
...
```java
public enum JoinReason {
    RESTART("restart"),                              // Join request from a restarted or newly started consumer
    TOPIC_METADATA_CHANGE("topic_metadata_change"),  // The topic metadata has changed (must be sent by the leader)
    UPGRADE("upgrade");                              // The client is doing an upgrade that requires a rebalance

    private final String name;

    JoinReason(String name) { this.name = name; }

    @Override public String toString() { return name; }
}
```
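If the reason is carried by its string name (the enum above pairs each constant with one), the coordinator has to map a received name back to a constant. A minimal sketch, assuming that string form: `fromName` is a hypothetical helper, and returning `Optional.empty()` for an unrecognized name (for example, a reason introduced by a newer client) is an assumption, not part of this proposal.

```java
import java.util.Optional;

enum JoinReason {
    RESTART("restart"),
    TOPIC_METADATA_CHANGE("topic_metadata_change"),
    UPGRADE("upgrade");

    private final String name;

    JoinReason(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return name;
    }

    // Hypothetical helper: maps a wire name back to a constant; unknown names yield empty.
    static Optional<JoinReason> fromName(String name) {
        for (JoinReason reason : values()) {
            if (reason.name.equals(name)) {
                return Optional.of(reason);
            }
        }
        return Optional.empty();
    }
}
```

Returning an empty result rather than throwing would let an older broker fall back to its default behavior instead of rejecting the JoinGroup request outright.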
Proposed Changes
Detecting a change of topic metadata is currently the only case in which the leader consumer wants to trigger a group rebalance. The leader will explicitly set the JoinReason to `topic_metadata_change`, so that the group coordinator proceeds to the rebalance stage when it sees this reason. For dynamic members (members rejoining with UNKNOWN_MEMBER_ID), a rebalance is still triggered, because we only check the join reason when the member joins with an identity. Joining with UNKNOWN_MEMBER_ID implicitly means "assign me a new member identity and grow the group", which reasonably triggers a rebalance. For members joining with an identity (either a known member.id or a known group.instance.id), if the JoinReason is `RESTART`, a stable group won't trigger a rebalance, since this indicates that the member has simply restarted and nothing else in the group should be affected.
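The rules above can be condensed into a single decision for a JoinGroup request that reaches a Stable group. The sketch below is hypothetical: `RebalanceDecision.shouldRebalance` and its parameters are illustrative stand-ins, not the coordinator's actual code. Treating a missing JoinReason (an older client) as requiring a rebalance is an assumption; treating `UPGRADE` that way follows the enum's own comment.

```java
import java.util.Set;

// Hypothetical: mirrors the JoinReason enum proposed in this KIP.
enum JoinReason { RESTART, TOPIC_METADATA_CHANGE, UPGRADE }

// Hypothetical sketch: does this JoinGroup move a Stable group to PrepareRebalance?
final class RebalanceDecision {
    private RebalanceDecision() {}

    static boolean shouldRebalance(String memberId,
                                   String groupInstanceId,     // null for dynamic members
                                   JoinReason reason,          // null if the client sent no reason
                                   Set<String> knownMemberIds,
                                   Set<String> knownInstanceIds) {
        boolean hasIdentity = knownMemberIds.contains(memberId)
            || (groupInstanceId != null && knownInstanceIds.contains(groupInstanceId));
        if (!hasIdentity) {
            // No known identity (e.g. UNKNOWN_MEMBER_ID): the member asks for a new id and grows the group.
            return true;
        }
        if (reason == null) {
            // Assumption: without a reason, fall back conservatively to rebalancing.
            return true;
        }
        // RESTART from a known member leaves the group untouched; TOPIC_METADATA_CHANGE
        // (from the leader) and UPGRADE both ask for a new assignment.
        return reason != JoinReason.RESTART;
    }
}
```

With this rule, a static member that bounces and rejoins with its known group.instance.id and `RESTART` does not push the group into PrepareRebalance, while a leader reporting `TOPIC_METADATA_CHANGE` still does.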
...