...

Code Block
JoinGroupRequest => GroupId GroupProtocol SessionTimeout MemberId MemberMetadata
  GroupId             => String
  GroupProtocol       => String
  SessionTimeout      => int32
  MemberId            => String
  MemberMetadata      => bytes
 
JoinGroupResponse => ErrorCode GroupGenerationId GroupLeaderId MemberId Members
  ErrorCode           => int16
  GroupGenerationId   => int32
  GroupLeaderId       => String
  MemberId            => String
  Members             => [MemberId MemberMetadata]
    MemberId          => String
    MemberMetadata    => bytes

The JoinGroup response includes an array of the group's members along with their metadata. This array is only populated for the leader; for other members, it will be empty. The leader uses it to prepare member state for phase 2. In the case of the consumer, this allows the leader to collect the subscriptions from all members and set the partition assignment.
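The leader-only population of the Members array can be sketched as follows. This is a minimal illustration rather than the coordinator's actual code: the `JoinGroupResponse` dataclass and the `build_join_responses` helper are hypothetical names, and the member list is sorted only to make the output deterministic.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass
class JoinGroupResponse:
    error_code: int
    generation_id: int
    leader_id: str
    member_id: str
    # (member_id, metadata) pairs; left empty for every member except the leader.
    members: List[Tuple[str, bytes]] = field(default_factory=list)


def build_join_responses(
    generation_id: int,
    leader_id: str,
    metadata_by_member: Dict[str, bytes],
) -> Dict[str, JoinGroupResponse]:
    """Build one response per member; only the leader receives the member list."""
    responses = {}
    for member_id in metadata_by_member:
        if member_id == leader_id:
            members = sorted(metadata_by_member.items())
        else:
            members = []
        responses[member_id] = JoinGroupResponse(
            error_code=0,  # 0 is used here to mean "no error"
            generation_id=generation_id,
            leader_id=leader_id,
            member_id=member_id,
            members=members,
        )
    return responses
```

With two members `a` (the leader) and `b`, the response for `a` carries both metadata entries while the response for `b` carries an empty list, so followers pay no extra message overhead.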

The error cases in this round of the protocol are similar to those described in the consumer rewrite design: Kafka 0.9 Consumer Rewrite Design.

Phase 2: Synchronizing Group State

Once the group members have been stabilized by the completion of phase 1, the active leader has the option to propagate additional state to the group members. This is used in the new consumer protocol to set partition assignments. Just like the JoinGroup phase, this synchronization acts as a barrier which all members must acknowledge by sending their own SyncGroup requests. The message format is provided below:

Code Block
SyncGroupRequest => GroupId GroupGenerationId MemberId
  GroupId           => String
  GroupGenerationId => int32
  SyncErrorCode     => int16
  GroupState        => [MemberId MemberState]
    MemberId        => String
    MemberState     => bytes
 
SyncGroupResponse => ErrorCode GroupGenerationId MemberState
  ErrorCode         => int16
  GroupGenerationId => int32
  MemberState       => bytes
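As an illustration of how such a layout maps to bytes, the sketch below encodes and decodes the three SyncGroupResponse fields. It assumes the usual Kafka wire conventions (big-endian integers, and `bytes` written as an int32 length followed by the payload); the function names are hypothetical, and null byte arrays are not handled.

```python
import struct

# int16 ErrorCode, int32 GroupGenerationId, int32 length prefix of MemberState
_HEADER = ">hii"


def encode_sync_group_response(error_code: int, generation_id: int,
                               member_state: bytes) -> bytes:
    """Serialize the three response fields in the order given by the grammar."""
    header = struct.pack(_HEADER, error_code, generation_id, len(member_state))
    return header + member_state


def decode_sync_group_response(buf: bytes):
    """Parse a buffer produced by encode_sync_group_response."""
    error_code, generation_id, length = struct.unpack_from(_HEADER, buf, 0)
    offset = struct.calcsize(_HEADER)
    member_state = buf[offset:offset + length]
    if len(member_state) != length:
        raise ValueError("truncated MemberState")
    return error_code, generation_id, member_state
```

Because MemberState is opaque to the coordinator, the client is free to choose any serialization for the payload itself, such as an encoded partition assignment.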

 

The leader sets member states in the GroupState field. For followers, this array must be empty. Once the coordinator has received the group state from the leader, and all members have sent their SyncGroup requests, the coordinator unpacks the group state and responds with each member's respective state in the SyncGroup response. 
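The barrier behaviour described above can be sketched in a few lines. This is a simplified, single-threaded model under stated assumptions: `SyncBarrier` and `on_sync_group` are hypothetical names, the group state is modelled as a dictionary from member id to state bytes, and members for which the leader supplied no state receive empty bytes.

```python
from typing import Dict, Optional, Set


class SyncBarrier:
    """Collects SyncGroup requests for one generation and releases them together."""

    def __init__(self, leader_id: str, member_ids: Set[str]):
        self.leader_id = leader_id
        self.member_ids = set(member_ids)
        self.received: Set[str] = set()
        self.group_state: Optional[Dict[str, bytes]] = None

    def on_sync_group(self, member_id: str,
                      group_state: Dict[str, bytes]) -> Optional[Dict[str, bytes]]:
        """Record one request; return per-member states once the barrier completes."""
        if member_id not in self.member_ids:
            raise KeyError(f"unknown member: {member_id}")
        if member_id == self.leader_id:
            self.group_state = dict(group_state)
        elif group_state:
            raise ValueError("followers must send an empty GroupState")
        self.received.add(member_id)
        if self.group_state is None or self.received != self.member_ids:
            return None  # still waiting on the leader or on other members
        # Unpack the leader's group state: each member gets only its own entry.
        return {m: self.group_state.get(m, b"") for m in self.member_ids}
```

A follower's request returns `None` until the leader's state has arrived and every member has checked in; the final request returns the mapping from which each SyncGroup response is built.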

...

  1. The leader can fail prior to performing an expected synchronization. In this case, the session timeout of the leader will expire and the coordinator will mark it dead and force all members to rejoin the group, which will result in a new leader being elected.
  2. The leader's synchronization procedure can fail. For example, if the leader cannot parse one member's metadata, then the state synchronization should generally fail. Since this is most likely a non-recoverable error, there must be some way to propagate the error to all members of the group. The SyncGroup request contains an error field which is used for this purpose. All members will send SyncGroup requests as before, only this time once all members have joined, the error code will be forwarded in the SyncGroup response. The client can then propagate the error to the user.
  3. Followers can fail to join the synchronization barrier. If a follower fails to send an expected SyncGroup, then that member's session will expire, which will force all members to rejoin.
  4. New members can join during synchronization. If a new member sends a JoinGroup during this phase, then the synchronization round will fail with an error indicating that rejoining the group is necessary.
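The four cases above can be summarized as a single decision the coordinator makes for a synchronization round. The sketch below is illustrative only: `SyncOutcome` and `resolve_sync_round` are hypothetical names, and an error code of 0 is assumed to mean that the leader reported no error.

```python
from enum import Enum
from typing import Set


class SyncOutcome(Enum):
    WAIT = "wait"                    # barrier not yet complete
    COMPLETE = "complete"            # respond with each member's state
    FORWARD_ERROR = "forward_error"  # forward the leader's error code to all members
    REJOIN = "rejoin"                # fail the round; members must rejoin the group


def resolve_sync_round(member_ids: Set[str], synced: Set[str], expired: Set[str],
                       new_joiners: Set[str], leader_error_code: int) -> SyncOutcome:
    """Decide how the coordinator should resolve the current round."""
    # Cases 1 and 3: the session of the leader or of a follower expired.
    # Case 4: a new member sent JoinGroup during synchronization.
    if (expired & member_ids) or new_joiners:
        return SyncOutcome.REJOIN
    if synced != member_ids:
        return SyncOutcome.WAIT
    # Case 2: the leader's synchronization failed and it reported an error.
    if leader_error_code != 0:
        return SyncOutcome.FORWARD_ERROR
    return SyncOutcome.COMPLETE
```

Note that in case 2 the error is only forwarded once every member has sent its SyncGroup request, which matches the barrier semantics of the normal path.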

DescribeGroup: Note that the JoinGroup response in the first phase above doesn't contain a list of the members of the group or their respective metadata. So how is the group leader to find the subscriptions in order to make its assignment? For this, we implement a new request type to describe the active group. The leader is responsible for using this to collect the group's metadata before setting the assignment. We could alternatively have sent the member list in the JoinGroup response, but this leads to undesirable message overhead as groups get larger.

The request format for DescribeGroup is provided below. Note that this request is also useful for administration purposes since it provides access both to the group's subscription and assignment.

...

A couple of additional points are worth noting in this phase:

...