You are viewing an old version of this page. View the current version.

Compare with Current View Page History

Version 1 Next »

The new consumer currently relies on a server-side coordinator to negotiate the set of consumer processes that form the group and to assign the partitions to each member of the consumer group per some assignment strategy which group members must agree on. This provides assurance that the group will always have a consistent assignment and it enables the coordinator to validate that offsets are only committed from consumers that own the respective partition. However, it relies on the server having access to the code implementing the assignment strategy, which is problematic for two reasons:

  1. First is just a matter of convenience. New assignment strategies cannot be deployed to the server without updating configuration and restarting the cluster. It can be a significant operational undertaking just to provide the capability to do this. 
  2. Different assignment strategies have different validation requirements. For example, with a redundant partitioning scheme, a single partition can be assigned to multiple consumers. This limits the ability of the coordinator to validate assignments, which is one of the main reasons for having the coordinator do the assignment in the first place.

If new assignment use cases were rare, this may be a viable solution, but we are already have a number of cases where more control over assignment is needed. For example:

  • Co-partitioning: When joining two topics (in the context of KIP-28), it is necessary to assign the same partitions from more than one topic to the same consumer.
  • Sticky partitioning: For stateful consumers, it is often best to minimize the number of partitions that have to be moved during a rebalance.
  • Redundant partitioning: For some use cases, it is useful to assign each partition to multiple consumers. For e.g search indexers consuming from a Kafka topic need multiple replicas for the same partition. This would mean the same Kafka partition should be assigned to n consumer processes in such a search indexer application.
  • Metadata-based assignment: In some cases, it is convenient to leverage consumer-local metadata to make assignment decisions. For example, if you can derive the rack from the FQDN of the Kafka brokers (which is common), then it would be possible to have rack-aware consumer groups if there was a way to communicate each consumer's rack to the partition assignment.

To address the problems pointed out above and support custom assignment strategies easily, we propose to move the assignment to the client. Specifically, we propose to separate the group management capability provided by the coordinator from partition assignment.  We leave the coordinator to handle the former, while the latter is pushed into the consumer. This promotes separation of concerns and loose coupling.

More concretely, instead of the JoinGroup protocol returning each consumer's assignment directly, we modify the protocol to return the list of members in the group and have each consumer decide its assignment independently. This solves the deployment problem since it is typically an order of magnitude easier to update clients than servers. It also decouples the server from the needs of the assignment strategy, which allow us to support the above use cases without any server changes and provide some "future-proofing" for new use cases. For consumers, the join group protocol becomes more of an abstract group membership capability which, in addition to enabling assignment, can be used as a primitive to build other group management functions (such as leadership). 

There are some disadvantages though. First, since the coordinator does not know the owners of a partition, it can no longer verify that offset commits come from the "right" consumer, which potentially opens the door to inconsistent processing. However, as mentioned above, the ability of the server to validate assignments (and therefore commits) would have to be handicapped anyway to support redundant partitioning. Also, with client-side assignment, debugging assignment bugs requires a little more work. Finding assignment errors may involve aggregating logs from each consumer in the group. In practice, the partitioning strategies used by most users will be simple and tested enough that such errors should be unlikely, but it is still a potential concern.

So far, we made an argument to separate group management from resource assignment. A significant benefit of this proposal is that it enables the group membership protocol to be used for other purposes. Below we outline all the use cases that would now be possible due to group management becoming a generic facility in the Kafka protocol. 

  1. The processor client (KIP-): Depending on the nature of your processing, your processor client might require a different partitioning strategy. For e.g. if your processing requires joins, it needs the co-partitioning assignment strategy for those topics and possibly a simple round robin for other topics. 
  2. Copycat: Here, you have a pool of worker processes in a copycat cluster that act as one large group. If one worker fails, the connector partitions that lived in that process need to be redistributed over the rest of the worker processes. Again, some connectors require a certain assignment strategy while a simple round robin works for others. The problem is the same - group management for a set of processes and assignment of resources amongst them that is really dictated by the application (copycat)  
  3. Single-writer producer: This use case may be a little out there since the transactional producer work hasn't quite shaped up. But the general idea is that you have multiple producers acting as a group, where only one producer is active and writing at any given point of time. If that producer fails, some other producer in the group becomes the single writer.
  4. Consumer: A set of consumer processes need to be part of a group and partitions for the subscribed topics need to be assigned to each consumer processes, as dictated by the consumer application.

