...
```
JoinGroupRequest => GroupId GroupProtocol SessionTimeout MemberId MemberMetadata
  GroupId => String
  GroupProtocol => String
  SessionTimeout => int32
  MemberId => String
  MemberMetadata => bytes

JoinGroupResponse => ErrorCode GroupGenerationId GroupLeaderId MemberId Members
  ErrorCode => int16
  GroupGenerationId => int32
  GroupLeaderId => String
  MemberId => String
  Members => [MemberId MemberMetadata]
    MemberId => String
    MemberMetadata => bytes
```
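The following minimal Python sketch makes the byte layout concrete by encoding the JoinGroupRequest body. It assumes Kafka's classic big-endian primitives: a String is an int16 length followed by UTF-8 bytes, and bytes is an int32 length followed by the raw bytes. The common request header is omitted. The function names and example values (`"my-group"`, `"consumer"`, `"member-1"`) are hypothetical, not part of any Kafka client API.

```python
import struct


def encode_string(s: str) -> bytes:
    """Kafka String: int16 length, then UTF-8 bytes."""
    data = s.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def encode_bytes(b: bytes) -> bytes:
    """Kafka bytes: int32 length, then the raw bytes."""
    return struct.pack(">i", len(b)) + b


def encode_join_group_request(group_id: str, group_protocol: str, session_timeout: int,
                              member_id: str, member_metadata: bytes) -> bytes:
    """Encode the JoinGroupRequest body in schema order (request header omitted)."""
    return (encode_string(group_id)
            + encode_string(group_protocol)
            + struct.pack(">i", session_timeout)  # SessionTimeout => int32
            + encode_string(member_id)
            + encode_bytes(member_metadata))       # opaque to the coordinator


body = encode_join_group_request("my-group", "consumer", 30000, "member-1", b"\x00\x01")
print(body.hex())
```

MemberMetadata is treated as opaque bytes here, matching the schema: only the group members (in practice, the leader) interpret it.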
The JoinGroup response includes an array of the group's members along with their metadata. This array is only populated for the leader; for other members, it will be empty. The leader uses it to prepare member state for phase 2. In the case of the consumer, this allows the leader to collect the subscriptions from all members and set the partition assignment.
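This sketch shows the leader-only Members array and one way a leader might turn the collected metadata into an assignment. Everything here is hypothetical and chosen for illustration: the `JoinGroupResponse` dataclass, the comma-separated topic-list encoding of MemberMetadata, the known partition counts, and the round-robin strategy. None of these is the consumer's actual metadata format or assignor.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set, Tuple


@dataclass
class JoinGroupResponse:
    error_code: int
    generation_id: int
    leader_id: str
    member_id: str
    members: Dict[str, bytes] = field(default_factory=dict)  # MemberId -> MemberMetadata


def build_join_responses(generation_id: int, leader_id: str,
                         metadata_by_member: Dict[str, bytes]) -> Dict[str, JoinGroupResponse]:
    """Coordinator side: only the leader's response carries the Members array."""
    responses = {}
    for member_id in metadata_by_member:
        members = dict(metadata_by_member) if member_id == leader_id else {}
        responses[member_id] = JoinGroupResponse(0, generation_id, leader_id, member_id, members)
    return responses


def assign_round_robin(subscriptions: Dict[str, Set[str]],
                       partitions_per_topic: Dict[str, int]) -> Dict[str, List[Tuple[str, int]]]:
    """Leader side (hypothetical strategy): deal each topic's partitions to its subscribers in turn."""
    assignment = {m: [] for m in subscriptions}
    for topic in sorted(partitions_per_topic):
        subscribers = sorted(m for m, topics in subscriptions.items() if topic in topics)
        for p in range(partitions_per_topic[topic]):
            if subscribers:
                assignment[subscribers[p % len(subscribers)]].append((topic, p))
    return assignment


# Example: the leader decodes the (hypothetical) comma-separated topic lists.
leader_view = build_join_responses(1, "a", {"a": b"t1", "b": b"t1,t2"})["a"]
subscriptions = {m: set(md.decode().split(",")) for m, md in leader_view.members.items()}
print(assign_round_robin(subscriptions, {"t1": 3, "t2": 2}))
# → {'a': [('t1', 0), ('t1', 2)], 'b': [('t1', 1), ('t2', 0), ('t2', 1)]}
```

Followers receive an empty `members` dict, so only the leader has enough information to compute the assignment it later distributes in phase 2.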
The error cases in this round of the protocol are similar to those described in the consumer rewrite design: Kafka 0.9 Consumer Rewrite Design.
Phase 2: Synchronizing Group State
Once the group members have been stabilized by the completion of phase 1, the active leader has the option to propagate additional state to the group members. The new consumer protocol uses this to set partition assignments. Just like the JoinGroup phase, this synchronization acts as a barrier that all members must acknowledge by sending their own SyncGroup requests. The message format is provided below:
```
SyncGroupRequest => GroupId GroupGenerationId MemberId SyncErrorCode GroupState
  GroupId => String
  GroupGenerationId => int32
  MemberId => String
  SyncErrorCode => int16
  GroupState => [MemberId MemberState]
    MemberId => String
    MemberState => bytes

SyncGroupResponse => ErrorCode GroupGenerationId MemberState
  ErrorCode => int16
  GroupGenerationId => int32
  MemberState => bytes
```
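Here is a minimal sketch of encoding the SyncGroupRequest body. It uses the same assumed classic Kafka primitives as for JoinGroup, and also assumes that an array is encoded as an int32 element count followed by its elements. Followers pass no group state, so their GroupState is encoded as an empty array. The helper names and example values are hypothetical.

```python
import struct
from typing import Dict, Optional


def encode_string(s: str) -> bytes:
    """Kafka String: int16 length, then UTF-8 bytes."""
    data = s.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def encode_bytes(b: bytes) -> bytes:
    """Kafka bytes: int32 length, then the raw bytes."""
    return struct.pack(">i", len(b)) + b


def encode_sync_group_request(group_id: str, generation_id: int, member_id: str,
                              sync_error_code: int = 0,
                              group_state: Optional[Dict[str, bytes]] = None) -> bytes:
    """Encode the SyncGroupRequest body; followers leave group_state empty."""
    group_state = group_state or {}
    parts = [encode_string(group_id),
             struct.pack(">i", generation_id),    # GroupGenerationId => int32
             encode_string(member_id),
             struct.pack(">h", sync_error_code),  # SyncErrorCode => int16
             struct.pack(">i", len(group_state))]  # GroupState array length
    for mid, state in group_state.items():         # [MemberId MemberState]
        parts.append(encode_string(mid))
        parts.append(encode_bytes(state))
    return b"".join(parts)


leader_req = encode_sync_group_request("my-group", 1, "member-1",
                                       group_state={"member-1": b"p0", "member-2": b"p1"})
follower_req = encode_sync_group_request("my-group", 1, "member-2")
```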
The leader sets member states in the GroupState field. For followers, this array must be empty. Once the coordinator has received the group state from the leader and all members have sent their SyncGroup requests, the coordinator unpacks the group state and responds with each member's respective state in the SyncGroup response.
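The coordinator's side of this barrier can be sketched as a small state machine for one generation. This illustrates the idea under stated assumptions and is not the coordinator's implementation. `SyncRound` and `SyncGroupResponse` are hypothetical types, and invalid requests raise `ValueError` rather than returning protocol error codes. When the leader reports a non-zero SyncErrorCode, the sketch forwards that code to every member with an empty MemberState (also an assumption).

```python
from dataclasses import dataclass, field
from typing import Dict, Optional, Set


@dataclass
class SyncGroupResponse:
    error_code: int
    generation_id: int
    member_state: bytes


@dataclass
class SyncRound:
    """Coordinator-side SyncGroup barrier for a single generation (sketch)."""
    generation_id: int
    leader_id: str
    expected: Set[str]
    arrived: Set[str] = field(default_factory=set)
    group_state: Optional[Dict[str, bytes]] = None  # filled in by the leader's request
    leader_error: int = 0

    def on_sync_group(self, member_id, generation_id, sync_error_code=0, group_state=None):
        """Record one SyncGroup request; return all responses once the barrier is complete."""
        if generation_id != self.generation_id or member_id not in self.expected:
            raise ValueError("stale generation or unknown member")
        if member_id == self.leader_id:
            self.group_state = dict(group_state or {})
            self.leader_error = sync_error_code
        elif group_state:
            raise ValueError("followers must send an empty GroupState")
        self.arrived.add(member_id)
        return self._responses_if_complete()

    def _responses_if_complete(self):
        # Complete only when the leader's state is in and every expected member has arrived.
        if self.group_state is None or self.arrived != self.expected:
            return None
        responses = {}
        for m in self.expected:
            # On a leader-reported error, forward the code and send no state (assumption).
            state = b"" if self.leader_error else self.group_state.get(m, b"")
            responses[m] = SyncGroupResponse(self.leader_error, self.generation_id, state)
        return responses
```

Until the last request arrives, `on_sync_group` returns `None`. In a real server, that request would be parked rather than answered, which is what makes the exchange a barrier.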
...
- The leader can fail before performing an expected synchronization. In this case, the leader's session timeout will expire, and the coordinator will mark it dead and force all members to rejoin the group, which will result in a new leader being elected.
- The leader's synchronization procedure can fail. For example, if the leader cannot parse one member's metadata, then the state synchronization should generally fail. Since this is most likely a non-recoverable error, there must be some way to propagate the error to all members of the group. The SyncGroup request contains an error field (SyncErrorCode) for this purpose. All members send SyncGroup requests as before, but this time, once all members have joined the barrier, the coordinator forwards the leader's error code in the SyncGroup response. The client can then propagate the error to the user.
- Followers can fail to join the synchronization barrier. If a follower fails to send an expected SyncGroup request, its session timeout will expire, which will force the remaining members to rejoin.
- New members can join during synchronization. If a new member sends a JoinGroup during this phase, then the synchronization round will fail with an error indicating that rejoining the group is necessary.
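Except for the leader-error case, which is forwarded to members rather than aborting the round, these failure cases end the same way: the round is abandoned and the group must rejoin. The following hedged sketch of that check assumes the coordinator records when it last heard from each member. `check_sync_round` and the `REBALANCE_REQUIRED` marker are hypothetical names.

```python
REBALANCE_REQUIRED = "REBALANCE_REQUIRED"  # hypothetical error marker


def expired_members(last_seen, session_timeout_s, now):
    """Members whose session timeout has elapsed since they were last heard from."""
    return {m for m, t in last_seen.items() if now - t > session_timeout_s}


def check_sync_round(members, last_seen, session_timeout_s, now, pending_joins):
    """Decide whether the current SyncGroup round can complete.

    Returns (alive_members, error). The error is REBALANCE_REQUIRED if any
    member (leader or follower) has timed out, or if a new member sent a
    JoinGroup while the round was in progress.
    """
    dead = expired_members({m: last_seen[m] for m in members}, session_timeout_s, now)
    alive = set(members) - dead
    if dead or pending_joins:
        return alive, REBALANCE_REQUIRED
    return alive, None
```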
DescribeGroup: Note that the JoinGroup response in the first phase above doesn't contain a list of the members of the group or their respective metadata. So how is the group leader to find the subscriptions in order to make its assignment? For this, we implement a new request type to describe the active group. The leader is responsible for using this to collect the group's metadata before setting the assignment. We could have alternatively sent the member list in the JoinGroup response, but this leads to undesirable message overhead as groups grow larger.
The request format for DescribeGroup is provided below. Note that this request is also useful for administration purposes, since it provides access to both the group's subscriptions and its assignment.
...
A couple of additional points are worth noting in this phase:
...