Code Block
JoinGroupResponse => ErrorCode GroupGenerationId GroupProtocol MemberId GroupMembers
  ErrorCode              => int16
  GroupGenerationId      => int32
  GroupProtocol          => String
  MemberId               => String
  GroupMembers           => [MemberId ProtocolMetadata]
    MemberId             => String
    ProtocolMetadata     => bytes

It's worth mentioning a couple of points on the size of the join group response. Since each member's metadata is included in the responses for all members, the total amount of data which the coordinator must forward in the join group responses increases quadratically with the size of the group. For example, with a per-member metadata size of 100KB, in a group of 100 members, each join group response would contain 10MB of data, which means that the coordinator would have to transfer 1GB total on every rebalance. It is therefore important to keep the size of the metadata fairly small. Even with smaller metadata, the group can only grow so large before this becomes a concern again. However, we argue that the protocol is already unsuited to such large groups since it does not have any mechanism to cope with churn. Every time there is a membership change in the group, all members must synchronize to form the next generation. If this happens often enough, as is possible with larger groups, then progress is severely restricted.
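The arithmetic in the example above can be checked with a short sketch. The class and method names are hypothetical, and the calculation assumes decimal units (1KB = 1,000 bytes) and ignores the small framing overhead of the other response fields.

```java
/** Hypothetical helper: estimates join group response traffic for one rebalance. */
class JoinGroupTraffic {

    /** Bytes in a single response: it carries the metadata of every member. */
    static long bytesPerResponse(int members, long metadataBytesPerMember) {
        return members * metadataBytesPerMember;
    }

    /** Bytes the coordinator sends in total: one full response per member. */
    static long totalBytesPerRebalance(int members, long metadataBytesPerMember) {
        return members * bytesPerResponse(members, metadataBytesPerMember);
    }

    public static void main(String[] args) {
        int members = 100;
        long metadataBytes = 100_000L; // 100KB, assuming 1KB = 1,000 bytes

        System.out.println(bytesPerResponse(members, metadataBytes));       // 10000000   (10MB)
        System.out.println(totalBytesPerRebalance(members, metadataBytes)); // 1000000000 (1GB)
    }
}
```

Doubling the group to 200 members under the same assumptions quadruples the total to 4GB, which is the quadratic growth described above.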

Consumer Embedded Protocol

Above we outlined the generalized JoinGroup protocol that the consumer would leverage. Next we show how we intend to implement consumer semantics on top of this protocol. Other use cases for the join group protocol would be implemented similarly. The two items that must be defined to use the join group protocol are the format of the protocol versions and the format of the protocol metadata.

Code Block
GroupType => "consumer"
 
Protocol => AssignmentStrategy
  AssignmentStrategy   => String
 
ProtocolMetadata => Subscription AssignmentStrategyMetadata
  Subscription                 => Topics TopicRegex MetadataHash
    Topics                     => [Topic NumberPartitions]
      Topic                    => String
      NumberPartitions         => int32
    TopicRegex                 => String
    MetadataHash               => String
  AssignmentStrategyMetadata   => bytes

As mentioned above, we must keep the size of the metadata reasonably small to keep the join group responses from exploding in size. This is challenging for the mirror maker use case where consumers potentially subscribe to all topics in the cluster. To enable that case, we add regex subscriptions directly to the protocol. The behavior for these two cases is described below:

  • Topic subscriptions: When subscribing to topics individually, consumers include the full list of subscribed topics (this could be compressed to help reduce the size of the message). The number of partitions is included to ensure agreement among all consumers. The basic idea is to have the assignment algorithm use the largest number of partitions for each topic reported by any of the consumers. If the partition count were not provided, there would be a potential race condition in assignment when partitions for a topic are administratively increased.
  • Regex subscriptions: When subscribing to topics using a regular expression, the consumers include the respective regex directly. Instead of sending the metadata for all of the topics matching the regex, however, the consumers send only a hash of the metadata (which should cover all topics). If there is any disagreement on this hash when the join group responses are received, then the consumers must refetch the metadata and rejoin the group.
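The two reconciliation rules described in the list above could be sketched as follows. This is only an illustration: `SubscriptionReconciler` and its methods are hypothetical names, and the proposal does not specify how the metadata hash is computed, so hashes are treated here as opaque strings.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helpers showing how members could reconcile subscription metadata. */
class SubscriptionReconciler {

    /** Topic subscriptions: for each topic, take the largest partition count reported by any consumer. */
    static Map<String, Integer> maxPartitionsPerTopic(List<Map<String, Integer>> reportedCounts) {
        Map<String, Integer> result = new HashMap<>();
        for (Map<String, Integer> counts : reportedCounts) {
            for (Map.Entry<String, Integer> entry : counts.entrySet()) {
                // Keep the larger of the existing and the newly reported count.
                result.merge(entry.getKey(), entry.getValue(), Integer::max);
            }
        }
        return result;
    }

    /** Regex subscriptions: true only if every member reported the same metadata hash. */
    static boolean hashesAgree(List<String> metadataHashes) {
        return metadataHashes.stream().distinct().count() <= 1;
    }

    public static void main(String[] args) {
        // One consumer has already seen the partition increase for "orders"; the other has not.
        List<Map<String, Integer>> reported = List.of(
                Map.of("orders", 4),
                Map.of("orders", 6, "payments", 2));
        System.out.println(maxPartitionsPerTopic(reported).get("orders")); // 6

        // A mismatch means the members must refetch metadata and rejoin the group.
        System.out.println(hashesAgree(List.of("abc", "abd"))); // false
    }
}
```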

A few notes on this message format:

  • We use the assignment strategy as the consumer's group protocol. This allows the coordinator to choose the assignment strategy for the group. If we had instead embedded the assignment strategy in the metadata, then the clients would only be able to detect inconsistent strategies after the group was successfully created and all members' metadata had been transmitted to the group.
  • The number of partitions is included in the metadata to ensure that consumers rely on the same metadata in their decision making. The basic idea is to have the assignment algorithm use the largest number of partitions for each topic reported by any of the consumers. If it were not provided, there would be a potential race condition in assignment when partitions for a topic are administratively increased.
  • Each assignment strategy can implement its own metadata format. To handle upgrades to this format, it is possible either for the assignment strategy to manage the compatibility of its metadata itself (e.g. by using a serialization such as avro), or to create new versions of the strategy using a different name (e.g. "roundrobin-v2").
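As an illustration of the coordinator choosing the strategy, the sketch below picks a strategy name that every member supports. The selection rule (first match in the first member's preference order) is an assumption made for this example, not something the proposal specifies, and `StrategySelector` is a hypothetical name. Because versions are encoded in the name, "roundrobin-v1" and "roundrobin-v2" are simply treated as different strategies.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical coordinator-side helper for choosing the group's assignment strategy. */
class StrategySelector {

    /**
     * Returns the first strategy, in the first member's preference order,
     * that every member supports, or empty if the members have none in common.
     */
    static Optional<String> select(List<List<String>> supportedByMember) {
        if (supportedByMember.isEmpty()) {
            return Optional.empty();
        }
        for (String candidate : supportedByMember.get(0)) {
            boolean supportedByAll = supportedByMember.stream()
                    .allMatch(strategies -> strategies.contains(candidate));
            if (supportedByAll) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // An upgraded member supports both versions; an older member only supports v1.
        List<List<String>> members = List.of(
                List.of("roundrobin-v2", "roundrobin-v1"),
                List.of("roundrobin-v1"));
        System.out.println(select(members)); // Optional[roundrobin-v1]
    }
}
```

With this rule, inconsistent strategies surface as an empty result at join time rather than after all metadata has been exchanged.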

Open Questions

  1. For client-side assignment in general, the strategies must be deterministic. This is actually not as restrictive as it may seem, since random seed data could be distributed as consumer metadata. Nevertheless, truly non-deterministic assignment strategies are not possible, or at least they must incorporate a deterministic component to ensure agreement among consumers. For the currently known use cases, this doesn't appear to be an issue.
  2. We have chosen to leave metadata versioning to the group protocol implementations. This means that incompatible changes to either the metadata that assignment strategies rely on or to the interpretation of that metadata must result in different protocol names (e.g. "roundrobin-v1" and "roundrobin-v2"). We could instead add a separate field to explicitly declare the metadata version. This would decouple it from the assignment strategy semantics, though it is not clear whether that is desirable. A small advantage might be that it would force implementors to think about versioning the interface.
  3. The consumer's embedded protocol has some redundancy when multiple assignment strategies are supported. In particular, the subscription list will be duplicated for each assignment strategy. We could restructure the protocol to avoid this, but that would require a consumer-specific format, which would introduce another layer of versioning. As it is, versions are only tied to assignment strategies.
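To illustrate the first question, the sketch below shows one way a "randomized" strategy could still be deterministic: every member derives the same seed from seed values carried in the consumer metadata and shuffles a canonically sorted partition list with it. All names are hypothetical, and the approach assumes every member runs the same `java.util.Random` and `Collections.shuffle` algorithms; clients in other languages would need an explicitly specified generator.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

/** Hypothetical sketch of a randomized but reproducible partition ordering. */
class SeededShuffle {

    /** Combines the seed values found in all members' metadata into one shared seed. */
    static long sharedSeed(List<Long> seedsFromMetadata) {
        long seed = 0L;
        for (long s : seedsFromMetadata) {
            seed ^= s; // XOR does not depend on the order in which members are listed
        }
        return seed;
    }

    /** Shuffles a sorted copy of the partitions so the result depends only on the contents and the seed. */
    static List<String> shuffledPartitions(List<String> partitions, long seed) {
        List<String> copy = new ArrayList<>(partitions);
        Collections.sort(copy);                      // canonical starting order
        Collections.shuffle(copy, new Random(seed)); // same seed, same permutation
        return copy;
    }

    public static void main(String[] args) {
        long seedOnMemberA = sharedSeed(List.of(17L, 42L, 99L));
        long seedOnMemberB = sharedSeed(List.of(99L, 17L, 42L)); // same values, different order

        List<String> a = shuffledPartitions(List.of("t-0", "t-1", "t-2", "t-3"), seedOnMemberA);
        List<String> b = shuffledPartitions(List.of("t-3", "t-2", "t-1", "t-0"), seedOnMemberB);
        System.out.println(a.equals(b)); // true
    }
}
```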

KafkaConsumer API

Although the protocol is slightly more complex for the KafkaConsumer implementation, most of the details are hidden from users. Below we show the basic assignment interface that would be exposed in KafkaConsumer. The partition assigner is generic to allow for custom metadata. For simple versions, the generic type would probably be Void.

Code Block
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.TopicPartition;

class ConsumerMetadata<T> {
  String consumerId;
  List<String> subscribedTopics;
  T metadata;
}

interface PartitionAssigner<T> extends Configurable {

  /**
   * Derive the metadata to be used for the local member by this assigner. This could
   * come from configuration or it could be derived dynamically (e.g. for host information
   * such as the hostname or number of cpus).
   * @return The metadata
   */
  public T metadata();

  /**
   * Assign partitions for this consumer.
   * @param consumerId The consumer id of this consumer
   * @param partitionsPerTopic The count of partitions for each subscribed topic
   * @param consumers Metadata for consumers in the current generation
   * @return The partitions assigned to this consumer
   */
  List<TopicPartition> assign(String consumerId,
                              Map<String, Integer> partitionsPerTopic,
                              List<ConsumerMetadata<T>> consumers);
}
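To make the shape of the interface more concrete, here is a minimal sketch of a per-topic round-robin assignment written against it. It is an illustration only, not one of the existing strategies: the `Sketch*` types are stand-ins defined here so that the example compiles without the Kafka client library, and the custom metadata is omitted (the Void case).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Stand-in for TopicPartition so the sketch compiles on its own. */
final class SketchTopicPartition {
    final String topic;
    final int partition;

    SketchTopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public String toString() {
        return topic + "-" + partition;
    }
}

/** Stand-in for ConsumerMetadata<Void>: only the id and the subscription are needed. */
final class SketchConsumer {
    final String consumerId;
    final List<String> subscribedTopics;

    SketchConsumer(String consumerId, List<String> subscribedTopics) {
        this.consumerId = consumerId;
        this.subscribedTopics = subscribedTopics;
    }
}

/** Hypothetical assigner: within each topic, partitions rotate over the sorted subscribers. */
class SketchRoundRobinAssigner {

    static List<SketchTopicPartition> assign(String consumerId,
                                             Map<String, Integer> partitionsPerTopic,
                                             List<SketchConsumer> consumers) {
        List<SketchTopicPartition> mine = new ArrayList<>();
        // TreeMap iterates topics in sorted order, so every member walks them identically.
        for (Map.Entry<String, Integer> entry : new TreeMap<>(partitionsPerTopic).entrySet()) {
            String topic = entry.getKey();
            List<String> subscribers = new ArrayList<>();
            for (SketchConsumer consumer : consumers) {
                if (consumer.subscribedTopics.contains(topic)) {
                    subscribers.add(consumer.consumerId);
                }
            }
            if (subscribers.isEmpty()) {
                continue;
            }
            Collections.sort(subscribers); // deterministic order regardless of input order
            for (int p = 0; p < entry.getValue(); p++) {
                if (subscribers.get(p % subscribers.size()).equals(consumerId)) {
                    mine.add(new SketchTopicPartition(topic, p));
                }
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        List<SketchConsumer> group = List.of(
                new SketchConsumer("c2", List.of("t")),
                new SketchConsumer("c1", List.of("t")));
        System.out.println(assign("c1", Map.of("t", 3), group)); // [t-0, t-2]
        System.out.println(assign("c2", Map.of("t", 3), group)); // [t-1]
    }
}
```

Each member runs the same function with its own consumer id, so the assignments are disjoint and cover every partition as long as all members see the same inputs. The rotation restarts for each topic, which keeps the sketch short at the cost of slightly uneven totals across topics.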

TODO:

To support client-side assignment, we'd have to make the following changes:

  1. Migrate existing assignment strategies from the broker to the client. Since the assignment interface is nearly the same, this should be straightforward.
  2. Modify client/server for the new join group protocol. Since we're not really changing the protocol (just the information that is passed through it), this should also be straightforward.
  3. Remove offset validation from the consumer coordinator. This only requires removing a couple of lines.
  4. Add support for assignment versioning (if we decide we need it). Depending on the approach we choose, this may or may not be trivial.