You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 22 Next »

 

The new consumer currently relies on a server-side coordinator to negotiate the set of consumer processes that form the group and to assign the partitions to each member of the consumer group per some assignment strategy which group members must agree on. This provides assurance that the group will always have a consistent assignment and it enables the coordinator to validate that offsets are only committed from consumers that own the respective partition. However, it relies on the server having access to the code implementing the assignment strategy, which is problematic for two reasons:

  1. First is just a matter of convenience. New assignment strategies cannot be deployed to the server without updating configuration and restarting the cluster. It can be a significant operational undertaking just to provide the capability to do this. 
  2. Different assignment strategies have different validation requirements. For example, with a redundant partitioning scheme, a single partition can be assigned to multiple consumers. This limits the ability of the coordinator to validate assignments, which is one of the main reasons for having the coordinator do the assignment in the first place.

If new assignment use cases were rare, this may be a viable solution, but we are already have a number of cases where more control over assignment is needed. For example:

  • Co-partitioning: When joining two topics (in the context of KIP-28), it is necessary to assign the same partitions from more than one topic to the same consumer.
  • Sticky partitioning: For stateful consumers, it is often best to minimize the number of partitions that have to be moved during a rebalance.
  • Redundant partitioning: For some use cases, it is useful to assign each partition to multiple consumers. For e.g search indexers consuming from a Kafka topic need multiple replicas for the same partition. This would mean the same Kafka partition should be assigned to n consumer processes in such a search indexer application.
  • Metadata-based assignment: In some cases, it is convenient to leverage consumer-local metadata to make assignment decisions. For example, if you can derive the rack from the FQDN of the Kafka brokers (which is common), then it would be possible to have rack-aware consumer groups if there was a way to communicate each consumer's rack to the partition assignment.

To address the problems pointed out above and support custom assignment strategies easily, we propose to move the assignment to the client. Specifically, we propose to separate the group management capability provided by the coordinator from partition assignment.  We leave the coordinator to handle the former, while the latter is pushed into the consumer. This promotes separation of concerns and loose coupling.

More concretely, instead of the JoinGroup protocol returning each consumer's assignment directly, we modify the protocol to return the list of members in the group and have each consumer decide its assignment independently. This solves the deployment problem since it is typically an order of magnitude easier to update clients than servers. It also decouples the server from the needs of the assignment strategy, which allow us to support the above use cases without any server changes and provide some "future-proofing" for new use cases. For consumers, the join group protocol becomes more of an abstract group membership capability which, in addition to enabling assignment, can be used as a primitive to build other group management functions (such as leadership). 

There are some disadvantages though. First, since the coordinator does not know the owners of a partition, it can no longer verify that offset commits come from the "right" consumer, which potentially opens the door to inconsistent processing. However, as mentioned above, the ability of the server to validate assignments (and therefore commits) would have to be handicapped anyway to support redundant partitioning. Also, with client-side assignment, debugging assignment bugs requires a little more work. Finding assignment errors may involve aggregating logs from each consumer in the group. In practice, the partitioning strategies used by most users will be simple and tested enough that such errors should be unlikely, but it is still a potential concern.

So far, we made an argument to separate group management from resource assignment. A significant benefit of this proposal is that it enables the group membership protocol to be used for other purposes. Below we outline all the use cases that would now be possible due to group management becoming a generic facility in the Kafka protocol. 

  1. The processor client (KIP-): Depending on the nature of your processing, your processor client might require a different partitioning strategy. For e.g. if your processing requires joins, it needs the co-partitioning assignment strategy for those topics and possibly a simple round robin for other topics. 
  2. Copycat: Here, you have a pool of worker processes in a copycat cluster that act as one large group. If one worker fails, the connector partitions that lived in that process need to be redistributed over the rest of the worker processes. Again, some connectors require a certain assignment strategy while a simple round robin works for others. The problem is the same - group management for a set of processes and assignment of resources amongst them that is really dictated by the application (copycat)  
  3. Single-writer producer: This use case may be a little out there since the transactional producer work hasn't quite shaped up. But the general idea is that you have multiple producers acting as a group, where only one producer is active and writing at any given point of time. If that producer fails, some other producer in the group becomes the single writer.
  4. Consumer: A set of consumer processes need to be part of a group and partitions for the subscribed topics need to be assigned to each consumer processes, as dictated by the consumer application.

Given that there are several non-consumer use cases for a general group management protocol, we propose changing JoinGroupRequest and JoinGroupResponse such that it is not tied to consumer specific concepts.

Below we outline the changes needed to the protocol to make it more general and also the changes to the consumer API to support this.

Protocol

To support client-side assignment, we propose to split the group management protocol into two phases: group membership and state synchronization. The first phase is used to set the active members of the group and to elect a group leader. The second phase is used to enable the group leader to synchronize member state in the group. From the perspective of the consumer, the first phase is used to collect member subscriptions, while the second phase is used to propagate partition assignments. The elected leader in the join group phase is responsible for setting the assignments for the whole group.

Below we describe the phases of this protocol in more detail.

Phase 1: Joining the Group

The purpose of the initial phase is to set the active members of the group. This protocol has basically the same semantics as the initial consumer rewrite design. After finding the coordinator for the group, each member sends a JoinGroup request containing member-specific metadata. The join group request will park at the coordinator until all expected members have sent their own join group requests. Once all have done so, the coordinator randomly selects a leader from the group and sends JoinGroup responses to all the pending requests.

JoinGroupRequest => GroupId SessionTimeout MemberId MemberMetadata
  GroupId             => String
  SessionTimeout      => int32
  MemberId            => String
  MemberMetadata      => bytes
 
JoinGroupResponse => ErrorCode GroupGenerationId GroupLeaderId MemberId
  ErrorCode           => int16
  GroupGenerationId   => int32
  GroupLeaderId       => String
  MemberId            => String

The error cases in this round of the protocol are similar to those described in the consumer rewrite design: Kafka 0.9 Consumer Rewrite Design.

Phase 2: Synchronizing Group State

Once the group members have been stabilized by the completion of phase 1, the active leader has the option to propagate additional state to the group members. This is used in the new consumer protocol to set partition assignments. Just like the JoinGroup phase, this synchronization acts as a barrier which all members must acknowledge by sending their own SyncGroup requests. The message format is provided below:

SyncGroupRequest => GroupId GroupGenerationId MemberId
  GroupId           => String
  GroupGenerationId => int32
  SyncErrorCode     => int16
  GroupState        => [MemberId MemberState]
    MemberId        => String
    MemberState     => bytes
 
SyncGroupResponse => ErrorCode GroupGenerationId MemberState
  ErrorCode         => int16
  GroupGenerationId => int32
  MemberState       => bytes

 

The leader sets member states in the GroupState field. For followers, this array must be empty. Once the coordinator has received the group state from the leader, and all members have sent their SyncGroup requests, the coordinator unpacks the group state and responds with each member's respective state in the SyncGroup response. 

There are several failure cases to consider in this phase of the protocol:

  1. The leader can fail prior to performing an expected synchronization. In this case, the session timeout of the leader will expire and the coordinator will mark it dead and force all members to rejoin the group, which will result in new leader being elected. 
  2. The leader's synchronization procedure can fail. For example, if the leader cannot parse one member's metadata, then the state synchronization should generally fail. Since this is most likely a non-recoverable error, there must be some way to propagate the error to all members of the group. The SyncGroup request contains an error field which is used for this purpose. All members will send SyncGroup requests as before, only this time once all members have joined, the error code will be forwarded in the SyncGroup response. The client can then propagate the error to the user.
  3. Followers can fail to join the synchronization barrier. If a follower fails to send an expected SyncGroup, then the member's sessionId will expire which will force members to rejoin.
  4. New members can join during synchronization. If a new member sends a JoinGroup during this phase, then the synchronization round will fail with an error indicating that rejoining the group is necessary.

DescribeGroup: Note that the JoinGroup response in the first phase above doesn't contain a list of the members of the group or their respective metadata. So how is group leader to find the subscriptions in order to make its assignment? For this, we implement a new request type to describe the active group. The leader is responsible for using this to collect the group's metadata before setting the assignment. We could have alternatively sent the member list in the join group response, but this leads to undesirable message overhead when groups begin to get larger.

The request format for DescribeGroup is provided below. Note that this request is also useful for administration purposes since it provides access both to the group's subscription and assignment.

DescribeGroupRequest => GroupId
  GroupId => String
 
DescribeGroupResponse => ErrorCode GroupState GroupGenerationId GroupLeaderId GroupMembers
  ErrorCode          => int16
  GroupState         => int16 // INACTIVE, JOINING, JOINED, SYNCING, SYNCED
  GroupGenerationId  => int16
  GroupLeaderId      => String
  GroupMembers       => [MemberId MemberMetadata MemberState]
    MemberId         => String
    MemberMetadata   => bytes
    MemberState      => bytes

A couple additional points are worth noting in this phase:

  • There is no requirement that implementations take advantage of this phase of the protocol. If group membership is all that is needed, then the first phase is all that is required and there is no need.
  • It is possible to resynchronize group state multiple times without going through phase 1. This is useful for the new consumer to support topic metadata changes which affect assignment, but not subscription. For example, if a subscribed topic is resized, then the group leader can set the new assignment immediately for the current group without collecting subscriptions again. The main impact of this is that it is no longer sufficient for the error code to indicate an illegal generation in the heartbeat response (previously this was used by the client to find out when it should rejoin the group). Instead, we must provide error codes indicating whether rejoining the group or resyncing the group is required.  

Consumer Embedded Protocol

Above we outlined the generalized JoinGroup protocol that the consumer will use. Next we show how we will implement consumer semantics on top of this protocol. Other use cases for the join group protocol would be implemented similarly.

The two phases of the group protocol correspond to subscription and assignment for the new consumer. Each member of the group submits their subscription as member metadata. The leader of the group collects the subscriptions of the group using DescribeGroup and sends the assignment as member state in SyncGroup. There are several advantages to having a single assignor:

  1. Since the leader makes the assignment for the full group, it is the single source of truth for the metadata used in its decision making. This avoids the need to synchronize metadata among all members that is required in a multi-assignor approach. 
  2. The leader of the group can enforce its own policy for controlling the rate of rebalancing. It doesn't have to rebalance after every metadata change, but can "batch" changes together to reduce the impact of metadata churn.
  3. Once the members of the group are stable, rebalances should only require the SyncGroup round, which requires just a single round trip to the broker for all members.
  4. The leader is the only member that needs to receive the metadata from all members of the group. This reduces the overhead of the protocol.

The two items that must be defined to use the join group protocol is the format of the member metadata provided in JoinGroup, and the member state provided in SyncGroup.

MemberMetadata => Version Subscription AssignmentStrategies
  Version              => int16
  Subscription         => String
  AssignmentStrategies => [String]
 
MemberState => Version Assignment
  Version      => int16
  Assignment   => [Topic Partitions]
    Topic      => String
    Partitions => [int32]

Subscriptions: Group members provide their subscription as a String in the JoinGroup request. This subscription can either be a comma-separated list of topics or a regular expression (TODO: do we need distinguisher field to tell the difference?). 

Assignments: Assignments are provided as an array of topics and partitions in the SyncGroup phase. Each consumer provides a list of the assignment strategies that they support. When the group's leader collects the subscriptions from all members, it must find an assignment strategy which all members support. If none can be found, then the SyncGroup phase will fail with an error as described above.

Briefly, this is how the protocol works for the consumer:

  1. Members JoinGroup with their respective subscriptions.
  2. Once joined, the leader uses DescribeGroup to collect member subscriptions and set the group's assignment in SyncGroup.  
  3. All members SyncGroup to find their assignment.
  4. Once created, there are two cases which can trigger reassignment:
    1. Topic metadata changes which have no impact on subscriptions cause resync. The leader computes the new assignment and initiates SyncGroup.
    2. Membership or subscription changes cause rejoin. 

Rolling Upgrades: To support rolling upgrades without downtime, there are two cases to consider: 

  1. Changes affecting subscription: the protocol directly supports differing subscriptions, so there is no need for special handling. Members will only be assigned partitions compatible with their subscription.
  2. Assignment strategy changes: to support a change to the assignment strategy, new versions must enable support both for the old assignment strategy and the new one. When the group leader collects the metadata for the group, it will only choose an assignment strategy which is compatible with all members. Once all members are updated, the leader will choose strategy in the order listed.

KafkaConsumer API

Although the protocol is slightly more complex for the KafkaConsumer implementation, most of the details are hidden from users. Below we show the basic assignment interface that would be exposed in KafkaConsumer. The partition assigner is generic to allow for custom metadata. For simple versions, the generic type would probably be Void.

class ConsumerMetadata<T> {
  String consumerId;
  List<String> subscribedTopics;
  T metadata;
}
 
interface PartitionAssigner<T> extends Configurable {
 
  /**
   * Derive the metadata to be used for the local member by this assigner. This could 
   * come from configuration or it could be derived dynamically (e.g. for host information
   * such as the hostname or number of cpus).
   * @return The metadata
   */
  public T metadata();
  /**
   * Assign partitions for this consumer.
   * @param consumerId The consumer id of this consumer
   * @param partitionsPerTopic The count of partitions for each subscribed topic
   * @param consumers Metadata for consumers in the current generation
   */
  Map<String, List<TopicPartition>> assign(String consumerId,
                                           Map<String, Integer> partitionsPerTopic, 
                                           List<ConsumerMetadata<T>> consumers);
}

TODO:

To support client-side assignment, we'd have to make the following changes:

  1. Migrate existing assignment strategies from the broker to the client. Since the assignment interface is nearly the same, this should be straightforward.
  2. Modify client/server for the new join group protocol. Since we're not really changing the protocol (just the information that is passed through it), this should also be straightforward.
  3. Remove offset validation from the consumer coordinator. Just a couple lines to remove for this.
  4. Add support for assignment versioning (if we decide we need it). Depending on what we do, may or may not be trivial.

 

  • No labels