The new consumer currently relies on a server-side coordinator to negotiate the set of consumer processes that form the group and to assign the partitions to each member of the consumer group per some assignment strategy which group members must agree on. This provides assurance that the group will always have a consistent assignment and it enables the coordinator to validate that offsets are only committed from consumers that own the respective partition. However, it relies on the server having access to the code implementing the assignment strategy, which is problematic for two reasons:
If new assignment use cases were rare, this might be a viable solution, but we already have a number of cases where more control over assignment is needed. For example:
To address the problems pointed out above and support custom assignment strategies easily, we propose to move the assignment to the client. Specifically, we propose to separate the group management capability provided by the coordinator from partition assignment. We leave the coordinator to handle the former, while the latter is pushed into the consumer. This promotes separation of concerns and loose coupling.
More concretely, instead of the JoinGroup protocol returning each consumer's assignment directly, we modify the protocol to return the list of members in the group and have each consumer decide its assignment independently. This solves the deployment problem since it is typically an order of magnitude easier to update clients than servers. It also decouples the server from the needs of the assignment strategy, which allows us to support the above use cases without any server changes and provides some "future-proofing" for new use cases. For consumers, the join group protocol becomes more of an abstract group membership capability which, in addition to enabling assignment, can be used as a primitive to build other group management functions (such as leadership).
There are some disadvantages though. First, since the coordinator does not know the owners of a partition, it can no longer verify that offset commits come from the "right" consumer, which potentially opens the door to inconsistent processing. However, as mentioned above, the ability of the server to validate assignments (and therefore commits) would have to be handicapped anyway to support redundant partitioning. Also, with client-side assignment, debugging assignment bugs requires a little more work. Finding assignment errors may involve aggregating logs from each consumer in the group. In practice, the partitioning strategies used by most users will be simple and tested enough that such errors should be unlikely, but it is still a potential concern.
So far, we have argued for separating group management from resource assignment. A significant benefit of this proposal is that it enables the group membership protocol to be used for other purposes. Below we outline the use cases that become possible once group management is a generic facility in the Kafka protocol.
Given that there are several non-consumer use cases for a general group management protocol, we propose changing JoinGroupRequest and JoinGroupResponse such that they are not tied to consumer-specific concepts.
Below we outline the changes needed to the protocol to make it more general and also the changes to the consumer API to support this.
To support client-side assignment, we propose to split the group management protocol into two phases: group membership and state synchronization. The first phase is used to set the active members of the group and to elect a group leader. The second phase is used to enable the group leader to synchronize member state in the group (in other words to assign each member's state). From the perspective of the consumer, the first phase is used to collect member subscriptions, while the second phase is used to propagate partition assignments. The elected leader in the join group phase is responsible for setting the assignments for the whole group.
Below we describe the phases of this protocol in more detail.
The purpose of the initial phase is to set the active members of the group. This protocol has similar semantics as in the initial consumer rewrite design. After finding the coordinator for the group, each member sends a JoinGroup request containing member-specific metadata. The join group request will park at the coordinator until all expected members have sent their own join group requests ("expected" in this case means all members that were part of the previous generation). Once they have done so, the coordinator randomly selects a leader from the group and sends JoinGroup responses to all the pending requests.
The JoinGroup request contains an array with the group protocols that it supports along with member-specific metadata. This is basically used to ensure compatibility of group member metadata within the group. The coordinator chooses a protocol which is supported by all members of the group and returns it in the respective JoinGroup responses. If a member joins and doesn't support any of the protocols used by the rest of the group, then it will be rejected. This mechanism provides a way to update protocol metadata to a new format in a rolling upgrade scenario. The newer version will provide metadata for the new protocol and for the old protocol, and the coordinator will choose the old protocol until all members have been upgraded.
JoinGroupRequest => GroupId SessionTimeout MemberId ProtocolType GroupProtocols
  GroupId => String
  SessionTimeout => int32
  MemberId => String
  ProtocolType => String
  GroupProtocols => [Protocol MemberMetadata]
    Protocol => String
    MemberMetadata => bytes

JoinGroupResponse => ErrorCode GroupGenerationId GroupProtocol GroupLeaderId MemberId Members
  ErrorCode => int16
  GroupGenerationId => int32
  GroupProtocol => String
  GroupLeaderId => String
  MemberId => String
  Members => [MemberId MemberMetadata]
    MemberId => String
    MemberMetadata => bytes
The JoinGroup response includes an array for the members of the group along with their metadata. This is only populated for the leader to reduce the overall overhead of the protocol; for other members, it will be empty. This is used by the leader to prepare member state for phase 2. In the case of the consumer, this allows the leader to collect the subscriptions from all members and set the partition assignment. The member metadata returned in the join group response corresponds to the respective metadata provided in the join group request for the group protocol chosen by the coordinator.
The error cases in this round of the protocol are similar to those described in the consumer rewrite design: Kafka 0.9 Consumer Rewrite Design.
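The protocol selection rule in this phase can be sketched as follows. This is a minimal illustration, not the coordinator's actual code: the class `ProtocolSelector` and the method `select` are hypothetical names, and the tie-breaking rule (prefer the order listed by the first member) is an assumption, since the text above only requires that the chosen protocol be supported by every member.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/** Hypothetical helper; not part of any Kafka API. */
final class ProtocolSelector {
    private ProtocolSelector() {}

    /**
     * Returns a protocol that every member supports, or empty if the members
     * share no protocol (in which case the joining member would be rejected).
     *
     * @param supportedByMember one list of protocol names per group member
     */
    static Optional<String> select(List<List<String>> supportedByMember) {
        if (supportedByMember.isEmpty()) {
            return Optional.empty();
        }
        // Assumption: start from the first member's list and keep its order.
        Set<String> candidates = new LinkedHashSet<>(supportedByMember.get(0));
        for (List<String> supported : supportedByMember) {
            // Drop every candidate this member does not support.
            candidates.retainAll(supported);
        }
        return candidates.stream().findFirst();
    }
}
```

In a rolling upgrade, upgraded members would list both the new and the old protocol while older members list only the old one, so under this sketch the old protocol keeps being selected until every member advertises the new one.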
Once the group members have been stabilized by the completion of phase 1, the active leader must propagate state to the other members in the group. This is used in the new consumer protocol to set partition assignments. Similar to phase 1, all members send SyncGroup requests to the coordinator. Once group state has been provided by the leader, the coordinator forwards each member's state respectively in the SyncGroup response. The message format is provided below:
SyncGroupRequest => GroupId GroupGenerationId MemberId GroupState
  GroupId => String
  GroupGenerationId => int32
  MemberId => String
  GroupState => [MemberId MemberState]
    MemberId => String
    MemberState => bytes

SyncGroupResponse => ErrorCode MemberState
  ErrorCode => int16
  MemberState => bytes
The leader sets member states in the GroupState field. For followers, this array must be empty. Once the coordinator has received the group state from the leader, it can unpack each member's state and send it in the MemberState field of the SyncGroup response.
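As a rough sketch of the coordinator side of this phase, the class below parks each SyncGroup request until the leader's GroupState arrives and then answers every member with its own state. All names here (`SyncGroupSketch`, `onSyncGroup`) are hypothetical, error codes and generation checks are omitted, and returning empty bytes to a member that the leader did not mention is an assumption of this sketch rather than something the proposal specifies.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical coordinator-side handling of SyncGroup for one generation. */
final class SyncGroupSketch {
    private final String leaderId;
    // Responses for members whose SyncGroup arrived before the leader's.
    private final Map<String, CompletableFuture<byte[]>> pending = new HashMap<>();
    // Set once the leader's GroupState has been received.
    private Map<String, byte[]> groupState;

    SyncGroupSketch(String leaderId) {
        this.leaderId = leaderId;
    }

    /** Handles one SyncGroup request; the future completes with MemberState. */
    synchronized CompletableFuture<byte[]> onSyncGroup(String memberId, Map<String, byte[]> state) {
        boolean fromLeader = memberId.equals(leaderId);
        if (!fromLeader && !state.isEmpty()) {
            throw new IllegalArgumentException("followers must send an empty GroupState");
        }
        CompletableFuture<byte[]> response = new CompletableFuture<>();
        pending.put(memberId, response);
        if (fromLeader) {
            groupState = new HashMap<>(state);
        }
        if (groupState != null) {
            // Leader state is known: unpack it and answer every parked request.
            for (Map.Entry<String, CompletableFuture<byte[]>> e : pending.entrySet()) {
                // Assumption: a member missing from GroupState gets empty state.
                e.getValue().complete(groupState.getOrDefault(e.getKey(), new byte[0]));
            }
            pending.clear();
        }
        return response;
    }
}
```

A follower that syncs before the leader gets a future that stays incomplete until the leader's request arrives; a follower that syncs afterwards is answered immediately from the stored state.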
The coordinator maintains a state machine for each group with the following states:
Note that the generation is incremented on successful completion of the first phase (Joining). Before this phase completes, the old generation has an opportunity to do any necessary cleanup work (such as committing offsets in the case of the new consumer). Upon transition to the AwaitSync state, the coordinator begins a timer for each member according to their respective session timeouts. If the timeout expires for any member, then the coordinator must trigger a rebalance.
The group leader is responsible for synchronizing state for the group upon completion of the Joining state. If the leader's session timeout expires before the coordinator has received the leader's SyncGroup, then the generation becomes invalid and the members must rejoin. It is also possible for the leader to transmit an error in the SyncGroup request. In this case also, the generation becomes invalid and the error can be propagated to the other members of the group.
Note that the transition from AwaitSync to Stable occurs only when the leader's SyncGroup has been received. It is possible that the SyncGroup from followers may therefore arrive either in the AwaitSync state or in the Stable state. If the former, then the coordinator will park the request until the SyncGroup from the leader has been received (or its timeout has expired). If the latter, then the coordinator can respond to the SyncGroup request immediately using the leader's synchronized state. Clearly this requires the coordinator to store this state at least for the duration of the max session timeout in the group. It is also possible that the member fails before collecting its state: in this case, the member's session timeout will expire and the group will rebalance.
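The transitions described in the preceding paragraphs can be modeled roughly as below. This sketch covers only the three states named in the text (Joining, AwaitSync, Stable); `GroupStateMachine` and its method names are hypothetical, and modeling a rebalance as a return to Joining is an assumption made for illustration.

```java
/** Hypothetical model of the per-group coordinator state machine. */
final class GroupStateMachine {
    enum State { JOINING, AWAIT_SYNC, STABLE }

    private State state = State.JOINING;
    private int generation = 0;

    State state() {
        return state;
    }

    int generation() {
        return generation;
    }

    /** Phase 1 completes: all expected members have sent JoinGroup. */
    void onAllMembersJoined() {
        require(State.JOINING);
        generation++; // the generation is bumped when Joining completes
        state = State.AWAIT_SYNC;
    }

    /** The leader's SyncGroup arrived, possibly carrying an error. */
    void onLeaderSync(boolean leaderReportedError) {
        require(State.AWAIT_SYNC);
        // A leader error invalidates the generation, so members must rejoin.
        state = leaderReportedError ? State.JOINING : State.STABLE;
    }

    /** Any member's session timeout expired (including the leader's). */
    void onSessionTimeout() {
        // Assumption: a rebalance is modeled as going back to Joining.
        state = State.JOINING;
    }

    private void require(State expected) {
        if (state != expected) {
            throw new IllegalStateException("expected " + expected + " but was " + state);
        }
    }
}
```

Parking of follower SyncGroup requests is left out here; it would be keyed on whether the group is still in `AWAIT_SYNC` or already `STABLE` when the request arrives.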
Above we outlined the generalized JoinGroup protocol that the consumer will use. Next we show how we will implement consumer semantics on top of this protocol. Other use cases for the join group protocol would be implemented similarly.
The two phases of the group protocol correspond to subscription and assignment for the new consumer. Each member of the group submits its subscription as member metadata. The leader of the group collects all subscriptions from its JoinGroup response and sends the assignment as member state in SyncGroup. There are several advantages to having a single assignor:
The group protocol used by the consumer in the JoinGroup request corresponds to the assignment strategy that the leader will use to determine partition assignment. This allows the consumer to upgrade from one assignment strategy to another without downtime. The metadata corresponding to the assignment strategy can be strategy-specific, but generally it will include the group subscriptions for the member. The state returned to members in the SyncGroup will include the partitions assigned to that member.
For all assignment strategies, group members provide their subscriptions as an array of strings. This subscription can either be a list of topics or regular expressions (TODO: do we need distinguisher field to tell the difference? how about regex compatibility?). Partition assignments are provided in the SyncGroup response as an array of topics and partitions. The protocol supports custom data in both the subscription and assignment as a generic array of bytes to allow for custom assignor implementations. For example, a rack-aware assignor will generally need to propagate the rackId of each member to the leader in its subscription so that it can take it into account for assignment.
ProtocolType => "consumer"

GroupProtocol => AssignmentStrategy
  AssignmentStrategy => String

MemberMetadata => Version Subscription AssignmentStrategies
  Version => int16
  Subscription => Topics UserData
    Topics => [String]
    UserData => Bytes

MemberState => Version Assignment
  Version => int16
  Assignment => TopicPartitions UserData
    TopicPartitions => [Topic Partitions]
      Topic => String
      Partitions => [int32]
    UserData => Bytes
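To make the leader's role concrete, here is a minimal sketch of how a leader might turn the collected subscriptions into per-member assignments. It uses a simple round-robin over each topic's partitions purely as an illustration; the proposal does not prescribe this strategy. The class `LeaderAssignmentSketch`, the method `assign`, and the `partitionCounts` input (which assumes the leader already knows each topic's partition count from its metadata) are all hypothetical, and regular-expression subscriptions and UserData are ignored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical leader-side assignor; not an actual Kafka class. */
final class LeaderAssignmentSketch {
    private LeaderAssignmentSketch() {}

    /**
     * @param subscriptions   memberId -> subscribed topics (from the JoinGroup response)
     * @param partitionCounts topic -> number of partitions (assumed known to the leader)
     * @return memberId -> topic -> assigned partition ids (the per-member Assignment)
     */
    static Map<String, Map<String, List<Integer>>> assign(
            Map<String, List<String>> subscriptions, Map<String, Integer> partitionCounts) {
        // Sorted copies keep the result deterministic regardless of input map type.
        Map<String, List<String>> members = new TreeMap<String, List<String>>(subscriptions);
        Map<String, Integer> topics = new TreeMap<String, Integer>(partitionCounts);

        Map<String, Map<String, List<Integer>>> assignment = new TreeMap<>();
        for (String member : members.keySet()) {
            assignment.put(member, new TreeMap<String, List<Integer>>());
        }
        for (Map.Entry<String, Integer> topic : topics.entrySet()) {
            // Collect the members subscribed to this topic.
            List<String> subscribers = new ArrayList<>();
            for (Map.Entry<String, List<String>> sub : members.entrySet()) {
                if (sub.getValue().contains(topic.getKey())) {
                    subscribers.add(sub.getKey());
                }
            }
            if (subscribers.isEmpty()) {
                continue; // nobody subscribed; leave the topic unassigned
            }
            // Hand out partitions 0..n-1 to the subscribers in turn.
            for (int p = 0; p < topic.getValue(); p++) {
                String owner = subscribers.get(p % subscribers.size());
                assignment.get(owner)
                        .computeIfAbsent(topic.getKey(), t -> new ArrayList<Integer>())
                        .add(p);
            }
        }
        return assignment;
    }
}
```

Each entry of the returned map would then be serialized as that member's MemberState and sent by the leader in the GroupState field of its SyncGroup request.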
Protocol: Briefly, this is how the protocol works for the consumer.
Rolling Upgrades: To support rolling upgrades without downtime, there are two cases to consider:
Handling Coordinator Failures: This proposal largely shares the coordinator failure cases and recovery mechanism from the initial protocol documented in Kafka 0.9 Consumer Rewrite Design. The recovery process depends on whether group state is persisted (e.g. in Zookeeper). Without persistence, group members will generally have to rejoin the group when the new coordinator becomes active. Below, we assume persistence and show how failures are treated at the various stages of the protocol.
Coordinator failures at these steps are handled in the following ways:
Other Interesting Cases:
To support client-side assignment, we'd have to make the following changes: