...

 

Code Block
JoinGroupRequest => GroupId SessionTimeout MemberId GroupProtocolType GroupProtocols
  GroupId                 => String
  SessionTimeout          => int32
  MemberId                => String
  GroupProtocolType       => String
  GroupProtocols          => [Protocol ProtocolVersion ProtocolMetadata]
    Protocol              => String
    ProtocolVersion       => String
    ProtocolMetadata      => bytes

 

...

 

Code Block
JoinGroupResponse => ErrorCode GroupGenerationId MemberId GroupProtocol GroupProtocolVersion GroupMembers
  ErrorCode              => int16
  GroupGenerationId      => int32
  MemberId               => String
  GroupProtocol          => String
  GroupProtocolVersion   => String
  GroupMembers           => [MemberId ProtocolMetadata]
    MemberId             => String
    ProtocolMetadata     => bytes

One of the major concerns with this protocol is the size of the join group response. Since each member's metadata is included in the responses for all members, the total amount of data that the coordinator must forward in the join group responses increases quadratically with the size of the group. For example, with a per-member metadata size of 100KB in a group of 100 members, each join group response would contain 10MB of data, which means the coordinator would have to transfer 1GB in total on every rebalance. It is therefore important to keep the size of the metadata fairly small. Even with smaller metadata, the group can only grow so large before this becomes a concern again. However, we argue that the protocol is already unsuited to such large groups since it has no mechanism to cope with churn: every time there is a membership change, all members must synchronize to form the next generation. If this happens often enough, as is possible with larger groups, then progress is severely restricted.
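
To make the arithmetic concrete, here is a back-of-the-envelope calculation in plain Java, using the numbers from the example above:

Code Block
public class RebalanceLoad {
    public static void main(String[] args) {
        long members = 100;
        long metadataBytesPerMember = 100 * 1024L; // 100KB of metadata per member

        // Each join group response carries the metadata of every member.
        long bytesPerResponse = members * metadataBytesPerMember; // ~10MB

        // The coordinator sends one such response to each member, so the
        // total transfer per rebalance grows quadratically with group size.
        long totalBytes = members * bytesPerResponse; // ~1GB

        System.out.printf("per response: %,d bytes; per rebalance: %,d bytes%n",
                bytesPerResponse, totalBytes);
    }
}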

Consumer Embedded Protocol

Above we outlined the generalized JoinGroup protocol that the consumer would leverage. Next we show how we intend to implement consumer semantics on top of this protocol. Other use cases for the join group protocol would be implemented similarly. The two items that must be defined to use the join group protocol are the format of the protocol versions and the format of the protocol metadata.

Code Block
ProtocolType => "consumer"
 
Protocol => AssignmentStrategy
  AssignmentStrategy => String
 
ProtocolMetadata => Subscription
  Subscription                 => Topics TopicPattern MetadataHash
    Topics                     => [String]
    TopicPattern               => String
    MetadataHash               => bytes

Subscriptions: To support differing subscriptions within the group, each member must include its own subscription in the protocol metadata. These subscriptions are forwarded to all members of the group who can then independently compute their assignment. Subscriptions can be specified either as a list of topics or as a regular expression. The latter can provide a more compact representation when subscribing to a large number of topics (e.g. if using mirror maker to replicate all the topics in a cluster).

Metadata: The metadata hash is included to ensure that each consumer has the same view of the topic metadata. A disagreement could cause an inconsistent assignment, so upon joining the group, each member checks the metadata hash of all other members to make sure they are consistent. It covers the full list of topics in the subscription set and their respective partition counts. If a regex subscription is used, then the hash covers all the topics in the cluster. If there is any disagreement on the number of partitions (e.g. due to stale metadata), then the hashes will compute differently and the consumers will refetch metadata and rejoin the group.
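
As a rough sketch of what the hash computation could look like (the digest algorithm and byte encoding below are illustrative assumptions, not part of the proposal):

Code Block
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.SortedMap;

public class MetadataHash {
    /**
     * Digests the subscribed topics and their partition counts. Iterating
     * the topics in sorted order ensures that members with the same view
     * of the metadata produce identical hashes.
     */
    public static byte[] compute(SortedMap<String, Integer> partitionsPerTopic)
            throws NoSuchAlgorithmException {
        MessageDigest digest = MessageDigest.getInstance("SHA-256");
        for (Map.Entry<String, Integer> entry : partitionsPerTopic.entrySet()) {
            digest.update(entry.getKey().getBytes(StandardCharsets.UTF_8));
            digest.update((byte) 0); // separator to avoid ambiguous concatenation
            digest.update(entry.getValue().toString().getBytes(StandardCharsets.UTF_8));
        }
        return digest.digest();
    }
}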

One potential concern in this protocol is whether a sustained disagreement might lead to continual rebalancing. This could be possible if two brokers disagree on the topic metadata for an extended period of time. While metadata should eventually converge, this issue can be resolved by having the consumers fetch their metadata from the coordinator, ensuring that they each see the same view. However, it would still be possible to have metadata disagreement if the metadata itself is changing at a very high rate.

It is worth mentioning that there is a class of assignment strategies which do not depend on consistent metadata among the consumers. For example, in a consistent hashing approach, each partition would be deterministically mapped to one of the group members. Even if two members see a different partition count for a topic, there would be no disagreement over which consumer owns each partition. The tradeoff is generally sub-optimal load balancing of partitions across consumers.
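
To illustrate, here is a minimal sketch of such a strategy (the hash function and ring layout are arbitrary choices for this example). The ring would be built by hashing each member id, typically with several virtual nodes per member to smooth the load:

Code Block
import java.util.SortedMap;
import java.util.TreeMap;

public class ConsistentHashAssignor {
    /**
     * Deterministically maps a partition to a member by walking a hash ring
     * keyed by hashed member ids. Two members that disagree on a topic's
     * partition count still agree on the owner of any partition they both see.
     */
    public static String ownerOf(String topic, int partition, TreeMap<Long, String> ring) {
        long key = hash(topic + "-" + partition);
        SortedMap<Long, String> tail = ring.tailMap(key);
        return tail.isEmpty() ? ring.firstEntry().getValue() : tail.get(tail.firstKey());
    }

    // FNV-1a; the choice is arbitrary, any stable hash function works.
    static long hash(String s) {
        long h = 0xcbf29ce484222325L;
        for (int i = 0; i < s.length(); i++) {
            h ^= s.charAt(i);
            h *= 0x100000001b3L;
        }
        return h;
    }
}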

Note that the format of the metadata is an attribute of the assignment strategy, which makes it possible for different strategies to support different metadata formats. For rack-aware assignment, the metadata would also include the rack of each consumer, and the metadata hash would have to cover the leader of each partition, since leadership determines where fetches are sent and the whole point of rack-aware assignment is to fetch from brokers on the same rack as the consumer. In general, any information used in the assignment decision must somehow be included in the metadata.
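
For instance, a rack-aware strategy might carry a small custom metadata type along these lines (hypothetical, for illustration only):

Code Block
/**
 * Hypothetical metadata type for a rack-aware strategy. The rack id feeds
 * into the assignment decision, so it must travel in the protocol metadata,
 * and the partition leaders must be folded into the metadata hash since
 * they also affect the outcome.
 */
class RackInfo {
    final String rackId;

    RackInfo(String rackId) {
        this.rackId = rackId;
    }
}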

Open Questions

  • For client-side assignment in general, the strategies must be deterministic. This is actually not as restrictive as it may seem, since random seed data could be distributed as consumer metadata. Nevertheless, truly non-deterministic assignment strategies are not possible, or at least must incorporate a deterministic step to ensure agreement among consumers. For the current known use cases, this does not appear to be an issue.

Scaling Groups

As mentioned previously, the need to propagate the metadata of each member to all other members puts a significant restriction on the amount of metadata that can be used in large groups. For small and medium-sized groups, this might not be a major concern, but assignment strategies must be mindful of the metadata size and set clear scaling expectations. We have considered several approaches to deal with this problem.

Reducing the Join Payload

Dropping Topic Lists: The main contribution to the size of the join group response for the consumer case is the subscription set. One option is to remove the topic list and topic pattern and instead include only a hash of that data. This would make it impossible to simultaneously support differing subscription sets within the group, but it would still be possible to support a changing subscription set in the rolling upgrade scenario, albeit in a weaker form. More concretely, the embedded protocol would look like this:

Code Block
ProtocolType => "consumer"
 
Protocol => AssignmentStrategy
  AssignmentStrategy => String
 
ProtocolMetadata => SubscriptionHash MetadataHash
  SubscriptionHash => bytes
  MetadataHash     => bytes

Just as before, the consumers would join the group with their respective subscriptions and supported assignment strategies. The coordinator would select an assignment strategy supported by all members and forward the full member metadata to all members of the group. When the members of the group received the join group responses, they would compare the subscription hashes of all the members to find the largest matching sub-group. The members who belonged to this sub-group would assign partitions based on their own subscription sets (which match those of the rest of that sub-group), while non-members would just assign themselves an empty set of partitions and go dormant until the next rebalance.

For example, suppose that a consumer group consists of members A, B, and C. If A and B are subscribed to (foo), and C is subscribed to (foo, bar), then A and B would have matching subscription hashes and would form the largest matching sub-group. Therefore A and B would assign themselves the partitions from (foo), and C would be inactive. If consumer B were then upgraded to the subscription set (foo, bar), then B and C would form the largest sub-group and A would go inactive (obviously there would need to be a tie-breaker in case there is no unique largest sub-group). The major limitation should also be clear in this example: during a rolling upgrade, the capacity of the cluster is temporarily halved, which may cause lag in the group. This would have to be considered in capacity planning.
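
A sketch of how each member might select the active sub-group from the join group response (here hashByMemberId maps each member id to a hex-encoded subscription hash; the tie-breaker of taking the lexicographically smallest hash is one arbitrary but deterministic choice):

Code Block
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SubGroupSelector {
    /**
     * Groups members by subscription hash and returns the member ids of the
     * largest sub-group. Iterating the hashes in sorted order with a strict
     * '>' comparison means ties go to the smallest hash, so every member
     * independently picks the same winner.
     */
    public static List<String> largestSubGroup(Map<String, String> hashByMemberId) {
        Map<String, List<String>> membersByHash = new TreeMap<>();
        for (Map.Entry<String, String> e : hashByMemberId.entrySet())
            membersByHash.computeIfAbsent(e.getValue(), h -> new ArrayList<>()).add(e.getKey());

        List<String> winner = new ArrayList<>();
        for (List<String> members : membersByHash.values())
            if (members.size() > winner.size())
                winner = members;
        return winner;
    }
}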

Compression

An even simpler option is to use compression to reduce the size of the topic subscriptions. Without changing the protocol, members could compress their own metadata, which is opaque to the broker anyway. For maximum compression, the protocol could be modified to support compression of the entire group metadata. Since most subscription sets in a group will probably have considerable overlap (even matching exactly in most cases), this should be very effective, though it does come at the cost of some additional complexity in the protocol.
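
As a sketch of the simplest variant, each member could gzip its own serialized subscription before placing it in the opaque ProtocolMetadata bytes (the newline-delimited encoding here is an assumption for illustration):

Code Block
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.zip.GZIPOutputStream;

public class MetadataCompression {
    /**
     * Serializes a topic list as newline-delimited UTF-8 and gzips it.
     * Topic names tend to share long common prefixes in practice, so
     * this usually compresses well.
     */
    public static byte[] compressTopics(List<String> topics) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(String.join("\n", topics).getBytes(StandardCharsets.UTF_8));
        }
        return out.toByteArray();
    }
}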

Single Client Assignor

A more dramatic alternative is to change the protocol to allow one member to provide the assignment for the entire group. One option that we have considered to enable this is to allow assignment to piggyback on the join group requests. Here is a basic outline of how this protocol would work:

1. As before, all members send their protocol metadata to the coordinator in a JoinGroupRequest.
2. The coordinator chooses one member of the group and returns a JoinGroupResponse containing all members and their respective metadata.
3. The chosen member performs the assignment for the entire group and sends it to the coordinator in a new JoinGroupRequest.
4. The coordinator distributes to each member its respective assignment in a JoinGroupResponse.

Advantages: Since only one member of the group receives the full group's metadata, the overall message load grows linearly rather than quadratically with the number of group members. The assignment returned to each member can also be much smaller, since it contains only the information that particular member needs (for the consumer, this is just the assigned partitions, similar to the current server-side assignment protocol). It also solves the problem of metadata consistency, since only one member of the group performs the assignment. Finally, it allows for simpler assignment implementations in general, since they no longer have to be deterministic.

Disadvantages: Obviously the tradeoff is complexity. In particular, the coordinator must implement a new state in which it is awaiting the assignment from the selected assignor. It has to handle the case where the assignor fails, which it might do by selecting another assignor or by failing the generation and having the group members retry. It is also possible that a new group member arrives while the assignment is in progress, or that an existing member fails. For the client, on the other hand, the protocol does not seem significantly more complex: it just parses the join group response and responds based on whether an assignment is needed or provided.

Here is a sketch of how the JoinGroup request and response schemas might be changed to support this protocol:

Code Block
JoinGroupRequest => GroupId ProtocolType Phase
  GroupId              => String
  ProtocolType         => String
  Phase                => Join | Assign
  Join                 => SessionTimeout MemberId GroupProtocols
    SessionTimeout     => int32
    MemberId           => String
    GroupProtocols     => [Protocol ProtocolVersion ProtocolMetadata]
      Protocol         => String
      ProtocolVersion  => String
      ProtocolMetadata => bytes
  Assign               => [MemberId MemberAssignment]
    MemberId           => String
    MemberAssignment   => bytes
 
JoinGroupResponse => ErrorCode GroupProtocol GroupProtocolVersion Phase
  ErrorCode              => int16
  GroupProtocol          => String
  GroupProtocolVersion   => String
  Phase                  => Assign | Complete
  Assign                 => [MemberId MemberMetadata]
    MemberId             => String
    MemberMetadata       => bytes
  Complete               => GroupGenerationId MemberId MemberAssignment
    GroupGenerationId    => int32
    MemberId             => String
    MemberAssignment     => bytes
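
To illustrate the message flow, here is a toy in-memory simulation of the two phases (all names and types are stand-ins invented for this sketch, not actual protocol classes):

Code Block
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class SingleAssignorFlow {
    public static void main(String[] args) {
        // Phase 1 (Join): each member's opaque metadata reaches the coordinator.
        Map<String, byte[]> metadataByMember = new TreeMap<>();
        metadataByMember.put("member-1", "foo,bar".getBytes(StandardCharsets.UTF_8));
        metadataByMember.put("member-2", "foo,bar".getBytes(StandardCharsets.UTF_8));

        // The coordinator picks one member as the assignor and hands only it
        // the full group metadata; the other members simply wait.
        String assignor = metadataByMember.keySet().iterator().next();
        System.out.println("coordinator selected assignor: " + assignor);

        // Phase 2 (Assign): the chosen member computes the whole assignment
        // and sends it back; the coordinator fans out each member's slice.
        Map<String, byte[]> assignments = computeAssignment(metadataByMember);
        for (Map.Entry<String, byte[]> e : assignments.entrySet())
            System.out.println(e.getKey() + " <- " + new String(e.getValue(), StandardCharsets.UTF_8));
    }

    // Stand-in for whatever embedded assignment strategy the group agreed on.
    static Map<String, byte[]> computeAssignment(Map<String, byte[]> metadataByMember) {
        Map<String, byte[]> out = new HashMap<>();
        int partition = 0;
        for (String member : metadataByMember.keySet())
            out.put(member, ("partition-" + partition++).getBytes(StandardCharsets.UTF_8));
        return out;
    }
}
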
The consumer's embedded protocol has some redundancy when multiple assignment strategies are supported. In particular, the subscription list will be duplicated for each assignment strategy. We could restructure the protocol to avoid this, but that would require a consumer-specific format, which would introduce another layer of versioning. As it is, versions are only tied to assignment strategies.

KafkaConsumer API

Although the protocol is slightly more complex for the KafkaConsumer implementation, most of the details are hidden from users. Below we show the basic assignment interface that would be exposed in KafkaConsumer. The partition assigner is generic to allow for custom metadata. For simple versions, the generic type would probably be Void.

Code Block
class ConsumerMetadata<T> {
  String consumerId;
  List<String> subscribedTopics;
  T metadata;
}
 
interface PartitionAssigner<T> extends Configurable {
 
  /**
   * Derive the metadata to be used for the local member by this assigner. This could
   * come from configuration or it could be derived dynamically (e.g. for host information
   * such as the hostname or number of cpus).
   * @return The metadata
   */
  public T metadata();
 
  /**
   * Assign partitions for this consumer.
   * @param consumerId The consumer id of this consumer
   * @param partitionsPerTopic The count of partitions for each subscribed topic
   * @param consumers Metadata for consumers in the current generation
   * @return The list of partitions assigned to this consumer
   */
  List<TopicPartition> assign(String consumerId,
                              Map<String, Integer> partitionsPerTopic,
                              List<ConsumerMetadata<T>> consumers);
}
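
For example, a bare-bones round-robin assigner with no custom metadata might look like the following sketch. It assumes every consumer subscribes to the same topics (so subscribedTopics is ignored); TopicPartition and Configurable are the existing Kafka client types:

Code Block
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import org.apache.kafka.common.TopicPartition;

public class RoundRobinAssigner implements PartitionAssigner<Void> {

    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed for this strategy.
    }

    @Override
    public Void metadata() {
        return null; // no custom metadata
    }

    @Override
    public List<TopicPartition> assign(String consumerId,
                                       Map<String, Integer> partitionsPerTopic,
                                       List<ConsumerMetadata<Void>> consumers) {
        // Sort members and partitions so that every consumer independently
        // computes the same global assignment (the strategy must be deterministic).
        List<String> memberIds = new ArrayList<>();
        for (ConsumerMetadata<Void> consumer : consumers)
            memberIds.add(consumer.consumerId);
        Collections.sort(memberIds);

        List<TopicPartition> allPartitions = new ArrayList<>();
        for (String topic : new TreeSet<>(partitionsPerTopic.keySet()))
            for (int p = 0; p < partitionsPerTopic.get(topic); p++)
                allPartitions.add(new TopicPartition(topic, p));

        // Take every Nth partition, offset by this consumer's position.
        int position = memberIds.indexOf(consumerId);
        List<TopicPartition> assigned = new ArrayList<>();
        for (int i = position; i < allPartitions.size(); i += memberIds.size())
            assigned.add(allPartitions.get(i));
        return assigned;
    }
}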

TODO:

To support client-side assignment, we'd have to make the following changes:

1. Migrate existing assignment strategies from the broker to the client. Since the assignment interface is nearly the same, this should be straightforward.
2. Modify the client and server for the new join group protocol. Since we're not really changing the protocol (just the information that is passed through it), this should also be straightforward.
3. Remove offset validation from the consumer coordinator. This is just a couple of lines to remove.
4. Add support for assignment versioning (if we decide we need it). Depending on the approach, this may or may not be trivial.