Given that there are several non-consumer use cases for a general group management protocol, we propose changing JoinGroupRequest and JoinGroupResponse such that it is not tied to consumer specific concepts.

Below we outline the changes needed to the protocol to make it more general and also the changes to the consumer API to support this.

Protocol

This proposal does not change the basic mechanics of the join group protocol. All members of the group send JoinGroup requests to the coordinator, which waits for all expected members before responding. However, instead of the coordinator returning each consumer's individual assignment, it returns to each member the full list of group members along with their associated metadata. In the case of the new consumer, each member would then compute its assignment independently based on the returned group metadata.

The proposed format of the new JoinGroup messages is given below.

JoinGroup Request

The new join group message is similar to the previous one, but we have dropped the fields specific to consumer partition assignment (e.g. assignment strategy). Instead, all of this information is treated as protocol-specific metadata, which is opaque to the broker. The join group request includes a list of the protocols which the group member supports (sorted by preference). A protocol is used to communicate membership semantics to the members of the group. In the case of the new consumer, it corresponds exactly to the assignment strategy. The coordinator inspects the supported protocols of each member and chooses one that all members support. If no common protocol can be found among members, then the group fails construction. This provides a facility for upgrading to a new version of the protocol in a rolling update. 

Note that each protocol has a field for its own metadata. In the case of the consumer, this allows the assignment strategy to depend on its own format. In the case of the normal round-robin strategy, the metadata would just contain the list of subscribed topics, but other strategies may contain other information (such as the number of cpus on the host).

The GroupType field provides a scope for the protocol. For the consumer, the group type would be "consumer" and the protocols would be "round-robin," "range," etc. Copycat would use "copycat" as the group type and provide its own convention for protocol naming. If group members do not all have the same group type, the coordinator will not allow the group to be created. It's an open question whether this is really necessary since the groupId could embed this information as well.

 

JoinGroupRequest => GroupId GroupType SessionTimeout MemberId SupportedProtocols
  GroupId                 => String
  GroupType               => String
  SessionTimeout          => int32
  MemberId                => String
  GroupProtocols          => [Protocol ProtocolMetadata]
    Protocol              => String
    ProtocolMetadata      => bytes

JoinGroup Response

The response is similarly modified to remove the fields specific to consumer group management. The coordinator is responsible for analyzing the supported protocols from each group member and choosing one which all members support, which is then transmitted to group members in the join group response. Note that the metadata from each group member for the chosen protocol is returned in the response to all members. This is to allow each member to propagate some local information (such as topic subscriptions) to the entire group. The generation id, as before, is incremented on every successful iteration of the join group protocol.

The basic idea behind the coordinator's protocol selection algorithm is to consider the protocols supported by all members in terms of the preference (as indicated by the position in the list). This means that if all members list protocol "a" before protocol "b," then the coordinator will choose "a." If there is no agreement in terms of preference among the supported protocols, then one is chosen randomly.

 

JoinGroupResponse => ErrorCode GroupGenerationId GroupProtocol MemberId GroupMembers
  ErrorCode              => int16
  GroupGenerationId      => int32
  MemberId               => String
  GroupProtocol          => String
  GroupMembers           => [MemberId ProtocolMetadata]
    MemberId             => String
    ProtocolMetadata     => bytes

Consumer Embedded Protocol

Above we outlined the generalized JoinGroup protocol that the consumer would leverage. Next we show how we intend to implement consumer semantics on top of this protocol. Other use cases for the join group protocol would be implemented similarly. The two items that must be defined to use the join group protocol are the format of the protocol versions and the format of the protocol metadata.

