
Above we outlined the generalized JoinGroup protocol that the consumer will use. Next we show how we will implement consumer semantics on top of this protocol. Other use cases for the join group protocol would be implemented similarly.

The two phases of the group protocol correspond to subscription and assignment for the new consumer. Each member of the group submits their subscription as member metadata. The leader of the group collects the subscriptions of the group using DescribeGroup and sends the assignment as member state in SyncGroup. There are several advantages to having a single assignor:

  1. Since the leader makes the assignment for the full group, it is the single source of truth for the metadata used in its decision making. This avoids the metadata synchronization among all members that a multi-assignor approach would require.
  2. The leader of the group can enforce its own policy for controlling the rate of rebalancing. It doesn't have to rebalance after every metadata change, but can "batch" changes together to reduce the impact of metadata churn.
  3. Once the members of the group are stable, rebalances should only require the SyncGroup round, which requires just a single round trip to the broker for all members.
  4. The leader is the only member that needs to receive the metadata from all members of the group. This reduces the overhead of the protocol.

The two items that must be defined to use the join group protocol are the format of the member metadata provided in JoinGroup and the format of the member state provided in SyncGroup.

Code Block
MemberMetadata => Version Subscription AssignmentStrategies
  Version              => int16
  Subscription         => String
  AssignmentStrategies => [String]
 
MemberState => Version Assignment
  Version      => int16
  Assignment   => [Topic Partitions]
    Topic      => String
    Partitions => [int32]
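
To make the MemberMetadata layout concrete, here is a minimal encoding sketch. The schema above does not say how String and array types are laid out on the wire, so this sketch assumes an int16 length prefix for strings and an int32 count prefix for arrays; the class and method names are illustrative and not part of the proposal.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Sketch of: MemberMetadata => Version Subscription AssignmentStrategies
// Assumption: String = int16 length + UTF-8 bytes, [String] = int32 count + elements.
final class MemberMetadata {
    final short version;
    final String subscription;
    final List<String> assignmentStrategies;

    MemberMetadata(short version, String subscription, List<String> assignmentStrategies) {
        this.version = version;
        this.subscription = subscription;
        this.assignmentStrategies = assignmentStrategies;
    }

    // Serialize the fields in schema order into a buffer ready for reading.
    ByteBuffer encode() {
        int size = 2 + (2 + utf8(subscription).length) + 4;
        for (String s : assignmentStrategies) {
            size += 2 + utf8(s).length;
        }
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putShort(version);
        putString(buf, subscription);
        buf.putInt(assignmentStrategies.size());
        for (String s : assignmentStrategies) {
            putString(buf, s);
        }
        buf.flip();
        return buf;
    }

    // Read the fields back in the same order they were written.
    static MemberMetadata decode(ByteBuffer buf) {
        short version = buf.getShort();
        String subscription = getString(buf);
        int count = buf.getInt();
        List<String> strategies = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            strategies.add(getString(buf));
        }
        return new MemberMetadata(version, subscription, strategies);
    }

    private static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    private static void putString(ByteBuffer buf, String s) {
        byte[] bytes = utf8(s);
        buf.putShort((short) bytes.length);
        buf.put(bytes);
    }

    private static String getString(ByteBuffer buf) {
        byte[] bytes = new byte[buf.getShort()];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

MemberState could be encoded the same way, with each Topic written as a string followed by an int32-counted list of partition ids.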

Subscriptions: Group members provide their subscription as a String in the JoinGroup request. This subscription can either be a comma-separated list of topics or a regular expression (TODO: do we need a distinguisher field to tell the difference?).
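
The sketch below shows how a leader might resolve a subscription string against the topics it knows about. Since the question of a distinguisher is still open, the `isPattern` flag here is purely hypothetical, as are the class and method names.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;

final class SubscriptionMatcher {

    // isPattern is a hypothetical distinguisher; the proposal has not decided
    // whether such a field exists or how it would be carried.
    static List<String> matchTopics(String subscription, boolean isPattern, List<String> availableTopics) {
        List<String> matched = new ArrayList<>();
        if (isPattern) {
            // Treat the whole subscription as one regular expression.
            Pattern pattern = Pattern.compile(subscription);
            for (String topic : availableTopics) {
                if (pattern.matcher(topic).matches()) {
                    matched.add(topic);
                }
            }
        } else {
            // Treat the subscription as a comma-separated list of exact topic names.
            Set<String> wanted = new HashSet<>();
            for (String name : subscription.split(",")) {
                wanted.add(name.trim());
            }
            for (String topic : availableTopics) {
                if (wanted.contains(topic)) {
                    matched.add(topic);
                }
            }
        }
        return matched;
    }
}
```

Without some distinguisher, a string such as `a.b` is ambiguous: it is both a valid topic name and a regular expression that also matches `aXb`.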

Assignments: Assignments are provided as an array of topics and partitions in the SyncGroup phase. Each consumer provides a list of the assignment strategies that they support. When the group's leader collects the subscriptions from all members, it must find an assignment strategy which all members support. If none can be found, then the SyncGroup phase will fail with an error as described above.
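
One way the leader could pick a strategy that all members support is sketched below. It assumes the leader's own list defines the preference order, which the text does not state explicitly; `StrategySelector` and its method are illustrative names only.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

final class StrategySelector {

    // Returns the first strategy in the leader's list that every other member
    // also supports, or empty if the group has no strategy in common.
    // Assumption: the leader's ordering is the one that decides preference.
    static Optional<String> select(List<String> leaderStrategies, List<List<String>> otherMembers) {
        // LinkedHashSet keeps the leader's order while we intersect.
        Set<String> common = new LinkedHashSet<>(leaderStrategies);
        for (List<String> supported : otherMembers) {
            common.retainAll(supported);
        }
        return common.stream().findFirst();
    }
}
```

An empty result corresponds to the case where the SyncGroup phase fails with an error.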

Briefly, this is how the protocol works for the consumer:

  1. Members JoinGroup with their respective subscriptions.
  2. Once joined, the leader uses DescribeGroup to collect member subscriptions and set the group's assignment in SyncGroup.  
  3. All members SyncGroup to find their assignment.
  4. Once the assignment has been made, two cases can trigger reassignment:
    1. Topic metadata changes that have no impact on subscriptions cause a resync. The leader computes the new assignment and initiates SyncGroup.
    2. Membership or subscription changes cause a rejoin.
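
The two reassignment triggers in step 4 can be summarized as a small decision function. This is only a sketch of the rule as described above; the enum and the boolean inputs are hypothetical names, and how each change is actually detected is out of scope here.

```java
// Which round of the protocol a change requires.
enum RebalanceAction { NONE, RESYNC, REJOIN }

final class RebalanceTrigger {

    static RebalanceAction decide(boolean membershipChanged,
                                  boolean subscriptionChanged,
                                  boolean topicMetadataChanged) {
        // Membership or subscription changes require the full JoinGroup round.
        if (membershipChanged || subscriptionChanged) {
            return RebalanceAction.REJOIN;
        }
        // Metadata-only changes need just a new assignment via SyncGroup.
        if (topicMetadataChanged) {
            return RebalanceAction.RESYNC;
        }
        return RebalanceAction.NONE;
    }
}
```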

Rolling Upgrades: To support rolling upgrades without downtime, there are two cases to consider: 

  1. Changes affecting subscription: the protocol directly supports differing subscriptions, so there is no need for special handling. Members will only be assigned partitions compatible with their subscription.
  2. Assignment strategy changes: to support a change to the assignment strategy, new versions must support both the old assignment strategy and the new one. When the group leader collects the metadata for the group, it will only choose an assignment strategy that is compatible with all members. Once all members are updated, the leader will choose the strategy in the order listed.

KafkaConsumer API

Although the protocol is slightly more complex for the KafkaConsumer implementation, most of the details are hidden from users. Below we show the basic assignment interface that would be exposed in KafkaConsumer. The partition assigner is generic to allow for custom metadata. For simple versions, the generic type would probably be Void.

Code Block
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.TopicPartition;

class ConsumerMetadata<T> {
  String consumerId;
  List<String> subscribedTopics;
  T metadata;
}

interface PartitionAssigner<T> extends Configurable {

  /**
   * Derive the metadata to be used for the local member by this assigner. This could
   * come from configuration or it could be derived dynamically (e.g. for host information
   * such as the hostname or number of cpus).
   * @return The metadata
   */
  T metadata();

  /**
   * Assign partitions for this consumer.
   * @param consumerId The consumer id of this consumer
   * @param partitionsPerTopic The count of partitions for each subscribed topic
   * @param consumers Metadata for consumers in the current generation
   */
  Map<String, List<TopicPartition>> assign(String consumerId,
                                           Map<String, Integer> partitionsPerTopic,
                                           List<ConsumerMetadata<T>> consumers);
}
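
As an illustration of how an assigner might look, here is a simple round-robin sketch. To keep it compilable on its own it declares stand-ins for TopicPartition, ConsumerMetadata and PartitionAssigner (without Configurable) rather than using the real Kafka classes. It also assumes the returned map is keyed by consumer id, which the interface above does not spell out; `RoundRobinAssigner` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Stand-in for org.apache.kafka.common.TopicPartition.
final class TopicPartition {
    final String topic;
    final int partition;
    TopicPartition(String topic, int partition) { this.topic = topic; this.partition = partition; }
    @Override public String toString() { return topic + "-" + partition; }
}

// Stand-in for the ConsumerMetadata class above, with a constructor added.
class ConsumerMetadata<T> {
    String consumerId;
    List<String> subscribedTopics;
    T metadata;
    ConsumerMetadata(String consumerId, List<String> subscribedTopics, T metadata) {
        this.consumerId = consumerId;
        this.subscribedTopics = subscribedTopics;
        this.metadata = metadata;
    }
}

// Stand-in for the interface above, minus Configurable.
interface PartitionAssigner<T> {
    T metadata();
    Map<String, List<TopicPartition>> assign(String consumerId,
                                             Map<String, Integer> partitionsPerTopic,
                                             List<ConsumerMetadata<T>> consumers);
}

// Needs no custom metadata, so the generic type is Void.
final class RoundRobinAssigner implements PartitionAssigner<Void> {

    @Override
    public Void metadata() {
        return null;
    }

    // Assumption: the result maps each consumer id to its partitions.
    // consumerId is unused because the whole group is assigned at once.
    @Override
    public Map<String, List<TopicPartition>> assign(String consumerId,
                                                    Map<String, Integer> partitionsPerTopic,
                                                    List<ConsumerMetadata<Void>> consumers) {
        Map<String, List<TopicPartition>> assignment = new TreeMap<>();
        for (ConsumerMetadata<Void> consumer : consumers) {
            assignment.put(consumer.consumerId, new ArrayList<>());
        }
        // Iterate topics and subscribers in sorted order so the result is deterministic.
        for (String topic : new TreeSet<>(partitionsPerTopic.keySet())) {
            List<String> subscribers = new ArrayList<>();
            for (ConsumerMetadata<Void> consumer : consumers) {
                if (consumer.subscribedTopics.contains(topic)) {
                    subscribers.add(consumer.consumerId);
                }
            }
            if (subscribers.isEmpty()) {
                continue; // nobody subscribed, so the topic stays unassigned
            }
            Collections.sort(subscribers);
            for (int p = 0; p < partitionsPerTopic.get(topic); p++) {
                String owner = subscribers.get(p % subscribers.size());
                assignment.get(owner).add(new TopicPartition(topic, p));
            }
        }
        return assignment;
    }
}
```

Because partitions of a topic are only handed to consumers that list it in `subscribedTopics`, members with differing subscriptions never receive partitions they did not ask for.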

TODO:

To support client-side assignment, we'd have to make the following changes:

  1. Migrate existing assignment strategies from the broker to the client. Since the assignment interface is nearly the same, this should be straightforward.
  2. Modify client/server for the new join group protocol. Since we're not really changing the protocol (just the information that is passed through it), this should also be straightforward.
  3. Remove offset validation from the consumer coordinator. This only requires removing a couple of lines.
  4. Add support for assignment versioning (if we decide we need it). Depending on the approach we choose, this may or may not be trivial.