
The new consumer currently relies on a server-side coordinator to negotiate the set of consumer processes that form the group and to assign the partitions to each member of the consumer group per some assignment strategy which group members must agree on. This provides assurance that the group will always have a consistent assignment and it enables the coordinator to validate that offsets are only committed from consumers that own the respective partition. However, it relies on the server having access to the code implementing the assignment strategy, which is problematic for two reasons:

...

Below we outline the changes needed to the protocol to make it more general and also the changes to the consumer API to support this.

Protocol

This proposal does not change the basic mechanics of the join group protocol. All members of the group send JoinGroup requests to the coordinator, which waits for all expected members before responding. However, instead of the coordinator returning each consumer's individual assignment, it returns to each member the full list of group members along with their associated metadata. In the case of the new consumer, each member would then compute its assignment independently based on the returned group metadata.

The proposed format of the new JoinGroup messages is given below.

JoinGroup Request

The new join group message is similar to the previous one, but we have dropped the fields specific to consumer partition assignment (e.g. assignment strategy). Instead, all of this information is treated as protocol-specific metadata, which is opaque to the broker. The join group request includes a list of the protocols which the group member supports (sorted by preference). A protocol is used to communicate membership semantics to the members of the group. In the case of the new consumer, it corresponds exactly to the assignment strategy. The coordinator inspects the supported protocols of each member and chooses one that all members support. If no common protocol can be found among members, then group creation fails. This provides a facility for upgrading to a new version of the protocol in a rolling update.

Note that each protocol has a field for its own metadata. In the case of the consumer, this allows each assignment strategy to define its own metadata format. In the case of the normal round-robin strategy, the metadata would just contain the list of subscribed topics, but other strategies may contain other information (such as the number of CPUs on the host).

The GroupProtocolType field provides a scope for the protocol. For the consumer, the protocol type would be "consumer" and the protocols would be "round-robin," "range," etc. Copycat would use "copycat" as the group type and provide its own convention for protocol naming. If group members do not all have the same protocol type, the coordinator will not allow the group to be created (i.e. it will send an error in the join group response). It's an open question whether this is really necessary since the protocol name could embed this information as well.

 

...

To support client-side assignment, we propose to split the group management protocol into two phases: group membership and state synchronization. The first phase is used to set the active members of the group and to elect a group leader. The second phase is used to enable the group leader to synchronize member state in the group (in other words to assign each member's state). From the perspective of the consumer, the first phase is used to collect member subscriptions, while the second phase is used to propagate partition assignments. The elected leader in the join group phase is responsible for setting the assignments for the whole group.

Below we describe the phases of this protocol in more detail.

Phase 1: Joining the Group

The purpose of the initial phase is to set the active members of the group. This protocol has semantics similar to those of the initial consumer rewrite design. After finding the coordinator for the group, each member sends a JoinGroup request containing member-specific metadata. The join group request will park at the coordinator until all expected members have sent their own join group requests ("expected" in this case means all members that were part of the previous generation). Once they have done so, the coordinator randomly selects a leader from the group and sends JoinGroup responses to all the pending requests.

The JoinGroup request contains an array with the group protocols that it supports along with member-specific metadata. This is basically used to ensure compatibility of group member metadata within the group. The coordinator chooses a protocol which is supported by all members of the group and returns it in the respective JoinGroup responses. If a member joins and doesn't support any of the protocols used by the rest of the group, then it will be rejected. This mechanism provides a way to update protocol metadata to a new format in a rolling upgrade scenario. The newer version will provide metadata for the new protocol and for the old protocol, and the coordinator will choose the old protocol until all members have been upgraded. 
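The selection step can be sketched as follows. This is a minimal illustration, not the coordinator's actual code: `ProtocolSelector` and `select` are hypothetical names, and both the voting rule and the name-ordered tie-break are assumptions chosen so that the example is deterministic.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical helper; not part of any Kafka API.
final class ProtocolSelector {

    /**
     * @param supported member id -> protocols that member supports, most preferred first
     * @return the chosen protocol, or empty if no protocol is common to all members
     */
    static Optional<String> select(Map<String, List<String>> supported) {
        // 1. Candidates are the protocols that every member supports.
        Set<String> candidates = null;
        for (List<String> protocols : supported.values()) {
            if (candidates == null) {
                candidates = new LinkedHashSet<>(protocols);
            } else {
                candidates.retainAll(protocols);
            }
        }
        if (candidates == null || candidates.isEmpty()) {
            return Optional.empty();
        }

        // 2. Each member votes for its most preferred candidate.
        Map<String, Integer> votes = new TreeMap<>(); // sorted by name for a stable tie-break
        for (List<String> protocols : supported.values()) {
            for (String protocol : protocols) {
                if (candidates.contains(protocol)) {
                    votes.merge(protocol, 1, Integer::sum);
                    break;
                }
            }
        }

        // 3. Highest vote count wins; ties go to the smallest name (assumption).
        String best = null;
        int bestVotes = -1;
        for (Map.Entry<String, Integer> entry : votes.entrySet()) {
            if (entry.getValue() > bestVotes) {
                best = entry.getKey();
                bestVotes = entry.getValue();
            }
        }
        return Optional.of(best);
    }
}
```

In a rolling upgrade, a group with one member offering only "range" and another offering "round-robin" then "range" would settle on "range"; once every member lists "round-robin" first, that protocol is chosen instead.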

Code Block
JoinGroupRequest => GroupId SessionTimeout MemberId ProtocolType GroupProtocols
  GroupId             => String
  SessionTimeout      => int32
  MemberId            => String
  ProtocolType        => String
  GroupProtocols      => [Protocol MemberMetadata]
    Protocol          => String
    MemberMetadata    => bytes

JoinGroupResponse => ErrorCode GroupGenerationId GroupProtocol GroupLeaderId MemberId Members
  ErrorCode           => int16
  GroupGenerationId   => int32
  GroupProtocol       => String
  GroupLeaderId       => String
  MemberId            => String
  Members             => [MemberId MemberMetadata]
    MemberId          => String
    MemberMetadata    => bytes

The JoinGroup response includes an array for the members of the group along with their metadata. This is only populated for the leader to reduce the overall overhead of the protocol; for other members, it will be empty. This is used by the leader to prepare member state for phase 2. In the case of the consumer, this allows the leader to collect the subscriptions from all members and set the partition assignment. The member metadata returned in the join group response corresponds to the respective metadata provided in the join group request for the group protocol chosen by the coordinator.
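A rough sketch of how the coordinator might build the per-member responses, with the member list populated only for the leader. The classes below are hypothetical in-memory models whose field names mirror the JoinGroupResponse schema; error handling and the wire encoding are left out.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory model of a JoinGroup response.
final class JoinGroupResponse {
    final int generationId;
    final String groupProtocol;
    final String leaderId;
    final String memberId;
    final Map<String, byte[]> members; // memberId -> metadata; empty for followers

    JoinGroupResponse(int generationId, String groupProtocol, String leaderId,
                      String memberId, Map<String, byte[]> members) {
        this.generationId = generationId;
        this.groupProtocol = groupProtocol;
        this.leaderId = leaderId;
        this.memberId = memberId;
        this.members = members;
    }
}

final class JoinResponses {
    /** Builds one response per member; only the leader receives the full member metadata. */
    static Map<String, JoinGroupResponse> build(int generationId, String protocol, String leaderId,
                                                Map<String, byte[]> metadataByMember) {
        Map<String, JoinGroupResponse> responses = new LinkedHashMap<>();
        for (String memberId : metadataByMember.keySet()) {
            Map<String, byte[]> members = memberId.equals(leaderId)
                    ? metadataByMember
                    : Collections.<String, byte[]>emptyMap();
            responses.put(memberId,
                    new JoinGroupResponse(generationId, protocol, leaderId, memberId, members));
        }
        return responses;
    }
}
```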

The error cases in this round of the protocol are similar to those described in the consumer rewrite design: Kafka 0.9 Consumer Rewrite Design

Phase 2: Synchronizing Group State

Once the group members have been stabilized by the completion of phase 1, the active leader must propagate state to the other members in the group. This is used in the new consumer protocol to set partition assignments. Similar to phase 1, all members send SyncGroup requests to the coordinator. Once group state has been provided by the leader, the coordinator forwards to each member its own state in the SyncGroup response. The message format is provided below:

Code Block
SyncGroupRequest => GroupId GroupGenerationId MemberId GroupState
  GroupId           => String
  GroupGenerationId => int32
  MemberId          => String
  GroupState        => [MemberId MemberState]
    MemberId        => String
    MemberState     => bytes

SyncGroupResponse => ErrorCode MemberState
  ErrorCode         => int16
  MemberState       => bytes

The leader sets member states in the GroupState field. For followers, this array must be empty. Once the coordinator has received the group state from the leader, it can unpack each member's state and send it in the MemberState field of the SyncGroup response.

JoinGroup Response

The response is similarly modified to remove the fields specific to consumer group management. The coordinator is responsible for analyzing the supported protocols from each group member and choosing one which all members support, which is then transmitted to group members in the join group response. Note that the metadata from each group member for the chosen protocol is returned in the response to all members. This is to allow each member to propagate some local information (such as topic subscriptions) to the entire group. The generation id, as before, is incremented on every successful iteration of the join group protocol.

The basic idea behind the coordinator's protocol selection algorithm is to consider the protocols supported by all members in terms of the preference (as indicated by the position in the list). This means that if all members list protocol "a" before protocol "b," then the coordinator will choose "a." If there is no agreement in terms of preference among the protocols which all members support, then one is chosen randomly.

Coordinator State Machine

The coordinator maintains a state machine for each group with the following states:

  • Down: There are no active members and group state has been cleaned up.
  • Initialize: In this state, the coordinator reads group data from Zookeeper (or some other storage) in order to transition groups from failed coordinators. Any heartbeat or join group requests are returned with an error indicating that the coordinator is not ready yet.
  • Stable: In this state, the coordinator either has an active generation or has no members and is awaiting the first JoinGroup. Heartbeats are accepted from members in this state and are used to keep group members active or to indicate that they need to join the group.
  • Joining: The coordinator has received a JoinGroup request from at least one member and is awaiting JoinGroup requests from the rest of the group. Heartbeats or SyncGroup requests in this state return an error indicating that a rebalance is in progress.
  • AwaitingSync: The join group phase has completed (i.e. all expected members of the group have sent JoinGroup requests) and the coordinator is awaiting group state from the leader. Unexpected coordinator requests return an error indicating that a rebalance is in progress. 
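The request handling implied by these states can be sketched as a small lookup. All names here are hypothetical and the outcome labels are placeholders rather than Kafka error codes. Where the text is silent, the sketch makes assumptions that are flagged in the comments; behaviour in the Down state is not spelled out, so it is left unimplemented.

```java
// Hypothetical names; outcome labels are placeholders, not Kafka error codes.
enum GroupState { DOWN, INITIALIZE, STABLE, JOINING, AWAITING_SYNC }

enum RequestType { HEARTBEAT, JOIN_GROUP, SYNC_GROUP }

enum Outcome { ACCEPT, COORDINATOR_NOT_READY, REBALANCE_IN_PROGRESS }

final class GroupStateMachine {

    /** Decides how the coordinator answers a request given the group's current state. */
    static Outcome handle(GroupState state, RequestType request) {
        switch (state) {
            case INITIALIZE:
                // Heartbeat and JoinGroup are rejected per the text;
                // rejecting SyncGroup as well is an assumption.
                return Outcome.COORDINATOR_NOT_READY;
            case STABLE:
                // Heartbeats keep members alive, a JoinGroup starts a rebalance,
                // and a late follower SyncGroup is answered from stored state.
                return Outcome.ACCEPT;
            case JOINING:
                // Only JoinGroup is expected while members are rejoining.
                return request == RequestType.JOIN_GROUP
                        ? Outcome.ACCEPT
                        : Outcome.REBALANCE_IN_PROGRESS;
            case AWAITING_SYNC:
                // SyncGroup is expected; a JoinGroup forces a new rebalance.
                // Treating heartbeats as "unexpected" here is an assumption.
                return request == RequestType.HEARTBEAT
                        ? Outcome.REBALANCE_IN_PROGRESS
                        : Outcome.ACCEPT;
            default:
                throw new UnsupportedOperationException(
                        "request handling in state " + state + " is not specified");
        }
    }
}
```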

Note that the generation is incremented on successful completion of the first phase (Joining). Before this phase completes, the old generation has an opportunity to do any necessary cleanup work (such as commit offsets in the case of the new consumer). Upon transition to the AwaitingSync state, the coordinator begins a timer for each member according to their respective session timeouts. If the timeout expires for any member, then the coordinator must trigger a rebalance.

The group leader is responsible for synchronizing state for the group upon completion of the Joining state. If the leader's session timeout expires before the coordinator has received the leader's SyncGroup, then the generation becomes invalid and the members must rejoin. It is also possible for the leader to transmit an error in the SyncGroup request. In this case also, the generation becomes invalid and the error can be propagated to the other members of the group. 

Note that the transition from AwaitingSync to Stable occurs only when the leader's SyncGroup has been received. The SyncGroup from a follower may therefore arrive either in the AwaitingSync state or in the Stable state. If the former, then the coordinator will park the request until the SyncGroup from the leader has been received (or its timeout has expired). If the latter, then the coordinator can respond to the SyncGroup request immediately using the leader's synchronized state. Clearly this requires the coordinator to store this state at least for the duration of the max session timeout in the group. It is also possible that the member fails before collecting its state: in this case, the member's session timeout will expire and the group will rebalance.
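The parking behaviour described above can be sketched with a small piece of coordinator-side bookkeeping for one generation. `SyncPhase` and its methods are hypothetical names; session timeouts, error propagation and persistence are deliberately omitted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical bookkeeping for the sync phase of a single generation.
final class SyncPhase {
    private final String leaderId;
    private final List<String> parked = new ArrayList<>(); // followers waiting for the leader
    private Map<String, byte[]> groupState;                 // null until the leader has synced

    SyncPhase(String leaderId) {
        this.leaderId = leaderId;
    }

    /** True once the leader's SyncGroup has arrived (AwaitingSync -> Stable). */
    boolean isStable() {
        return groupState != null;
    }

    /**
     * Handles a follower's SyncGroup. Returns the follower's state if the leader
     * has already synced; otherwise parks the request and returns empty.
     */
    Optional<byte[]> onFollowerSync(String memberId) {
        if (groupState == null) {
            parked.add(memberId);
            return Optional.empty();
        }
        return Optional.ofNullable(groupState.get(memberId));
    }

    /**
     * Handles the leader's SyncGroup. Returns the responses that can now be sent:
     * one for the leader and one for each parked follower.
     */
    Map<String, byte[]> onLeaderSync(Map<String, byte[]> stateByMember) {
        groupState = new HashMap<>(stateByMember);
        Map<String, byte[]> responses = new HashMap<>();
        responses.put(leaderId, groupState.get(leaderId));
        for (String memberId : parked) {
            responses.put(memberId, groupState.get(memberId));
        }
        parked.clear();
        return responses;
    }
}
```

Because the state is retained after the leader syncs, a follower whose SyncGroup arrives late is answered immediately from the stored map.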

Consumer Embedded Protocol

Above we outlined the generalized JoinGroup protocol that the consumer will use. Next we show how we will implement consumer semantics on top of this protocol. Other use cases for the join group protocol would be implemented similarly.

The two phases of the group protocol correspond to subscription and assignment for the new consumer. Each member of the group submits its subscription as member metadata. The leader of the group collects all subscriptions from its JoinGroup response and sends the assignment as member state in SyncGroup. There are several advantages to having a single assignor:

  1. Since the leader makes the assignment for the full group, it is the single source of truth for the metadata used in its decision making. This avoids the need to synchronize metadata among all members that is required in a multi-assignor approach. 
  2. The leader of the group can enforce its own policy for controlling the rate of rebalancing. It doesn't have to rebalance after every metadata change, but can "batch" changes together to reduce the impact of metadata churn.
  3. The leader is the only member that needs to receive the metadata from all members of the group. This reduces the overhead of the protocol.

The group protocol used by the consumer in the JoinGroup request corresponds to the assignment strategy that the leader will use to determine partition assignment. This allows the consumer to upgrade from one assignment strategy to another without downtime. The metadata corresponding to the assignment strategy can be strategy-specific, but generally it will include the group subscriptions for the member. The state returned to members in the SyncGroup will include the partitions assigned to that member.

For all assignment strategies, group members provide their subscriptions as an array of strings. This subscription can either be a list of topics or regular expressions (TODO: do we need distinguisher field to tell the difference? how about regex compatibility?). Partition assignments are provided in the SyncGroup response as an array of topics and partitions. The protocol supports custom data in both the subscription and assignment as a generic array of bytes to allow for custom assignor implementations. For example, a rack-aware assignor will generally need to propagate the rackId of each member to the leader in its subscription so that it can take it into account for assignment.  

Code Block
ProtocolType => "consumer"

GroupProtocol => AssignmentStrategy
  AssignmentStrategy => String

MemberMetadata => Version Subscription AssignmentStrategies
  Version      => int16
  Subscription => Topics UserData
    Topics     => [String]
    UserData   => bytes

MemberState => Version Assignment
  Version           => int16
  Assignment        => TopicPartitions UserData
    TopicPartitions => [Topic Partitions]
      Topic         => String
      Partitions    => [int32]
    UserData        => bytes
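As an illustration of the consumer metadata layout, the sketch below serializes the Version and Subscription fields into a buffer and reads the topics back. It assumes the usual Kafka primitive encodings (big-endian integers, int16-length-prefixed UTF-8 strings, int32 array counts, int32-length-prefixed byte arrays); `SubscriptionCodec` is a hypothetical name, and the AssignmentStrategies field is omitted because its format is not given here.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical codec for the Version and Subscription parts of MemberMetadata.
final class SubscriptionCodec {

    static ByteBuffer encode(short version, List<String> topics, byte[] userData) {
        List<byte[]> encodedTopics = new ArrayList<>();
        // version (2) + topic count (4) + userData length prefix (4) + userData
        int size = 2 + 4 + 4 + userData.length;
        for (String topic : topics) {
            byte[] bytes = topic.getBytes(StandardCharsets.UTF_8);
            encodedTopics.add(bytes);
            size += 2 + bytes.length; // int16 length prefix + string bytes
        }

        ByteBuffer buf = ByteBuffer.allocate(size); // big-endian by default
        buf.putShort(version);
        buf.putInt(encodedTopics.size());
        for (byte[] bytes : encodedTopics) {
            buf.putShort((short) bytes.length);
            buf.put(bytes);
        }
        buf.putInt(userData.length);
        buf.put(userData);
        buf.flip();
        return buf;
    }

    /** Reads the topic list back without disturbing the caller's buffer position. */
    static List<String> decodeTopics(ByteBuffer encoded) {
        ByteBuffer in = encoded.duplicate();
        in.getShort(); // skip version
        int count = in.getInt();
        List<String> topics = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            byte[] bytes = new byte[in.getShort()];
            in.get(bytes);
            topics.add(new String(bytes, StandardCharsets.UTF_8));
        }
        return topics;
    }
}
```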

Protocol: Briefly, this is how the protocol works for the consumer. 

  1. Members JoinGroup with their respective subscriptions.
  2. The leader collects member subscriptions from its JoinGroup response and performs the group assignment.  
  3. All members (including the leader) send SyncGroup to find their assignment.
  4. Once created, there are two cases which can trigger reassignment:
    1. Topic metadata changes which have no impact on subscriptions cause resync. The leader computes the new assignment and sends SyncGroup.
    2. Membership or subscription changes cause rejoin. 
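Step 2 can be illustrated with a simple leader-side assignor. This is only a sketch under stated assumptions: `LeaderAssignor` is a hypothetical name, partitions are represented as "topic-partition" strings, and the per-topic round-robin rule stands in for whatever assignment strategy the group has actually agreed on.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical leader-side assignment; not an actual Kafka assignor.
final class LeaderAssignor {

    /**
     * @param subscriptions      memberId -> subscribed topics (from the leader's JoinGroup response)
     * @param partitionsPerTopic topic -> partition count (from the leader's topic metadata)
     * @return memberId -> assigned partitions, with an entry for every member
     */
    static Map<String, List<String>> assign(Map<String, Set<String>> subscriptions,
                                            Map<String, Integer> partitionsPerTopic) {
        // Sorted maps keep the result independent of input ordering.
        Map<String, List<String>> assignment = new TreeMap<>();
        for (String member : subscriptions.keySet()) {
            assignment.put(member, new ArrayList<>());
        }

        for (Map.Entry<String, Integer> topic : new TreeMap<>(partitionsPerTopic).entrySet()) {
            // Only members subscribed to the topic are eligible for its partitions.
            List<String> eligible = new ArrayList<>();
            for (String member : assignment.keySet()) {
                if (subscriptions.get(member).contains(topic.getKey())) {
                    eligible.add(member);
                }
            }
            if (eligible.isEmpty()) {
                continue;
            }
            // Deal the topic's partitions out to eligible members in turn.
            for (int p = 0; p < topic.getValue(); p++) {
                String owner = eligible.get(p % eligible.size());
                assignment.get(owner).add(topic.getKey() + "-" + p);
            }
        }
        return assignment;
    }
}
```

The leader would then place each member's list in the GroupState field of its SyncGroup request, and every member would receive only its own entry in the response.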

Rolling Upgrades: To support rolling upgrades without downtime, there are two cases to consider: 

  1. Changes affecting subscription: the protocol directly supports differing subscriptions, so there is no need for special handling. Members will only be assigned partitions compatible with their subscription.
  2. Assignment strategy changes: to support a change to the assignment strategy, new versions must enable support both for the old assignment strategy and the new one. The coordinator will choose the old assignment strategy until all members have been updated. Then it will choose the new strategy. This preference is implicit in the order of the strategies in the JoinGroup request.

Handling Coordinator Failures

This proposal largely shares the coordinator failure cases and recovery mechanism from the initial protocol documented in Kafka 0.9 Consumer Rewrite Design. The recovery process depends on whether group state is persisted (e.g. in Zookeeper). Without persistence, group members will generally have to rejoin the group when the new coordinator becomes active. Below, we assume persistence and show how failures are treated at the various stages of the protocol.

  1. All members send JoinGroup. 
  2. Generation metadata is persisted.
  3. All members send SyncGroup.
  4. Sync metadata is persisted.

Coordinator failures at these steps are handled in the following ways:

  1. If the coordinator fails before all members have joined the group or before group metadata has been persisted, then all members will resend their JoinGroup requests once the new coordinator is stable.
  2. The coordinator may fail after group metadata has been persisted, but before all members of the group have received a response to their JoinGroup requests. The problem here is that once the new coordinator is stable, the leader may try to immediately synchronize while other members are still trying to join. However, when the coordinator receives a JoinGroup request from any member, it must abort any active synchronization and force all members to rejoin.
  3. If the coordinator fails before a pending synchronization has been persisted, then all members will re-initiate the SyncGroup once the new coordinator is ready. 
  4. If the coordinator fails after the metadata has been persisted, but before all members have received the SyncGroup response, then those members will initiate SyncGroup upon failover. Assuming synchronized state is persisted, then the coordinator can return that member's state immediately without forcing other members to resync. If it is not persisted, then a full group resync is required.

Other Interesting Cases:

  • Leader Failures: The leader of each group is responsible for initiating group synchronization when topic metadata changes. A leader failure is detected by the coordinator through the expiration of its session timeout. The coordinator will respond by forcing all members to rejoin, which will allow a new leader to be elected.
  • Assignment Failure: As mentioned above, there are several ways that the synchronization/assignment phase can fail. Generally, they are handled by having group members rejoin the group. The most interesting case is when the leader encounters an unrecoverable error when it computes the group's assignment. This could happen, for example, if group members don't agree on the assignment strategy to use. In this case, the assignment failure is forwarded to the broker which can then propagate it to awaiting members.
  • Subscription Change: If a member changes its subscription, then it must force the group to be recreated by sending a JoinGroup request to the coordinator. This will cause the coordinator to reply to the other members' heartbeats with an error indicating that rejoin is needed, which will cause them to also send JoinGroup requests.
  • Topic Metadata Change: The leader is responsible for detecting topic metadata changes which affect the group's subscription. When it finds a change, it can immediately compute the new assignment and initiate a SyncGroup with the coordinator.

One of the major concerns in this protocol is the size of the join group response. Since each member's metadata is included in the responses for all members, the total amount of data which the coordinator must forward in the join group responses increases quadratically with the size of the group. For example, with a per-member metadata size of 100KB, in a group of 100 members, each join group response would contain 10MB of data, which means that the coordinator would have to transfer 1GB total on every rebalance. It is therefore important to keep the size of the metadata fairly small. Even with smaller metadata size, the group can only grow so large before this becomes a concern again. However, we argue that the protocol is already unsuited to such large groups since it does not have any mechanism to cope with churn. Every time there is a membership change in the group, all members must synchronize to form the next generation. If this happens often enough, as is possible with larger groups, then progress is severely restricted.

Consumer Embedded Protocol

Above we outlined the generalized JoinGroup protocol that the consumer would leverage. Next we show how we intend to implement consumer semantics on top of this protocol. Other use cases for the join group protocol would be implemented similarly. The two items that must be defined to use the join group protocol are the format of the protocol versions and the format of the protocol metadata.

Code Block
ProtocolType => "consumer"
 
Protocol => AssignmentStrategy
  AssignmentStrategy => String
 
ProtocolMetadata => Subscription
  Subscription                 => Topics TopicPattern MetadataHash
    Topics                     => [String]
    TopicPattern               => String
    MetadataHash               => bytes

Subscriptions: To support differing subscriptions within the group, each member must include its own subscription in the protocol metadata. These subscriptions are forwarded to all members of the group who can then independently compute their assignment. Subscriptions can be specified either as a list of topics or as a regular expression. The latter can provide a more compact representation when subscribing to a large number of topics (e.g. if using mirror maker to replicate all the topics in a cluster).

Metadata: The metadata hash is included to ensure that each consumer has the same view of the topic metadata. A disagreement could cause an inconsistent assignment, so upon joining the group, each member checks the metadata hash of all other members to make sure they are consistent. The hash covers the full list of topics in the subscription set and their respective partition counts. If a regex subscription is used, then the hash covers all the topics in the cluster. If there is any disagreement on the number of partitions (e.g. due to stale metadata), then the hashes will differ and the consumers will refetch metadata and rejoin the group.
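A minimal sketch of such a hash check follows. The proposal does not name a hash function or an input encoding, so the use of SHA-256 over sorted "topic, partition count" pairs is an assumption, as is the `MetadataHash` class itself.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper; the hash function and input layout are assumptions.
final class MetadataHash {

    /** Hashes topic names and partition counts in sorted topic order. */
    static byte[] compute(Map<String, Integer> partitionsPerTopic) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            for (Map.Entry<String, Integer> entry : new TreeMap<>(partitionsPerTopic).entrySet()) {
                digest.update(entry.getKey().getBytes(StandardCharsets.UTF_8));
                digest.update((byte) 0); // separator between topic name and count
                digest.update(Integer.toString(entry.getValue()).getBytes(StandardCharsets.UTF_8));
                digest.update((byte) '\n'); // separator between entries
            }
            return digest.digest();
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 must be supported by every Java platform implementation.
            throw new IllegalStateException(e);
        }
    }

    /** True if every other member reported the same hash as the local one. */
    static boolean consistent(byte[] local, Iterable<byte[]> others) {
        for (byte[] other : others) {
            if (!Arrays.equals(local, other)) {
                return false;
            }
        }
        return true;
    }
}
```

A member whose check fails would refetch topic metadata and rejoin the group rather than compute an assignment.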

One potential concern in this protocol is whether a sustained disagreement might lead to continual rebalancing. This could be possible if two brokers disagree on the topic metadata for an extended period of time. While metadata should eventually converge, this issue can be resolved by having the consumers fetch their metadata from the coordinator, ensuring that they each see the same view. However, it would still be possible to have metadata disagreement if the metadata itself is changing at a very high rate.

It is worth mentioning that there is a class of assignment strategies which do not depend on consistent metadata among the consumers. For example, in a consistent hashing approach, each partition would be deterministically mapped to one of the group members. Even if two members see a different partition count for a topic, there would be no disagreement over which consumer owns each partition. The tradeoff is generally sub-optimal load balancing of partitions across consumers.
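One way to obtain this property is rendezvous (highest-random-weight) hashing, shown below as an illustration rather than as the strategy the proposal has in mind. The owner of a partition depends only on the member ids, the topic and the partition number, never on the partition count. `RendezvousAssignor` is a hypothetical name, and CRC32 is used only because it is a deterministic hash available in the JDK.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.zip.CRC32;

// Hypothetical sketch of a metadata-independent mapping from partition to member.
final class RendezvousAssignor {

    /** Returns the member with the highest score for the given partition. */
    static String owner(List<String> members, String topic, int partition) {
        String best = null;
        long bestScore = -1; // CRC32 values are never negative
        for (String member : members) {
            long score = score(member, topic, partition);
            // Break score ties by member id so the result ignores list order.
            if (score > bestScore || (score == bestScore && member.compareTo(best) < 0)) {
                best = member;
                bestScore = score;
            }
        }
        return best;
    }

    private static long score(String member, String topic, int partition) {
        CRC32 crc = new CRC32();
        crc.update((member + "|" + topic + "|" + partition).getBytes(StandardCharsets.UTF_8));
        return crc.getValue();
    }
}
```

Two members that see different partition counts for a topic still agree on the owner of every partition they both know about, at the cost of a less even spread than a metadata-aware strategy.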

Note that the format of the metadata is an attribute of the assignment strategy. This makes it possible for different strategies to support different metadata formats. For rack-aware assignment, the metadata would also include the rack of each consumer. The metadata hash would then have to cover the leader of each partition, since the leader governs where fetches are sent and the whole point of rack-aware assignment is to fetch from brokers on the same rack as the consumer. In general, any information that is used in decision making must somehow be included in the metadata.

Open Questions

  1. As mentioned previously, the need to propagate the metadata of each member to all other members puts a significant limit on the amount of metadata that can be used in large groups. For small and medium-sized groups, this is probably not a major concern, but assignment strategies must be mindful of the metadata size and set clear scaling expectations. 
  2. For client-side assignment in general, the strategies must be deterministic. This is not as restrictive as it may seem, since random seed data could be distributed as consumer metadata. Nevertheless, truly non-deterministic assignment strategies are not possible, or at least they must include a deterministic component to ensure agreement among consumers. For the currently known use cases, this doesn't appear to be an issue.
  3. The consumer's embedded protocol has some redundancy when multiple assignment strategies are supported. In particular, the subscription list will be duplicated for each assignment strategy. We could restructure the protocol to avoid this, but that would require a consumer-specific format, which would introduce another layer of versioning. As it is, versions are only tied to assignment strategies.

KafkaConsumer API

Although the protocol is slightly more complex for the KafkaConsumer implementation, most of the details are hidden from users. Below we show the basic assignment interface that would be exposed in KafkaConsumer. The partition assigner is generic to allow for custom metadata. For simple versions, the generic type would probably be Void.

Code Block
class ConsumerMetadata<T> {
  String consumerId;
  List<String> subscribedTopics;
  T metadata;
}

interface PartitionAssigner<T> extends Configurable {

  /**
   * Derive the metadata to be used for the local member by this assigner. This could
   * come from configuration or it could be derived dynamically (e.g. for host information
   * such as the hostname or number of cpus).
   * @return The metadata
   */
  public T metadata();

  /**
   * Assign partitions for this consumer.
   * @param consumerId The consumer id of this consumer
   * @param partitionsPerTopic The count of partitions for each subscribed topic
   * @param consumers Metadata for consumers in the current generation
   */
  List<TopicPartition> assign(String consumerId,
                              Map<String, Integer> partitionsPerTopic,
                              List<ConsumerMetadata<T>> consumers);
}
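To make the contract of `assign` concrete, here is a self-contained sketch of a range-style strategy with the same method signature. It is an illustration only: `TopicPartition` and `ConsumerMetadata` are redefined as simple stand-ins so the example compiles without Kafka on the classpath, `RangeAssigner` is a hypothetical name, and the `Configurable` plumbing is left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Stand-in for Kafka's TopicPartition, defined here only for self-containment.
final class TopicPartition {
    final String topic;
    final int partition;

    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public String toString() {
        return topic + "-" + partition;
    }
}

// Stand-in mirroring the ConsumerMetadata class from the proposal.
final class ConsumerMetadata<T> {
    final String consumerId;
    final List<String> subscribedTopics;
    final T metadata;

    ConsumerMetadata(String consumerId, List<String> subscribedTopics, T metadata) {
        this.consumerId = consumerId;
        this.subscribedTopics = subscribedTopics;
        this.metadata = metadata;
    }
}

final class RangeAssigner {

    /** Computes this consumer's partitions; every member runs the same logic independently. */
    static <T> List<TopicPartition> assign(String consumerId,
                                           Map<String, Integer> partitionsPerTopic,
                                           List<ConsumerMetadata<T>> consumers) {
        List<TopicPartition> result = new ArrayList<>();
        for (Map.Entry<String, Integer> topic : new TreeMap<>(partitionsPerTopic).entrySet()) {
            // Sort subscribers so that every member derives the same ordering.
            List<String> subscribers = new ArrayList<>();
            for (ConsumerMetadata<T> consumer : consumers) {
                if (consumer.subscribedTopics.contains(topic.getKey())) {
                    subscribers.add(consumer.consumerId);
                }
            }
            Collections.sort(subscribers);
            int index = subscribers.indexOf(consumerId);
            if (index < 0) {
                continue; // this consumer is not subscribed to the topic
            }
            // Split partitions into contiguous ranges; the first `extra` members get one more.
            int base = topic.getValue() / subscribers.size();
            int extra = topic.getValue() % subscribers.size();
            int start = index * base + Math.min(index, extra);
            int length = base + (index < extra ? 1 : 0);
            for (int p = start; p < start + length; p++) {
                result.add(new TopicPartition(topic.getKey(), p));
            }
        }
        return result;
    }
}
```

Since the strategy needs no custom metadata, callers would use `Void` as the generic type and pass `null` for each consumer's metadata.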

TODO:

To support client-side assignment, we'd have to make the following changes:

  1. Migrate existing assignment strategies from the broker to the client. Since the assignment interface is nearly the same, this should be straightforward.
  2. Modify client/server for the new join group protocol. Since we're not really changing the protocol (just the information that is passed through it), this should also be straightforward.
  3. Remove offset validation from the consumer coordinator. Just a couple lines to remove for this.
  4. Add support for assignment versioning (if we decide we need it). Depending on what we do, may or may not be trivial.