GroupType => "consumer"
 
Protocol => AssignmentStrategy
  AssignmentStrategy   => String
 
ProtocolMetadata => Subscription AssignmentStrategyMetadata
  Subscription                 => [Topic NumberPartitions]
    Topic                      => String
    NumberPartitions           => int32
  AssignmentStrategyMetadata   => bytes

A few notes on this message format:

  • We use the assignment strategy as the consumer's group protocol. This basically allows the coordinator to choose the assignment strategy for the group. If we had instead embedded the assignment strategy in the metadata, then the clients would only be able to detect inconsistent strategies after the group was successfully created and all member's metadata was transmitted to the group. 
  • The number of partitions is included in the metadata to ensure that consumers rely on the same metadata in their decision making. The basic idea is to have the assignment algorithm use the largest number of partitions for each topic reported by any of the consumers. If it wasn't provided, then there would be a potential race condition in assignment when partitions for a topic are administratively increased. 
  • Each assignment strategy can implement its own metadata format. To handle upgrades to this format, it is possible either for the assignment strategy to manage the compatibility of its metadata itself (e.g. by using a serialization such as avro), or to create new versions of the strategy using a different name (e.g. "roundrobin-v2").

Open Questions

  1. For client-side assignment in general, the strategies must be deterministic. This is actually not as restrictive as it may seem, since random seed data could be distributed as consumer metadata. Nevertheless, truly non-deterministic assignment strategies are not possible or at least must combine a deterministic aspect to ensure agreement among consumers. For the current known use cases, this doesn't appear to be an issue.
  2. We have chosen to leave metadata versioning to the group protocol implementations. This means that incompatible changes to either the metadata that assignment strategies rely on or to the interpretation of that metadata must result in different protocol names (e.g. "roundrobin-v1" and "roundrobin-v2"). We could instead add a separate field to explicitly declare the metadata version. This would decouple it from the assignment strategy semantics, though it is not clear whether that is desirable. A small advantage might be that it would force implementors to think about versioning the interface.
  3. The consumer's embedded protocol has some redundancy when multiple assignment strategies are supported. In particular, the subscription list will be duplicated for each assignment strategy. We could restructure the protocol to avoid this, but that would require a consumer-specific format, which would introduce another layer of versioning. As it is, versions are only tied to assignment strategies.

KafkaConsumer API

Although the protocol is slightly more complex for the KafkaConsumer implementation, most of the details are hidden from users. Below we show the basic assignment interface that would be exposed in KafkaConsumer. The partition assigner is generic to allow for custom metadata. For simple versions, the generic type would probably be Void.

class ConsumerMetadata<T> {
  String consumerId;
  List<String> subscribedTopics;
  T metadata;
}
 
interface PartitionAssigner<T> extends Configurable {
 
  /**
   * Derive the metadata to be used for the local member by this assigner. This could 
   * come from configuration or it could be derived dynamically (e.g. for host information
   * such as the hostname or number of cpus).
   * @return The metadata
   */
  public T metadata();
  /**
   * Assign partitions for this consumer.
   * @param consumerId The consumer id of this consumer
   * @param partitionsPerTopic The count of partitions for each subscribed topic
   * @param consumers Metadata for consumers in the current generation
   */
  List<TopicPartition> assign(String consumerId,
                              Map<String, Integer> partitionsPerTopic, 
                              List<ConsumerMetadata<T>> consumers, 
}

TODO:

To support client-side assignment, we'd have to make the following changes:

  1. Migrate existing assignment strategies from the broker to the client. Since the assignment interface is nearly the same, this should be straightforward.
  2. Modify client/server for the new join group protocol. Since we're not really changing the protocol (just the information that is passed through it), this should also be straightforward.
  3. Remove offset validation from the consumer coordinator. Just a couple lines to remove for this.
  4. Add support for assignment versioning (if we decide we need it). Depending on what we do, may or may not be trivial.

 

  • No labels