
Below we outline the changes needed to the protocol to make it more general and also the changes to the consumer API to support this.

Protocol

This proposal does not change the basic mechanics of the join group protocol. All members of the group send JoinGroup requests to the coordinator, which waits for all expected members before responding. However, instead of returning each consumer's individual assignment, the coordinator returns to each member the full list of group members along with their associated metadata. In the case of the new consumer, each member would then compute its assignment independently based on the returned group metadata.

The proposed changes to the JoinGroup messages are described below.

JoinGroup Request

The new join group message is similar to the previous one, but we have dropped the fields specific to consumer partition assignment (e.g. assignment strategy). Instead, all of this information is treated as protocol-specific metadata, which is opaque to the broker. The join group request includes a list of the protocols which the group member supports, sorted by preference. A protocol is used to communicate membership semantics to the members of the group; in the case of the new consumer, it corresponds exactly to the assignment strategy. The coordinator inspects the supported protocols of each member and chooses one that all members support. If no common protocol can be found among the members, group construction fails. This provides a facility for upgrading to a new version of the protocol in a rolling update.

Note that each protocol has a field for its own metadata. In the case of the consumer, this allows each assignment strategy to define its own metadata format. For the normal round-robin strategy, the metadata would just contain the list of subscribed topics, but other strategies may contain other information (such as the number of CPUs on the host).

The GroupProtocolType field provides a scope for the protocol. For the consumer, the protocol type would be "consumer" and the protocols would be "round-robin", "range", etc. Copycat would use "copycat" as the protocol type and provide its own convention for protocol naming. If group members do not all have the same protocol type, the coordinator will not allow the group to be created (i.e. it will return an error in the join group response). It is an open question whether this is really necessary, since the protocol name could embed this information as well.

To support client-side assignment, we propose to split the group management protocol into two phases: group membership and state synchronization. The first phase is used to set the active members of the group and to elect a group leader. The second phase is used to enable the group leader to synchronize member state in the group. From the perspective of the consumer, the first phase is used to collect member subscriptions, while the second phase is used to propagate partition assignments. The leader elected in the join group phase is responsible for setting the assignments for the whole group.

Below we describe the phases of this protocol in more detail.

Phase 1: Joining the Group

The purpose of the initial phase is to set the active members of the group. This protocol has basically the same semantics as the initial consumer rewrite design. After finding the coordinator for the group, each member sends a JoinGroup request containing member-specific metadata. The join group request will park at the coordinator until all expected members have sent their own join group requests. Once all have done so, the coordinator randomly selects a leader from the group and sends JoinGroup responses to all the pending requests.

Code Block
JoinGroupRequest => GroupId SessionTimeout MemberId MemberMetadata
  GroupId             => String
  SessionTimeout      => int32
  MemberId            => String
  MemberMetadata      => bytes
 
JoinGroupResponse => ErrorCode GroupGenerationId GroupLeaderId MemberId
  ErrorCode           => int16
  GroupGenerationId   => int32
  GroupLeaderId       => String
  MemberId            => String

The error cases in this round of the protocol are similar to those described in the Kafka 0.9 Consumer Rewrite Design.
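To make the Phase 1 barrier concrete, here is a minimal Python sketch of how a coordinator might park JoinGroup requests until every expected member has arrived and then pick a leader at random. The names JoinBarrier and JoinResult are hypothetical, the set of expected members is assumed to be known up front, and error codes and session timeouts are left out.

```python
from __future__ import annotations

import random
from dataclasses import dataclass, field


@dataclass
class JoinResult:
    # Loosely mirrors JoinGroupResponse (ErrorCode omitted in this sketch).
    generation_id: int
    leader_id: str
    member_id: str


@dataclass
class JoinBarrier:
    """Hypothetical coordinator-side barrier for Phase 1."""
    expected: set[str]
    generation_id: int = 0
    pending: dict[str, bytes] = field(default_factory=dict)
    members: dict[str, bytes] = field(default_factory=dict)

    def join(self, member_id: str, metadata: bytes) -> list[JoinResult] | None:
        # Park the request: nothing is returned until every expected member has joined.
        self.pending[member_id] = metadata
        if not self.expected.issubset(self.pending):
            return None
        # Barrier complete: start a new generation and elect a leader at random.
        self.generation_id += 1
        leader = random.choice(sorted(self.pending))
        # Keep the metadata for later DescribeGroup calls; it is not echoed back.
        self.members = dict(self.pending)
        self.pending.clear()
        return [JoinResult(self.generation_id, leader, m) for m in sorted(self.members)]
```

Every parked request is answered with the same generation id and leader id. Consistent with the JoinGroupResponse schema above, the member metadata is kept by the coordinator rather than returned to the members.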

Phase 2: Synchronizing Group State

Once the group members have been stabilized by the completion of phase 1, the active leader has the option to propagate additional state to the group members. This is used in the new consumer protocol to set partition assignments. Just like the JoinGroup phase, this synchronization acts as a barrier which all members must acknowledge by sending their own SyncGroup requests. The message format is provided below:

Code Block
SyncGroupRequest => GroupId GroupGenerationId MemberId SyncErrorCode GroupState
  GroupId           => String
  GroupGenerationId => int32
  MemberId          => String
  SyncErrorCode     => int16
  GroupState        => [MemberId MemberState]
    MemberId        => String
    MemberState     => bytes

SyncGroupResponse => ErrorCode GroupGenerationId MemberState
  ErrorCode         => int16
  GroupGenerationId => int32
  MemberState       => bytes

The leader sets member states in the GroupState field. For followers, this array must be empty. Once the coordinator has received the group state from the leader, and all members have sent their SyncGroup requests, the coordinator unpacks the group state and responds with each member's respective state in the SyncGroup response. 

There are several failure cases to consider in this phase of the protocol:

  1. The leader can fail prior to performing an expected synchronization. In this case, the session timeout of the leader will expire and the coordinator will mark it dead and force all members to rejoin the group, which will result in a new leader being elected.
  2. The leader's synchronization procedure can fail. For example, if the leader cannot parse one member's metadata, then the state synchronization should generally fail. Since this is most likely a non-recoverable error, there must be some way to propagate the error to all members of the group. The SyncGroup request contains an error field (SyncErrorCode) for this purpose. All members send their SyncGroup requests as before, but once all of them have arrived, the coordinator forwards the leader's error code in the SyncGroup response. The client can then propagate the error to the user.
  3. Followers can fail to join the synchronization barrier. If a follower fails to send an expected SyncGroup request, its session timeout will expire, which will force the remaining members to rejoin.
  4. New members can join during synchronization. If a new member sends a JoinGroup during this phase, then the synchronization round will fail with an error indicating that rejoining the group is necessary.
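The SyncGroup barrier, including forwarding of the leader's SyncErrorCode (failure case 2), can be sketched in Python as below. This is an illustrative sketch rather than coordinator code: SyncBarrier and SyncResult are hypothetical names, the error code values are placeholders rather than Kafka's actual codes, and session timeouts and late joiners are not modelled.

```python
from __future__ import annotations

from dataclasses import dataclass, field

NO_ERROR = 0
ILLEGAL_GENERATION = -1  # placeholder value, not Kafka's real error code


@dataclass
class SyncResult:
    # Loosely mirrors SyncGroupResponse.
    error_code: int
    generation_id: int
    member_state: bytes


@dataclass
class SyncBarrier:
    """Hypothetical coordinator-side barrier for Phase 2."""
    generation_id: int
    leader_id: str
    members: set[str]
    arrived: set[str] = field(default_factory=set)
    leader_state: dict[str, bytes] | None = None
    leader_error: int = NO_ERROR

    def sync(self, member_id: str, generation_id: int, sync_error: int,
             group_state: dict[str, bytes]) -> dict[str, SyncResult] | None:
        # Requests from a stale generation are rejected immediately.
        if generation_id != self.generation_id:
            return {member_id: SyncResult(ILLEGAL_GENERATION, self.generation_id, b"")}
        if member_id == self.leader_id:
            self.leader_state = dict(group_state)
            self.leader_error = sync_error
        elif group_state:
            raise ValueError("followers must send an empty GroupState")
        self.arrived.add(member_id)
        # Wait until the leader's state is in and every member has arrived.
        if self.leader_state is None or self.arrived != self.members:
            return None
        results = {}
        for m in sorted(self.members):
            # On a leader error forward only the error; otherwise unpack this member's state.
            state = b"" if self.leader_error else self.leader_state.get(m, b"")
            results[m] = SyncResult(self.leader_error, self.generation_id, state)
        return results
```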

DescribeGroup: Note that the JoinGroup response in the first phase above doesn't contain a list of the members of the group or their respective metadata. So how is the group leader to find the subscriptions in order to make its assignment? For this, we implement a new request type to describe the active group. The leader is responsible for using this request to collect the group's metadata before setting the assignment. We could have alternatively sent the member list in the join group response, but this leads to undesirable message overhead as groups grow larger.

The request format for DescribeGroup is provided below. Note that this request is also useful for administration purposes since it provides access both to the group's subscription and assignment.

Code Block
DescribeGroupRequest => GroupId
  GroupId => String

DescribeGroupResponse => ErrorCode GroupState GroupGenerationId GroupLeaderId GroupMembers
  ErrorCode          => int16
  GroupState         => int16 // INACTIVE, JOINING, JOINED, SYNCING, SYNCED
  GroupGenerationId  => int32
  GroupLeaderId      => String
  GroupMembers       => [MemberId MemberMetadata MemberState]
    MemberId         => String
    MemberMetadata   => bytes
    MemberState      => bytes

A couple of additional points are worth noting about the synchronization phase:

  • There is no requirement that implementations take advantage of this phase of the protocol. If group membership is all that is needed, then the first phase is sufficient and the SyncGroup round can be skipped entirely.
  • It is possible to resynchronize group state multiple times without going through phase 1. This is useful for the new consumer to support topic metadata changes which affect assignment but not subscription. For example, if the number of partitions of a subscribed topic changes, the group leader can set the new assignment immediately for the current group without collecting subscriptions again. The main impact of this is that it is no longer sufficient for the heartbeat response to signal an illegal generation (previously, this was how the client found out that it should rejoin the group). Instead, we must provide error codes indicating whether rejoining the group or resyncing the group is required.

JoinGroup Response

The response is similarly modified to remove the fields specific to consumer group management. The coordinator is responsible for analyzing the supported protocols from each group member and choosing one which all members support, which is then transmitted to group members in the join group response. Note that the metadata from each group member for the chosen protocol is returned in the response to all members. This allows each member to propagate some local information (such as topic subscriptions) to the entire group. The generation id, as before, is incremented on every successful iteration of the join group protocol.

The basic idea behind the coordinator's protocol selection algorithm is to consider the protocols supported by all members in order of preference (as indicated by position in each member's list). This means that if all members list protocol "a" before protocol "b", then the coordinator will choose "a". If the members do not agree on a preference among the protocols which all of them support, then one is chosen randomly.
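The coordinator's preference-based protocol selection for the JoinGroup response can be sketched as follows. This is a minimal Python sketch, not the coordinator's implementation: select_protocol is a hypothetical name, and "agreement" is interpreted here as every member ranking the same protocol highest among the protocols that all members support.

```python
from __future__ import annotations

import random


def select_protocol(supported: dict[str, list[str]],
                    rng: random.Random | None = None) -> str:
    """Choose a protocol every member supports, honoring preference order.

    `supported` maps member id -> protocols, most preferred first.
    """
    if not supported:
        raise ValueError("empty group")
    rng = rng or random.Random()
    prefs = list(supported.values())
    common = set(prefs[0]).intersection(*prefs[1:])
    if not common:
        # No common protocol: the group cannot be constructed.
        raise ValueError("no protocol supported by all members")
    # Each member's top choice among the protocols everyone supports.
    top_choices = {next(p for p in plist if p in common) for plist in prefs}
    if len(top_choices) == 1:
        return top_choices.pop()
    # Members disagree on preference: fall back to a random common protocol.
    return rng.choice(sorted(common))
```

Applied to the rolling upgrade example in Rolling Upgrades (Consumer), with versions encoded as hypothetical names like "rr:1", this returns "rr:0" while C still lists only "rr:0", and switches to "rr:1" once every member lists "rr:1" first.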

Consumer Embedded Protocol

Above we outlined the generalized JoinGroup protocol that the consumer will use. Next we show how we will implement consumer semantics on top of this protocol. Other use cases for the join group protocol would be implemented similarly. The two items that must be defined to use the join group protocol are the format of the protocol versions and the format of the protocol metadata.

Code Block
MemberMetadata => Version Subscription AssignmentStrategies
  Version              => int16
  Subscription         => String
  AssignmentStrategies => [String]

MemberState => Version Assignment
  Version              => int16
  Assignment           => [Topic Partitions]
    Topic              => String
    Partitions         => [int32]

One of the major concerns with a protocol that returns every member's metadata in the join group response is the size of that response. Since each member's metadata is included in the responses for all members, the total amount of data which the coordinator must forward in the join group responses increases quadratically with the size of the group. For example, with a per-member metadata size of 100KB, in a group of 100 members, each join group response would contain 10MB of data, which means that the coordinator would have to transfer 1GB in total on every rebalance. It is therefore important to keep the size of the metadata fairly small. Even with a smaller metadata size, the group can only grow so large before this becomes a concern again. However, we argue that the protocol is already unsuited to such large groups since it does not have any mechanism to cope with churn. Every time there is a membership change in the group, all members must synchronize to form the next generation. If this happens often enough, as is possible with larger groups, then progress is severely restricted.

When the JoinGroup request instead carries the list of supported protocols described in the JoinGroup Request section, the consumer semantics are expressed through the protocol type, the protocol name, and the protocol metadata, with the following formats:

Code Block
ProtocolType => "consumer"
 
Protocol => AssignmentStrategy
  AssignmentStrategy => String
 
ProtocolMetadata => Subscription
  Subscription                 => Topics TopicPattern MetadataHash
    Topics                     => [String]
    TopicPattern               => String
    MetadataHash               => bytes

Subscriptions: To support differing subscriptions within the group, each member must include its own subscription in the protocol metadata. These subscriptions are forwarded to all members of the group who can then independently compute their assignment. Subscriptions can be specified either as a list of topics or as a regular expression. The latter can provide a more compact representation when subscribing to a large number of topics (e.g. if using mirror maker to replicate all the topics in a cluster).
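As an illustration of the two subscription forms, the sketch below expands either an explicit topic list or a topic pattern into concrete topic names. It assumes the pattern must match the whole topic name (Python's re.fullmatch); resolve_subscription and the topic names are hypothetical, and the matching semantics a real consumer uses may differ.

```python
from __future__ import annotations

import re


def resolve_subscription(topics: list[str] | None, pattern: str | None,
                         cluster_topics: list[str]) -> list[str]:
    """Expand a subscription (topic list or pattern) into sorted topic names."""
    if pattern:
        regex = re.compile(pattern)
        # A pattern is matched against every topic currently in the cluster.
        return sorted(t for t in cluster_topics if regex.fullmatch(t))
    # An explicit list is taken as-is (deduplicated, sorted for determinism).
    return sorted(set(topics or []))


cluster = ["orders", "orders-dlq", "payments", "__consumer_offsets"]
```

For a mirror-maker style subscription to every non-internal topic, a single short pattern such as r"[^_].*" stands in for the full topic list, which is where the compactness benefit comes from.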

Metadata: The metadata hash is included to ensure that each consumer has the same view of the topic metadata. A disagreement could cause an inconsistent assignment, so upon joining the group, each member checks the metadata hash of all other members to make sure they are consistent. The hash covers the full list of topics in the subscription set and their respective partition counts. If a regex subscription is used, then the hash covers all the topics in the cluster. If there is any disagreement on the number of partitions (e.g. due to stale metadata), then the hashes will differ and the consumers will refetch metadata and rejoin the group.
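A metadata hash along these lines could be computed as in the Python sketch below. SHA-256 and the "topic:count" line encoding are assumptions made for illustration; the proposal does not fix a hash function or an encoding. Because Kafka topic names cannot contain ':' or newlines, this encoding is unambiguous.

```python
from __future__ import annotations

import hashlib


def metadata_hash(partition_counts: dict[str, int], topics: set[str]) -> bytes:
    """Hash the partition count of every topic in `topics`, in a canonical order.

    For a regex subscription, pass every topic in the cluster as `topics`.
    """
    h = hashlib.sha256()
    for topic in sorted(topics):
        # A topic unknown to this member hashes as 0 partitions, so it still disagrees.
        count = partition_counts.get(topic, 0)
        h.update(f"{topic}:{count}\n".encode("utf-8"))
    return h.digest()


def consistent(hashes: list[bytes]) -> bool:
    # Members proceed only if every member reported the same hash.
    return len(set(hashes)) <= 1
```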

One potential concern in this protocol is whether a sustained disagreement might lead to continual rebalancing. This could happen if two brokers disagree on the topic metadata for an extended period of time. While metadata should eventually converge, this issue can be resolved by having the consumers fetch their metadata from the coordinator, ensuring that they each see the same view. However, it would still be possible to have metadata disagreement if the metadata itself is changing at a very high rate.

It is worth mentioning that there is a class of assignment strategies which do not depend on consistent metadata among the consumers. For example, in a consistent hashing approach, each partition would be deterministically mapped to one of the group members. Even if two members see a different partition count for a topic, there would be no disagreement over which consumer owns each partition. The tradeoff is generally sub-optimal load balancing of partitions across consumers.
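The sketch below shows one way to get this property, using rendezvous (highest-random-weight) hashing, a technique closely related to consistent hashing, as a stand-in for the consistent hashing approach mentioned above. Each member scores every (member, partition) pair and the highest score wins, so ownership of a partition does not depend on how many partitions a member believes the topic has. The helpers owner and assign are hypothetical.

```python
from __future__ import annotations

import hashlib


def owner(topic: str, partition: int, members: list[str]) -> str:
    """Rendezvous hashing: the member with the highest score owns the partition."""
    def score(member: str) -> int:
        digest = hashlib.sha256(f"{member}|{topic}|{partition}".encode()).digest()
        return int.from_bytes(digest[:8], "big")
    return max(sorted(members), key=score)


def assign(partition_counts: dict[str, int], members: list[str],
           me: str) -> list[tuple[str, int]]:
    # Each member computes only its own share from its (possibly stale) view.
    return [(t, p) for t, n in sorted(partition_counts.items())
            for p in range(n) if owner(t, p, members) == me]
```

A member with stale metadata simply misses the newest partitions; it never claims a partition that another member also claims. Nothing here balances how many partitions each member receives, which is the load-balancing tradeoff noted above.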

Note that the format of the metadata is an attribute of the assignment strategy. This makes it possible for different strategies to support different metadata formats. For rack-aware assignment, for example, the metadata would also include the rack of each consumer, and the metadata hash would have to cover the leader of each partition: the leader determines which broker fetches are sent to, and the whole point of rack-aware assignment is to fetch from brokers on the same rack as the consumer. In general, any information that is used in decision making must somehow be included in the metadata.

Rolling Upgrades (Consumer)

Support for rolling upgrades is enabled through the protocol list in the join group request. For the consumer, this makes it possible to upgrade or change the assignment strategy used by the group without downtime. Note that the protocol does not distinguish between upgrades to the assignment strategy metadata and upgrades to the assignment strategy algorithm. The coordinator just looks for a protocol and version which are supported by all members of the group. It is up to the assignment strategy implementations to handle their own versioning. The following example shows how an upgrade would work in practice.

Code Block
Phase 1: The group consists of consumers A, B, and C each supporting version 0 of the round-robin assignment strategy.
A: (round-robin, 0)
B: (round-robin, 0)
C: (round-robin, 0)
GroupProtocol: (round-robin, 0)
 
Phase 2: A is upgraded. It now supports version 1 of the round-robin strategy in addition to version 0.
A: (round-robin, 1), (round-robin, 0)
B: (round-robin, 0)
C: (round-robin, 0)
GroupProtocol: (round-robin, 0)
 
Phase 3: B is similarly upgraded. The coordinator still chooses (round-robin, 0) since it must choose a version supported by all members.
A: (round-robin, 1), (round-robin, 0)
B: (round-robin, 1), (round-robin, 0)
C: (round-robin, 0)
GroupProtocol: (round-robin, 0)

Phase 4: C is similarly upgraded, which allows the coordinator to update the group's protocol.
A: (round-robin, 1), (round-robin, 0)
B: (round-robin, 1), (round-robin, 0)
C: (round-robin, 1), (round-robin, 0)
GroupProtocol: (round-robin, 1)

This example highlights a few interesting points. First, since the consumers have no awareness of the protocols supported by the other group members, they don't know when the update has completed and continue sending the metadata for both versions. This is not a major problem since the coordinator will only forward the associated metadata from the strategy chosen by the coordinator. Nevertheless, another round of updates would be needed to remove support for the old assignment strategy in the group.

As noted previously, the order of the assignment strategies is significant. The coordinator uses it when selecting between two or more protocols which are supported by all members. For the rolling upgrade case, this just means that the upgraded protocol should be listed first, which guarantees that the coordinator will select it after all members have been updated. 

Note that the same upgrade mechanism can also be used to change to a different assignment strategy.

Scaling Groups

As mentioned previously, the need to propagate the metadata of each member to all other members puts a significant restriction on the amount of metadata that can be used in large groups. For small and medium-sized groups, this might not be a major concern, but assignment strategies must be mindful of the metadata size and set clear scaling expectations. We have considered several approaches to deal with this problem.

Reducing the Join Payload

Dropping Topic Lists: The main contribution to the size of the join group response for the consumer case is the subscription set. One option is to remove the topic list and topic pattern and instead include only a hash of that data. This would make it impossible to simultaneously support differing subscription sets within the group, but it would still be possible to support a changing subscription set in the rolling upgrade scenario, albeit in a weaker form. More concretely, the embedded protocol would look like this: 

Code Block
ProtocolType => "consumer"
 
Protocol => AssignmentStrategy
  AssignmentStrategy => String
 
ProtocolMetadata => SubscriptionHash MetadataHash
  SubscriptionHash => bytes
  MetadataHash     => bytes

Just as before, the consumers would join the group with their respective subscriptions and supported assignment strategies. The coordinator would select an assignment strategy supported by all members and forward the full member metadata to all members of the group. When the members of the group received the join group responses, they would compare the subscription hashes of all the members to find the largest matching sub-group. The members who belonged to this sub-group would assign partitions based on their own subscription sets (which match those of the rest of that sub-group), while non-members would just assign themselves an empty set of partitions and go dormant until the next rebalance. 

For example, suppose that a consumer group consists of members A, B, and C. If A and B are subscribed to (foo), and C is subscribed to (foo, bar), then A and B would have matching subscription hashes and would form the largest matching sub-group. Therefore A and B would assign themselves the partitions from (foo), and C would be inactive. If consumer B was then upgraded to the subscription set (foo, bar), then B and C would form the largest sub-group and A would go inactive (obviously there would need to be a tie-breaker in case there is no largest sub-group). The major limitation should also be clear in this example: during a rolling upgrade, the capacity of the consumer group may be temporarily halved, which may cause lag. This would have to be considered in capacity planning.
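Selecting the largest matching sub-group could look like the following Python sketch. The tie-breaker (prefer the smallest subscription hash) is an assumption, since the text only notes that some tie-breaker is needed; sub_hash and active_subgroup are hypothetical helpers, and SHA-256 over the sorted topic list stands in for whatever subscription hash is actually used.

```python
import hashlib
from collections import Counter


def sub_hash(topics: list) -> bytes:
    # Canonical form: sorted, comma-joined topic names.
    return hashlib.sha256(",".join(sorted(topics)).encode()).digest()


def active_subgroup(subscription_hashes: dict) -> set:
    """Return the members of the largest group sharing one subscription hash."""
    counts = Counter(subscription_hashes.values())
    # Largest group wins; ties go to the smallest hash so every member agrees.
    winner = min(counts, key=lambda h: (-counts[h], h))
    return {m for m, h in subscription_hashes.items() if h == winner}
```

Every member runs the same function over the same join group response, so all members agree on who is active; members outside the returned set assign themselves no partitions.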

Compression: An even simpler option is to use compression to reduce the size of the topic subscriptions. Without changing the protocol, members could compress their own metadata, which is opaque to the broker anyway. For maximum compression, the protocol could be modified to support compression of the entire group metadata. Since most subscription sets in a group will probably have considerable overlap (even matching exactly in most cases), this should be very effective, though it does come at the cost of some additional complexity for the protocol.
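A quick way to check the intuition that overlapping subscriptions compress well is to compress a synthetic group metadata blob, as in this Python sketch. JSON as the encoding and zlib as the codec are assumptions chosen for illustration; the member and topic names are made up, and real ratios depend on the actual subscriptions.

```python
import json
import zlib


def encode_group_metadata(subscriptions: dict) -> bytes:
    """Serialize every member's subscription and compress the whole blob."""
    raw = json.dumps(subscriptions, sort_keys=True).encode("utf-8")
    return zlib.compress(raw)


def decode_group_metadata(blob: bytes) -> dict:
    return json.loads(zlib.decompress(blob).decode("utf-8"))


# 100 hypothetical members that all subscribe to the same 50 topics.
topics = [f"topic-{i}" for i in range(50)]
group = {f"member-{n}": topics for n in range(100)}
raw_size = len(json.dumps(group, sort_keys=True).encode("utf-8"))
compressed = encode_group_metadata(group)
```

Because each repeated subscription becomes a handful of back-references, the compressed blob here is more than ten times smaller than the raw JSON.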

Single Client Assignor

A more dramatic alternative is to change the protocol to allow one member to provide the assignment for the entire group. One option that we have considered to enable this is to allow assignment to piggyback on the join group requests. Here is a basic outline of how this protocol would work: 

  1. As before, all members send their local metadata to the coordinator in a JoinGroupRequest.
  2. The coordinator chooses one member of the group and sends only to it a JoinGroupResponse which contains all members and their respective metadata. All other members continue waiting for their initial JoinGroupRequest to be satisfied.
  3. The chosen member performs the assignment for the entire group and sends it to the coordinator in a new JoinGroupRequest.
  4. The coordinator distributes to each member its respective assignment in a JoinGroupResponse.

Since only one member of the group receives the full group's metadata, the quadratic increase in overall message load becomes linear as the number of group members increases. The assignment that is returned to each member can be much smaller since it only contains the information that that consumer needs (in the case of the consumer, this is just the assigned partitions, similar to the current server-side assignment protocol). It also solves the problem of metadata consistency since only one member of the group does the assignment. Finally, it allows for easier assignment implementations in general since they no longer have to be deterministic. Below we try to make this idea more concrete in order to provide a comparison with the protocol above.
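The change from quadratic to linear growth can be checked with a little arithmetic. The Python sketch below counts only the bytes the coordinator sends in responses. The 1KB per-member assignment size is an assumed figure, and the list of member ids in the final "joined" response is ignored because ids are small compared with member metadata.

```python
def broadcast_bytes(members: int, metadata: int) -> int:
    # Every member's response carries all members' metadata: n * (n * m).
    return members * members * metadata


def single_assignor_bytes(members: int, metadata: int, state: int) -> int:
    # One response with all metadata goes to the synchronizer (n * m),
    # then each member receives only its own state (n * s).
    return members * metadata + members * state


n, m, s = 100, 100_000, 1_000  # 100 members, 100KB metadata, 1KB assignment (assumed)
```

With the 100-member, 100KB figures used earlier, that is 1GB per rebalance for the broadcast design versus about 10MB for the single assignor; doubling the group quadruples the first figure but only doubles the second.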

Protocol: Here is a sketch of how the JoinGroup request and response schemas might be changed to support this protocol:

Code Block
JoinGroupRequest => GroupId ProtocolType Phase
  GroupId              => String
  ProtocolType         => String
  Phase                => Join | Synchronized
  Join                 => SessionTimeout MemberId MemberMetadata
    SessionTimeout     => int32
    MemberId           => String
    MemberMetadata     => bytes
  Synchronized         => ErrorCode [MemberId SyncedMemberState]
    ErrorCode          => int16
    MemberId           => String
    SyncedMemberState  => bytes
 
JoinGroupResponse => ErrorCode GroupGenerationId MemberId Phase
  ErrorCode              => int16
  GroupGenerationId      => int32
  MemberId               => String
  Phase                  => Synchronize | Joined
  Synchronize            => [MemberId MemberMetadata]
    MemberId             => String
    MemberMetadata       => bytes
  Joined                 => SyncedMemberState GroupMembers
    SyncedMemberState    => bytes
    GroupMembers         => [MemberId]

The join group request and response objects support an intermediate phase which is used to synchronize the group's state. The coordinator chooses one member, the synchronizer, and sends a response to only its join group request. Other members continue to wait for their initial request to complete. The synchronizer accepts the metadata from each member and determines their respective group state for the generation. For the consumer, the synchronization phase is where assignment is done: the request metadata includes the consumer's subscription and the synchronized state is the assigned partitions for each member (more detail on this below). The "phases" in the join group request and response objects implement a state machine with "join" as the initial state and "joined" as the final state. From the perspective of the client, the valid transitions are:

Code Block
Request(phase:join) -> Response(phase:joined)
Request(phase:join) -> Response(phase:synchronize) -> Request(phase:synchronized) -> Response(phase:joined)
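A client could validate the phase sequence it observes against these two paths with a small transition table, as in this Python sketch. The Phase enum and is_valid_exchange are hypothetical names; messages are modelled as (direction, phase) pairs.

```python
from __future__ import annotations

from enum import Enum


class Phase(Enum):
    JOIN = "join"
    SYNCHRONIZE = "synchronize"
    SYNCHRONIZED = "synchronized"
    JOINED = "joined"


# Allowed successors for each message the client sends or receives.
NEXT = {
    ("request", Phase.JOIN): {("response", Phase.JOINED), ("response", Phase.SYNCHRONIZE)},
    ("response", Phase.SYNCHRONIZE): {("request", Phase.SYNCHRONIZED)},
    ("request", Phase.SYNCHRONIZED): {("response", Phase.JOINED)},
}


def is_valid_exchange(messages: list[tuple[str, Phase]]) -> bool:
    """True if the exchange starts with a join request and ends with a joined response."""
    if not messages or messages[0] != ("request", Phase.JOIN):
        return False
    if messages[-1] != ("response", Phase.JOINED):
        return False
    return all(b in NEXT.get(a, set()) for a, b in zip(messages, messages[1:]))
```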

With a single member of the group making decisions for the entire group, it is no longer necessary for the coordinator to do any of that decision making itself. This means in particular that the multiple protocol support provided by the protocol list in the join group request above can instead be pushed into the member metadata, which is opaque to the coordinator. In fact, since only one member is responsible for setting the group state, the only requirement for a successful outcome is that the metadata from each member can be decoded by the member selected for synchronization. This makes upgrades easier, as long as the metadata is compatible, since there is no need for the group to actually agree on the assignment strategy to be used. In most cases, such as the default consumer assignment case, metadata is unlikely to change frequently, but for other cases, a serialization format such as JSON or Avro can be chosen to make schema changes easier.

There are several heuristics that may be useful for choosing the synchronizer for the group. For example, choosing the last member to join the group reduces the chance that a failed member will be chosen. This could have value since the join group protocol can take as long as a full session timeout to complete in the worst case, but since most members would probably join the generation fairly quickly, it's unclear whether this would have much benefit in practice.  Another strategy would be to choose the newest member of the group, since it is the one most likely to have been upgraded, though this couldn't be depended on in practice. An intriguing option would be to expose the metadata version in the join group request so that the coordinator could always choose the member with the largest version. We have chosen to leave this out of the protocol since it adds some complication and forces a versioning strategy on the group which may best be left to the implementation. The simplest solution is to choose a member randomly.

Failure Scenarios: In addition to the failure cases documented for the current protocol (Kafka 0.9 Consumer Rewrite Design#Interestingscenariostoconsider), this protocol must handle failures from the additional synchronization round trip.

  1. Synchronizer Hard Failure: The node selected to perform synchronization may crash before the group's state has been received, so the coordinator must implement a synchronization timeout. When the timeout expires, the coordinator can either select a new synchronizer or fail the generation by sending an error code to each pending join request. The latter is probably preferred for simplicity and because it places a maximum limit on the time to handle join group requests, which allows clients to have their own timeout for joining the group. If the client receives an error from the coordinator indicating a synchronization timeout, it can simply try to rejoin the group, which will cause the coordinator to select a new synchronizer and try again. Note that the synchronization timeout does introduce a race condition if it is close to the time needed by the synchronizer. In the worst case, if the timeout is actually lower than the synchronization time, then the group will never be created. One possible way to address this issue is to add a synchronize_timeout to the join group request in addition to the session_timeout, which would allow the group implementation to indicate how much time the coordinator should wait for group state. This would also make it easy for the client to set its own timeout for receiving join group responses.
  2. Synchronizer Soft Failure: The synchronizer may fail for other reasons, such as the inability to accept the metadata from one of the members in the group. In this case, it will send an error code in its subsequent join group request, which will be propagated by the coordinator to all members of the group. Since rejoining the group will probably not resolve this issue, the client should raise an exception for the user to handle. This is similar to how inconsistent assignment strategy is handled in the existing consumer group protocol.

Consumer Embedded Protocol: The consumer's embedded protocol allows assignment strategy implementations to define their own metadata structure. The only requirement is a version field which implementations can use to verify compatibility among members. For the default assignment strategies, the member metadata corresponds to the member's subscription and the member state corresponds to the partition assignment. 

Code Block
// Basic consumer embedded format
MemberMetadata       => MetadataVersion Metadata
  MetadataVersion    => String
  Metadata           => bytes
 
MemberState          => bytes
 
// For the default assignment strategy implementations shipped with Kafka
MetadataVersion  => "default"
Metadata         => Subscription
  Subscription   => Topics TopicPattern
    Topics       => [String]int16
  Subscription  TopicPattern => String
 
MemberState      => AssignmentString
  Assignment  AssignmentStrategies   => [Topic PartitionsString]
 
MemberState => Version Assignment
 Topic Version       => Stringint16
  Assignment  Partitions   => [int32]

Since the assignor is the only member whose metadata will be depended on for assignment, the issue of metadata inconsistency goes away. It does mean, however, that the assignor may have to fetch additional metadata in order to complete the assignment since it must handle subscriptions from the entire group, which will be a superset of the assignor's own subscription.

Note also that in this protocol, the size of the assignment returned to all consumers depends only on the size of the subscription, not the size of the group. This means that message overhead grows more gracefully with the number of consumers in the group. 

Rolling Upgrades: The single-assignor protocol has no built-in mechanism to deal with metadata upgrades. Instead, this is delegated to the assignor implementation. For example, the assignment strategy can use JSON or Avro as the embedded format in order to ensure compatibility between different versions. In general, this requires both forward and backward compatibility of the metadata format since the coordinator can choose either a new or old version of the assignor in a rolling upgrade scenario. An open question is whether the protocol could be modified to always have the coordinator select the member with the latest metadata version, which would reduce the requirement to backwards compatibility only.

Note, however, that the protocol does handle upgrades not affecting the metadata in a nice way. For example, switching between the range and round-robin assignors does not require any metadata change, so a new version could be deployed simply be changing configuration. There would be no need to have each consumer express support for both assignment strategies in parallel as in the protocol above.

Summary: This protocol skirts two of the major weaknesses of the initial client-side proposal.  Since only one member has to receive all the metadata for the group, the message overhead scales linearly with the number of members in the group. It also eliminates the potential for metadata disagreement between members since agreement is no longer required. The tradeoff is a bit more complexity. In particular, the server must implement a new state for awaiting the assignment from the synchronizer. There is also a little more work on the client, but it seems fairly straightforward since the client doesn't need any additional state tracking and can simply act based on the join group response
Topic Partitions]
    Topic      => String
    Partitions => [int32]
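The assignment format above is an array of topics, each carrying an array of int32 partition ids. The sketch below encodes and decodes it, assuming the same conventions as the Kafka wire protocol (a String is an int16 length followed by UTF-8 bytes, each array is preceded by an int32 count, integers are big-endian). Whether the embedded assignment would be encoded exactly this way is an assumption, and `AssignmentCodec` is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class AssignmentCodec {
    private AssignmentCodec() {}

    // Encodes [Topic Partitions] as: int32 topic count, then per topic an int16-length
    // UTF-8 topic name, an int32 partition count, and one int32 per partition.
    static ByteBuffer encode(Map<String, List<Integer>> assignment) {
        int size = 4;
        for (Map.Entry<String, List<Integer>> e : assignment.entrySet()) {
            size += 2 + e.getKey().getBytes(StandardCharsets.UTF_8).length;
            size += 4 + 4 * e.getValue().size();
        }
        ByteBuffer buf = ByteBuffer.allocate(size); // big-endian by default
        buf.putInt(assignment.size());
        for (Map.Entry<String, List<Integer>> e : assignment.entrySet()) {
            byte[] topic = e.getKey().getBytes(StandardCharsets.UTF_8);
            buf.putShort((short) topic.length);
            buf.put(topic);
            buf.putInt(e.getValue().size());
            for (int partition : e.getValue()) buf.putInt(partition);
        }
        buf.flip(); // prepare the buffer for reading
        return buf;
    }

    // Reverses encode(), preserving topic order.
    static Map<String, List<Integer>> decode(ByteBuffer buf) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        int topics = buf.getInt();
        for (int i = 0; i < topics; i++) {
            byte[] topic = new byte[buf.getShort()];
            buf.get(topic);
            int count = buf.getInt();
            List<Integer> partitions = new ArrayList<>(count);
            for (int j = 0; j < count; j++) partitions.add(buf.getInt());
            assignment.put(new String(topic, StandardCharsets.UTF_8), partitions);
        }
        return assignment;
    }
}
```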

Subscriptions: Group members provide their subscription as a String in the JoinGroup request. This subscription can either be a comma-separated list of topics or a regular expression (TODO: do we need a distinguisher field to tell the difference?).
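The TODO about a distinguisher can be motivated with a small example: the same string may select different topics depending on whether it is read as a list or as a regular expression. The sketch assumes a hypothetical boolean `isPattern` field next to the subscription string; neither `Subscription` nor `isPattern` comes from the proposal. Pattern matching uses `java.util.regex.Pattern` with a full match against each topic name.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical subscription: the raw string from JoinGroup plus an explicit flag
// saying how to interpret it.
record Subscription(String value, boolean isPattern) {

    // Returns the topics from the available cluster topics that this subscription selects.
    Set<String> resolve(Set<String> availableTopics) {
        if (isPattern) {
            Pattern pattern = Pattern.compile(value);
            return availableTopics.stream()
                    .filter(topic -> pattern.matcher(topic).matches())
                    .collect(Collectors.toCollection(TreeSet::new));
        }
        // Otherwise treat the string as a comma-separated list of topic names.
        Set<String> listed = Arrays.stream(value.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toSet());
        return availableTopics.stream()
                .filter(listed::contains)
                .collect(Collectors.toCollection(TreeSet::new));
    }
}
```

With the topics `a.b`, `axb`, and `c`, the subscription `a.b` selects only `a.b` when read as a list, but selects both `a.b` and `axb` when read as a pattern, since `.` matches any character.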

Assignments: Assignments are provided as an array of topics and partitions in the SyncGroup phase. Each consumer provides a list of the assignment strategies it supports. When the group's leader collects the subscriptions from all members, it must find an assignment strategy that all members support. If none can be found, then the SyncGroup phase will fail with an error as described above.

Briefly, this is how the protocol works for the consumer:

  1. Members JoinGroup with their respective subscriptions.
  2. Once joined, the leader uses DescribeGroup to collect the members' subscriptions, computes the assignment, and sets it as the group's assignment in SyncGroup.
  3. All members SyncGroup to find their assignment.
  4. Once created, there are two cases which can trigger reassignment:
    1. Topic metadata changes that do not affect subscriptions (e.g. partitions added to a subscribed topic) trigger a resync. The leader computes the new assignment and initiates SyncGroup.
    2. Membership or subscription changes trigger a rejoin.
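The steps above can be sketched as a small state machine for one member. This is illustrative only: the state names, the `GroupMemberFsm` class, and the strings recorded in `actions` are hypothetical. Since the proposal does not say how followers learn of a resync, the sketch assumes that every member re-issues SyncGroup after a topic metadata change.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical member-side states for the flow above.
enum MemberState { JOINING, SYNCING, STABLE }

final class GroupMemberFsm {
    private final List<String> actions = new ArrayList<>();
    private MemberState state = MemberState.JOINING;
    private boolean leader;

    GroupMemberFsm() {
        actions.add("JoinGroup"); // step 1: join with this member's subscription
    }

    MemberState state() { return state; }

    List<String> actions() { return actions; }

    // The JoinGroup response arrived and tells this member whether it is the leader.
    void onJoined(boolean isLeader) {
        require(MemberState.JOINING);
        leader = isLeader;
        if (leader) {
            actions.add("DescribeGroup");              // step 2: collect subscriptions
            actions.add("SyncGroup(with assignment)"); // step 2: set the group's assignment
        } else {
            actions.add("SyncGroup");                  // step 3: fetch own assignment
        }
        state = MemberState.SYNCING;
    }

    // The SyncGroup response arrived; the member now holds its assignment.
    void onSynced() {
        require(MemberState.SYNCING);
        state = MemberState.STABLE;
    }

    // Trigger 4.1: topic metadata changed without affecting subscriptions, so resync.
    // Assumption: followers also re-issue SyncGroup to pick up the new assignment.
    void onTopicMetadataChanged() {
        require(MemberState.STABLE);
        actions.add(leader ? "SyncGroup(with assignment)" : "SyncGroup");
        state = MemberState.SYNCING;
    }

    // Trigger 4.2: membership or subscription changed, so rejoin from step 1.
    void onMembershipChanged() {
        actions.add("JoinGroup");
        state = MemberState.JOINING;
    }

    private void require(MemberState expected) {
        if (state != expected) {
            throw new IllegalStateException("expected " + expected + " but was " + state);
        }
    }
}
```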

Rolling Upgrades: To support rolling upgrades without downtime, there are two cases to consider: 

  1. Changes affecting subscription: the protocol directly supports differing subscriptions, so there is no need for special handling. Members will only be assigned partitions compatible with their subscription.
  2. Assignment strategy changes: to support a change to the assignment strategy, new versions must support both the old assignment strategy and the new one. When the group leader collects the metadata for the group, it will only choose an assignment strategy that is compatible with all members. Once all members are updated, the leader will choose the most preferred strategy according to the order in which the strategies are listed, so a new strategy listed ahead of the old one takes over.
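A minimal sketch of how the leader might pick a strategy during a rolling upgrade from `range` to `roundrobin`. It assumes the leader walks its own preference list and takes the first strategy every member supports; whose preference order wins is not specified above, and `StrategySelector` is a hypothetical name. An empty result corresponds to the case in which the SyncGroup phase fails with an error.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class StrategySelector {
    private StrategySelector() {}

    // Returns the first strategy in the leader's preference order that every member
    // supports, or empty if no common strategy exists.
    static Optional<String> select(List<String> leaderPreferences,
                                   Map<String, List<String>> supportedByMember) {
        for (String candidate : leaderPreferences) {
            boolean supportedByAll = supportedByMember.values().stream()
                    .allMatch(strategies -> strategies.contains(candidate));
            if (supportedByAll) return Optional.of(candidate);
        }
        return Optional.empty();
    }
}
```

Midway through the upgrade, an upgraded leader listing `roundrobin, range` still falls back to `range` because old members only support that; once every member lists `roundrobin`, it is chosen.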

KafkaConsumer API

Although the protocol is slightly more complex for the KafkaConsumer implementation, most of the details are hidden from users. Below we show the basic assignment interface that would be exposed in KafkaConsumer. The partition assigner is generic to allow for custom metadata. For simple versions, the generic type would probably be Void.

Code Block
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.TopicPartition;

class ConsumerMetadata<T> {
  String consumerId;
  List<String> subscribedTopics;
  T metadata;
}

interface PartitionAssigner<T> extends Configurable {

  /**
   * Derive the metadata to be used for the local member by this assigner. This could
   * come from configuration or it could be derived dynamically (e.g. for host information
   * such as the hostname or number of cpus).
   * @return The metadata
   */
  T metadata();

  /**
   * Assign partitions for the whole group. In this protocol only the group leader
   * invokes this, after collecting all members' subscriptions, and then sets the
   * result as the group's assignment in SyncGroup.
   * @param consumerId The consumer id of the local consumer (the group leader)
   * @param partitionsPerTopic The count of partitions for each subscribed topic
   * @param consumers Metadata for consumers in the current generation
   * @return The assigned partitions for each consumer, keyed by consumer id
   */
  Map<String, List<TopicPartition>> assign(String consumerId,
                                           Map<String, Integer> partitionsPerTopic,
                                           List<ConsumerMetadata<T>> consumers);
}
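As an illustration of custom metadata, here is a hypothetical assigner whose metadata is the member's CPU count, one of the examples mentioned in the `metadata()` comment. To keep the sketch self-contained, it redeclares minimal stand-ins for `TopicPartition`, `Configurable`, `ConsumerMetadata`, and `PartitionAssigner` instead of using the Kafka classes; `CpuWeightedAssigner` and its greedy weighting rule are assumptions, not part of the proposal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Minimal stand-ins so the sketch compiles on its own.
record TopicPartition(String topic, int partition) {}

interface Configurable {
    void configure(Map<String, ?> configs);
}

// Mirrors ConsumerMetadata<T> from the interface above, as a record.
record ConsumerMetadata<T>(String consumerId, List<String> subscribedTopics, T metadata) {}

interface PartitionAssigner<T> extends Configurable {
    T metadata();

    Map<String, List<TopicPartition>> assign(String consumerId,
                                             Map<String, Integer> partitionsPerTopic,
                                             List<ConsumerMetadata<T>> consumers);
}

// Hypothetical assigner: metadata is the CPU count (assumed to be at least 1), and each
// partition goes to the subscribed member with the lowest assigned-partitions-per-CPU ratio.
final class CpuWeightedAssigner implements PartitionAssigner<Integer> {
    @Override
    public void configure(Map<String, ?> configs) {
        // Nothing to configure in this sketch.
    }

    @Override
    public Integer metadata() {
        return Runtime.getRuntime().availableProcessors();
    }

    @Override
    public Map<String, List<TopicPartition>> assign(String consumerId,
                                                    Map<String, Integer> partitionsPerTopic,
                                                    List<ConsumerMetadata<Integer>> consumers) {
        // consumerId (the leader's own id) is not needed by this rule.
        Map<String, List<TopicPartition>> result = new TreeMap<>();
        Map<String, ConsumerMetadata<Integer>> byId = new TreeMap<>();
        for (ConsumerMetadata<Integer> c : consumers) {
            result.put(c.consumerId(), new ArrayList<>());
            byId.put(c.consumerId(), c);
        }
        for (Map.Entry<String, Integer> topic : new TreeMap<>(partitionsPerTopic).entrySet()) {
            for (int p = 0; p < topic.getValue(); p++) {
                String best = null;
                for (ConsumerMetadata<Integer> c : byId.values()) {
                    if (!c.subscribedTopics().contains(topic.getKey())) continue;
                    if (best == null || lessLoaded(c, byId.get(best), result)) best = c.consumerId();
                }
                if (best != null) result.get(best).add(new TopicPartition(topic.getKey(), p));
            }
        }
        return result;
    }

    // True if a has a strictly smaller assigned/cpus ratio than b (cross-multiplied to stay in integers).
    private static boolean lessLoaded(ConsumerMetadata<Integer> a, ConsumerMetadata<Integer> b,
                                      Map<String, List<TopicPartition>> result) {
        long loadA = result.get(a.consumerId()).size();
        long loadB = result.get(b.consumerId()).size();
        return loadA * b.metadata() < loadB * a.metadata();
    }
}
```

With members reporting 1 and 3 CPUs sharing a 4-partition topic, this rule splits the partitions 1 and 3.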

TODO:

To support client-side assignment, we'd have to make the following changes:

  1. Migrate existing assignment strategies from the broker to the client. Since the assignment interface is nearly the same, this should be straightforward.
  2. Modify client/server for the new join group protocol. Since we're not really changing the protocol (just the information that is passed through it), this should also be straightforward.
  3. Remove offset validation from the consumer coordinator. This only requires removing a couple of lines.
  4. Add support for assignment versioning (if we decide we need it). Depending on the approach we choose, this may or may not be trivial.