...

It has been about 8 years since we introduced the so-called new consumer which does group membership and rebalancing through Kafka. Although it was a huge improvement over the old ZooKeeper-based consumer, it has remained a major pain point from an operational perspective. There are multiple reasons for this; let’s dive into them:

  • The protocol relies on thick clients. Thick clients are annoying in many ways:
    • We have had many bugs in the rebalance protocol over the last years and the majority of them required client side bug fixes. In the cloud era, this is really annoying because we effectively depend on the adoption of the clients in order to fix the issues in production. Unfortunately, adoption is usually rather slow in the Kafka community;
    • It is almost impossible to debug issues in the protocol without having access to the client logs. In the cloud era, it is a bit annoying to have to request client logs to troubleshoot your system;
    • The clients run the so-called embedded protocol within the rebalance protocol. While this allows the core protocol to be reused for different purposes, for instance it is used by both the consumer and Connect in Apache Kafka, it makes inspecting the state on the broker side hard because the brokers get a bunch of raw bytes. The compatibility of the embedded protocols has also been a challenge; and
    • The clients are responsible for monitoring the metadata and for triggering rebalances. This has caused all sorts of issues in the past because the clients of a given group might have different views of the metadata at a given point in time.
  • The group protocol has been used for general state propagation between members. This is especially the case for power users such as Kafka Streams. While the state propagation is not an issue in itself, the protocol only propagates the state during a rebalance, so we have introduced fake or dummy rebalances with the sole goal of propagating some new state to the leader of the group or to all the members of the group. This has been very confusing for our users, both on the client side and the broker side. It also makes the interpretation of rebalances through metrics or logs more difficult.
  • The protocol relies on a group-wide synchronization barrier. This means that a single misbehaving consumer can take down or disturb the whole group, because a rebalance of the whole group is required whenever a consumer joins, leaves or fails. This also limits scalability, as the cost of a rebalance increases with the number of members in the group. Even the cooperative rebalancing protocols depend on the barrier. Specifically, one of the deficiencies of the cooperative protocol is that offsets cannot be committed while the consumer is waiting on the rebalance to complete. So even though a consumer can keep fetching while the rebalance is in progress, it still tends to get stuck behind the barrier.
  • The protocol has gotten too complex over the years. We started with a rather simple protocol and we have extended it a few times. For instance, we introduced KIP-429: Kafka Consumer Incremental Rebalance Protocol, KIP-345: Introduce static membership protocol to reduce consumer rebalances, and a few others. All the incremental changes that we have made have increased the complexity of the protocol.

Design Goals

We propose to introduce a new group membership and rebalance protocol for the Kafka Consumer and, by extension, Kafka Streams. The proposed protocol is built on top of the following design goals.

...

The desired (or target) assignment is either directly computed by the group coordinator using a server side assignor or computed by one of the group members if a client side assignor is specified. The former is the new default for consumers while the latter allows power users such as Kafka Streams to continue using purpose-built assignors. It is important to note that the entire rebalance process is driven by the group coordinator with this new protocol.

Unlike the current protocol, which keeps the heartbeat mechanism as lightweight as possible, the new protocol piggybacks on it to let the group coordinator assign/revoke partitions to/from group members while allowing group members to propagate their current state to the group coordinator. The ConsumerGroupHeartbeat API is introduced for this purpose.
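
To make the flow concrete, here is a minimal sketch of the exchange; the request and response shapes are simplified stand-ins for the ConsumerGroupHeartbeat API described in this document, not the actual client implementation.

Code Block

import java.util.List;

// Simplified stand-ins for the ConsumerGroupHeartbeat request and response.
record HeartbeatRequest(String groupId, String memberId, int memberEpoch,
                        List<String> subscribedTopicNames) {}
record HeartbeatResponse(short errorCode, int memberEpoch,
                         int heartbeatIntervalMs, List<Integer> assignment) {}

class HeartbeatLoop {
    // Stand-in for the network call to the group coordinator.
    HeartbeatResponse send(HeartbeatRequest request) { return null; }

    void heartbeatOnce(String groupId, String memberId, int memberEpoch,
                       List<String> subscriptions) {
        // The member propagates its current state with every heartbeat...
        HeartbeatResponse response = send(
            new HeartbeatRequest(groupId, memberId, memberEpoch, subscriptions));
        // ...and the coordinator piggybacks assignment changes on the response.
        if (response.assignment() != null) {
            // Revoke partitions that are no longer assigned and start
            // consuming from the newly assigned ones.
        }
    }
}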

When a client side assignor is used, the group coordinator requests the assignment from one group member by notifying it via the heartbeat protocol. The chosen member uses the ConsumerGroupPrepareAssignment API and the ConsumerGroupInstallAssignment API to respectively get the current state of the group and to install the computed assignment. Thanks to this, the input of the client side assignor is entirely driven by the group coordinator. The consumer is no longer responsible for maintaining any state besides its assigned partitions.

...

Implementing the new rebalance protocol in the current group coordinator is not appropriate in our opinion because it would require many changes to the current protocol anyway to make the two interoperable. Therefore, this KIP proposes to rewrite the group coordinator from scratch in Java. The new group coordinator will have a state machine per __consumer_offsets partition, where each state machine is modelled as an event loop. Those state machines will be executed in N threads.
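
As a minimal sketch of this threading model (assuming events for a given __consumer_offsets partition are always routed to the same single-threaded loop, so each state machine needs no locking):

Code Block

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// One single-threaded event loop per group of __consumer_offsets partitions:
// all events for a given partition land on the same thread, so the
// per-partition state machine can be lock-free.
class CoordinatorRuntimeSketch {
    private final ExecutorService[] eventLoops;

    CoordinatorRuntimeSketch(int numThreads) {
        eventLoops = new ExecutorService[numThreads];
        for (int i = 0; i < numThreads; i++) {
            eventLoops[i] = Executors.newSingleThreadExecutor();
        }
    }

    // Route an event to the loop that owns the partition's state machine.
    void enqueue(int consumerOffsetsPartition, Runnable event) {
        eventLoops[consumerOffsetsPartition % eventLoops.length].submit(event);
    }
}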

...

The group coordinator already supports the so-called consumer groups. Those groups are groups which implement the “consumer” embedded protocol type. With the introduction of the new consumer rebalance protocol, we need a way to differentiate the existing groups from the new consumer groups. This is important because the existing groups rely on a specific set of APIs whereas the new consumer groups will use a different set of APIs.

Therefore, we propose to introduce the notion of types within the group coordinator. This will allow us to support different types of groups in the future. We propose to call the current groups “generic”, as they represent a generic implementation of the membership protocol which is specialized by a protocol type, and to call the new consumer groups “consumer”. Effectively, we would have generic groups using the old consumer embedded protocol and consumer groups using the new protocol proposed in this document.

...

The group and the members represent the current state of a consumer group.

Consumer Group
Name | Type | Description
Group ID | string | The group ID as configured by the consumer. The ID uniquely identifies the group.
Group Epoch | int32 | The current epoch of the group. The epoch is incremented by the group coordinator when a new assignment is required for the group.
Members | []Member | The set of members in the group.
Partitions Metadata | []PartitionMetadata | The metadata of the partitions that the group is subscribed to. This is used to trigger a rebalance when partition metadata changes are detected.
Member
Name | Type | Description
Member ID | string | The unique identifier of the member. The ID is similar to an incarnation ID. It is generated by the client once and must be used during its lifetime.
Instance ID | string | The instance ID configured by the consumer.
Client ID | string | The client ID configured by the consumer.
Client Host | string | The client host of the consumer.
Subscribed Topic Names | []string | The current set of subscribed topic names configured by the consumer.
Subscribed Topic Regex | string | The current subscription regular expression configured by the consumer.
Server Assignor | string | The server side assignor used by the group.
Client Assignors | []Assignor | The list of client-side assignors supported by the member. The order of this list defines the priority.
Assignor
Name | Type | Description
Name | string | The unique name of the assignor.
Reason | int8 | The reason why the metadata was updated.
Minimum Version | int16 | The minimum version of the metadata schema supported by this assignor.
Maximum Version | int16 | The maximum version of the metadata schema supported by this assignor.
Version | int16 | The version used to encode the metadata.
Metadata | bytes | The metadata provided by the consumer for this assignor.

...

Target Assignment
Name | Type | Description
Group ID | string | The group ID as configured by the consumer. The ID uniquely identifies the group.
Assignment Epoch | int32 | The epoch of the assignment. It represents the epoch of the group used to generate the assignment. It will eventually match the group epoch.
Assignment Error | int8 | The error reported by the assignor.
Members | []Member | The assignment for each member.
Member
Name | Type | Description
Member ID | string | The unique identifier of the member.
Partitions | []TopicIdPartition | The set of partitions assigned to this member.
Metadata | bytes | The metadata assigned to this member.

...

Current Assignment
Name | Type | Description
Group ID | string | The group ID as configured by the consumer. The ID uniquely identifies the group.
Member ID | string | The member ID of this member.
Member Epoch | int32 | The current epoch of this member. The epoch is the assignment epoch of the assignment currently used by this member. This epoch is the one used to fence the member (e.g. for offset commits).
Error | int8 | The error reported by the assignor.
Partitions | []TopicIdPartition | The current partitions used by the member.
Version | int16 | The version used to encode the metadata.
Metadata | bytes | The current metadata used by the member.

...

  • A member joins or leaves the group.
  • A member updates its subscriptions.
  • A member updates its assignors.
  • A member updates its assignors' reason or metadata.
  • A member is fenced or removed from the group by the group coordinator.
  • The partition metadata is updated. For instance when a new partition is added or a new topic matching the subscribed topics is created.

...

  1. The group coordinator revokes the partitions which are no longer in the target assignment of the member. It does so by providing the intersection of the Current Partitions and the Target Partitions in the heartbeat response until the member acknowledges the revocation in the heartbeat response. The group coordinator gives the member the rebalance timeout to complete the revocation process, or kicks it out of the group otherwise.
  2. When the group coordinator receives the acknowledgement of the revocation, it updates the member's current assignment to its target assignment (and target epoch) and durably persists it.
  3. The group coordinator assigns the new partitions to the member. It does so by providing the Target Partitions to the member while ensuring that partitions which are not yet revoked by other members are removed from this set. In other words, new partitions are incrementally assigned to the member as they are revoked by the other members (the set arithmetic is sketched after this list).
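
The set arithmetic behind these steps can be sketched as follows; this is an illustration only, with partitions simplified to integers rather than topic ID and partition index pairs.

Code Block

import java.util.HashSet;
import java.util.Set;

class ReconciliationSketch {
    // Step 1: while revoking, the member only keeps the intersection of its
    // current partitions and its target partitions.
    static Set<Integer> retainedDuringRevocation(Set<Integer> current, Set<Integer> target) {
        Set<Integer> retained = new HashSet<>(current);
        retained.retainAll(target);
        return retained;
    }

    // Step 3: target partitions still owned by other members cannot be
    // assigned yet; they are granted incrementally as they are revoked.
    static Set<Integer> assignableNow(Set<Integer> target, Set<Integer> ownedByOthers) {
        Set<Integer> assignable = new HashSet<>(target);
        assignable.removeAll(ownedByOthers);
        return assignable;
    }
}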

...

The new target assignment for the group is basically a function of the current group metadata and the current target assignment. One important aspect to note here is that the assignment is now declarative instead of being incremental like it is in the current implementation. In other words, the assignor defines the desired state for the group and lets the group coordinator converge to it.

...

The group coordinator has to determine which assignment strategy must be used for the group. The group's members may not have exactly the same assignors at any given point in time, e.g. they may migrate from one assignor to another. The group coordinator will choose the assignor to run as follows:

  • The server side assignor is used if any member specified one. If multiple server side assignors are specified in the group, the group coordinator uses the most common one.
  • The client side assignor is used otherwise. The group coordinator will use the assignor which is supported by all the members in the group and, if multiple are, it will respect the precedence defined by the members when they advertise their supported assignors (a selection sketch follows this list).
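
A sketch of this selection logic, assuming each member advertises an ordered preference list of client side assignors (the tie-breaking here simply follows the first member's order, which is a simplification):

Code Block

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

class AssignorSelectionSketch {
    // Server side: pick the most common assignor among those specified.
    static Optional<String> selectServerAssignor(List<String> specifiedByMembers) {
        return specifiedByMembers.stream()
            .collect(Collectors.groupingBy(name -> name, Collectors.counting()))
            .entrySet().stream()
            .max(Map.Entry.comparingByValue())
            .map(Map.Entry::getKey);
    }

    // Client side: keep only assignors supported by every member, then apply
    // the precedence advertised by the members.
    static Optional<String> selectClientAssignor(List<List<String>> preferences) {
        if (preferences.isEmpty()) return Optional.empty();
        Set<String> common = new HashSet<>(preferences.get(0));
        for (List<String> memberPreferences : preferences) {
            common.retainAll(new HashSet<>(memberPreferences));
        }
        return preferences.get(0).stream().filter(common::contains).findFirst();
    }
}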

...

The server side assignor is pluggable and the client can choose the one that it wants to use by providing its name in the heartbeat request. If the selected assignor does not exist, the group coordinator will reject the heartbeat with an UNSUPPORTED_ASSIGNOR error. The list of supported assignors will be configured in the broker configuration.

We will support two assignors out of the box for Apache Kafka:

...

  • The group coordinator selects a member to run the assignment logic. The selection is explained later in this chapter.
  • The group coordinator notifies the member to compute the new assignment by returning the COMPUTE_ASSIGNMENT error in its next heartbeat response.
  • When the member receives this error, it is expected to call the ConsumerGroupPrepareAssignment API to get the current group metadata and the current target assignment.
  • The member computes the new assignment with the relevant assignor.
  • The member calls the ConsumerGroupInstallAssignment API to install the new assignment. The group coordinator validates it and persists it. (These steps are sketched below.)
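
On the client, these steps could look as follows; the interfaces are hypothetical placeholders for illustration, not the final consumer API.

Code Block

// Hypothetical placeholders for the data exchanged with the coordinator.
interface GroupState {}   // returned by ConsumerGroupPrepareAssignment
interface Assignment {}   // installed via ConsumerGroupInstallAssignment

interface ClientSideAssignor {
    Assignment assign(GroupState groupState);
}

class AssignmentDriverSketch {
    GroupState prepareAssignment() { /* ConsumerGroupPrepareAssignment call */ return null; }
    void installAssignment(Assignment assignment) { /* ConsumerGroupInstallAssignment call */ }

    // Invoked when a heartbeat response carries the COMPUTE_ASSIGNMENT error.
    void onComputeAssignment(ClientSideAssignor assignor) {
        GroupState state = prepareAssignment();        // current group metadata + target assignment
        Assignment computed = assignor.assign(state);  // run the client side assignor
        installAssignment(computed);                   // coordinator validates and persists it
    }
}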

...

The member joins the group by sending a heartbeat with no member ID and a member epoch equal to 0. It can rejoin the group with a member epoch equal to 0. It can leave the group by using a member epoch equal to -1.
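
As illustrative constants only, the epoch conventions described above are:

Code Block

// Epoch conventions from the paragraph above, shown as illustrative constants.
final class MemberEpochs {
    static final int JOIN_OR_REJOIN = 0; // heartbeat with no member ID and epoch 0 joins;
                                         // rejoining also uses epoch 0
    static final int LEAVE = -1;         // heartbeat with epoch -1 leaves the group
}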

...

The group coordinator is responsible for storing those group configurations in order to keep their lifecycle tied to their group. When a group is deleted, we want the configuration to be deleted as well. This assumes that the IncrementalAlterConfigs and DescribeConfigs APIs will be routed to the group coordinator owning the group they are acting upon.

Regex Based Subscription

The group coordinator is responsible for the regex based subscriptions. We will use Google RE2/J to compile and to execute the regular expressions on the server side. This means that all clients, regardless of their language, will use this common regular expression syntax. This should not be an issue for most common regular expressions but may require changes if language-specific features are used.
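
For illustration, a minimal sketch of server side matching with RE2/J; the com.google.re2j package mirrors java.util.regex but guarantees linear-time matching and drops features such as backreferences.

Code Block

import com.google.re2j.Pattern;

class RegexSubscriptionSketch {
    // Compile the subscription once and evaluate it against topic names
    // on the server side.
    static boolean topicMatches(String subscriptionRegex, String topicName) {
        Pattern pattern = Pattern.compile(subscriptionRegex);
        return pattern.matcher(topicName).matches();
    }
}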

Feature Flag / MetadataVersion / IBP

We introduce a new feature flag named “group.new.coordinator.enable” instead of relying on the IBP to enable the new group coordinator and the consumer group protocol. This flag is particularly useful during the development phase as it allows us to enable the new group coordinator while keeping the old one for production deployments. It also allows operators to explicitly enable it in the future.

...

We will introduce a new set of records to persist the new consumer group type in the existing __consumer_offsets topic. The records are detailed in the public interfaces section of this document.

...

Kafka Streams remains a power user of the consumer so it will continue extending the consumer by providing an implementation of the new assignor interface. Streams will also rely on a feature flag to enable the new rebalance protocol.

Member Metadata & Assignment Metadata

Member Metadata refers to the metadata provided by a given member from its assignor. Assignment Metadata refers to the metadata computed by the assignor for the member. The Version, MinimumVersion, MaximumVersion, Reason and Error fields are now first class citizens in the rebalance protocol, so Streams does not have to specify them in the metadata anymore. The schemas for, respectively, the assignor metadata and the assignment metadata are detailed in the Public Interfaces section.

...

When upgrading from a client side assignor with the old protocol to a client side assignor with the new protocol, the new assignor must be able to understand the metadata serialised by the old assignor. Those metadata will have their version set to -1. This indicates that the assignor must get the version from the first metadata bytes. The new assignor has to serialise the metadata using the same version. The group coordinator will refuse to let a member using the new protocol join the group if its client side assignor does not support version -1, but only if the old assignor actually uses metadata.
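
A sketch of how the version could be resolved, assuming (as with the current consumer protocol metadata) that the embedded version is a leading int16; the helper is illustrative, not part of the proposed interfaces.

Code Block

import java.nio.ByteBuffer;

class MetadataVersionSketch {
    // Version -1 signals old-protocol metadata: the real version must be
    // read from the first bytes of the metadata itself.
    static short effectiveVersion(short declaredVersion, ByteBuffer metadata) {
        if (declaredVersion == -1) {
            // Assumed layout: a leading int16 version. Absolute read, so the
            // buffer position is not consumed.
            return metadata.getShort(metadata.position());
        }
        return declaredVersion;
    }
}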

...

Code Block
{
  "apiKey": TBD,
  "type": "response",
  "name": "ConsumerGroupHeartbeatResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED
  // - NOT_COORDINATOR
  // - COORDINATOR_NOT_AVAILABLE
  // - COORDINATOR_LOAD_IN_PROGRESS
  // - INVALID_REQUEST
  // - UNKNOWN_MEMBER_ID
  // - FENCED_MEMBER_EPOCH
  // - UNSUPPORTED_ASSIGNOR
  // - COMPUTE_ASSIGNMENT
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no error" },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", 
      "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
      "about": "The member epoch." },
    { "name": "HeartbeatIntervalMs", "type": "int32", "versions": "0+",
      "about": "The heartbeat interval in milliseconds." }, 
    { "name": "Assignment", "type": "Assignment", "versions": "0+", "nullableVersions": "0+", "default": "null",
	  "about": "null if not provided; the assignment otherwise."
      "fields": [
     	{ "name": "Error", "type": "int8", "versions": "0+",
          "about": "The assigned error." },
        { "name": "TopicPartitions", "type": "[]TopicPartition", "versions": "0+",
          "about": "The assigned topic-partitions to the member.",
          "fields": [
        	{ "name": "TopicId", "type": "uuid", "versions": "0+",
              "about": "The topic ID." },
	        { "name": "Partitions", "type": "[]int32", "versions": "0+",
              "about": "The partitions." }
      	]},
        { "name": "Version", "type": "int16", "versions": "0+",
          "about": "The version of the metadata." },
        { "name": "Metadata", "type": "bytes", "versions": "0+",
          "about": "The assigned metadata." }
	]
  ]
}

...

Code Block
{
  "apiKey": TBD,
  "type": "response",
  "name": "ConsumerGroupPrepareAssignmentResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors: 
  // - GROUP_AUTHORIZATION_FAILED
  // - NOT_COORDINATOR
  // - COORDINATOR_NOT_AVAILABLE
  // - COORDINATOR_LOAD_IN_PROGRESS
  // - INVALID_REQUEST
  // - INVALID_GROUP_ID
  // - GROUP_ID_NOT_FOUND
  // - UNKNOWN_MEMBER_ID
  // - FENCED_MEMBER_EPOCH
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no error" },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "GroupEpoch", "type": "int32", "versions": "0+",
      "about": "The group epoch." },
    { "name": "AssignorName", "type": "string", "versions": "0+",
      "about": "The selected assignor." },
    { "name": "Members", "type": "[]Member", "versions": "0+",
      "about": "The members.", "fields": [
      { "name": "MemberId", "type": "string", "versions": "0+",
        "about": "The member ID." },
      { "name": "MemberEpoch", "type": "int32", "versions": "0+",
        "about": "The member epoch." },
      { "name": "InstanceId", "type": "string", "versions": "0+",
        "about": "The member instance ID." },
      { "name": "SubscriptionsSubscribedTopicIds", "type": "[]uuid", "versions": "0+",
        "about": "The subscribed topic IDs." },
      { "name": "AssignorVersionAssignor", "type": "int16Assignor", "versions": "0+",
        "about": "The versioninformation of the metadata." }, selected assignor",
        "fields": [ 
        { "name": "AssignorReasonVersion", "type": "int8int16", "versions": "0+",
          "about": "The reasonversion of the metadata." },
        { "name": "Reason", "type": "int8", "versions": "0+",
          "about": "The reason of the metadata update." }, 
        { "name": "AssignorMetadataMetadata", "type": "bytes", "versions": "0+",
          "about": "The assignor metadata." }
      ]},
      { "name": "TopicPartitions", "type": "[]TopicPartition", "versions": "0+",
        "about": "The assignedtarget topic-partitions toof the member.",
        "fields": [
          { "name": "TopicId", "type": "uuid", "versions": "0+",
            "about": "The topic ID." },
          { "name": "Partitions", "type": "[]int32", "versions": "0+",
            "about": "The partitions." }
      ]}
    ]},
    { "name": "Topics", "type": "[]TopicMetadata", "versions": "0+",
      "about": "The topic-partition metadata.",
      "fields": [
        { "name": "TopicId", "type": "uuid", "versions": "0+",
          "about": "The topic ID." },
        { "name": "NumPartitions", "type": "int32", "versions": "0+",
          "about": "The number of partitions." }
    ]}    
  ]
}

...

Code Block
{
  "apiKey": TBD,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "ConsumerGroupInstallAssignmentRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "MemberId", "type": "string", "versions": "0+",
      "about": "The member id assigned by the group coordinator." },
    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
      "about": "The member epoch." },
    { "name": "GroupEpoch", "type": "int32", "versions": "0+",
      "about": "The group epoch." },
    { "name": "Error", "type": "int8", "versions": "0+",
      "about": "The assignment error; or zero if the assignment is successful." },
    { "name": "Members", "type": "[]Member", "versions": "0+",
      "about": "The members.", "fields": [
      { "name": "MemberId", "type": "string", "versions": "0+",
        "about": "The member ID." },
      { "name": "Partitions", "type": "[]TopicPartition", "versions": "0+",
        "about": "The assigned topic-partitions to the member.",
        "fields": [
          { "name": "TopicId", "type": "uuid", "versions": "0+",
            "about": "The topic ID." },
          { "name": "Partitions", "type": "[]int32", "versions": "0+",
            "about": "The partitions." }
        ]},
      { "name": "MetadataVersionVersion", "type": "int32", "versions": "0+",
        "about": "The metadata version." }
      { "name": "MetadataBytesMetadata", "type": "bytes", "versions": "0+",
        "about": "The metadata bytes." }
    ]}
  ]
}

...

Code Block
{
  "apiKey": 71,
  "type": "response",
  "name": "ConsumerGroupDescribeResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors: 
  // - GROUP_AUTHORIZATION_FAILED
  // - NOT_COORDINATOR
  // - COORDINATOR_NOT_AVAILABLE
  // - COORDINATOR_LOAD_IN_PROGRESS
  // - INVALID_REQUEST
  // - INVALID_GROUP_ID
  // - GROUP_ID_NOT_FOUND
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Groups", "type": "[]DescribedGroup", "versions": "0+",
      "about": "Each described group.",
      "fields": [
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The describe error, or 0 if there was no error." },
        { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
          "about": "The group ID string." },
        { "name": "GroupState", "type": "string", "versions": "0+",
          "about": "The group state string, or the empty string." },
        { "name": "GroupEpoch", "type": "int32", "versions": "0+",
          "about": "The group epoch." },
        { "name": "AssignmentEpoch", "type": "int32", "versions": "0+",
          "about": "The assignment epoch." },
        { "name": "AssignorName", "type": "string", "versions": "0+",
          "about": "The selected assignor." },
        { "name": "Members", "type": "[]Member", "versions": "0+",
          "about": "The members.",
          "fields": [
          { "name": "MemberId", "type": "uuid", "versions": "0+",
            "about": "The member ID." },
          { "name": "InstanceId", "type": "string", "versions": "0+",
            "about": "The member instance ID." },
    	  { "name": "MemberEpoch", "type": "int32", "versions": "0+",
            "about": "The current member epoch." },
          { "name": "ClientId", "type": "string", "versions": "0+",
            "about": "The client ID." },
          { "name": "ClientHost", "type": "string", "versions": "0+",
            "about": "The client host." },
          { "name": "Subscriptions", "type": "[]uuid", "versions": "0+",
            "about": "The subscribed topic IDs." },
          { "name": "Assignment", "type": "Assignment", "versions": "0+",
            "about": "The current assignment.",
            "fields": [
            { "name": "AssignmentPartitionsPartitions", "type": "[]TopicPartition", "versions": "0+",
              "about": "The assigned topic-partitions to the member.",
              "fields": [
                { "name": "TopicId", "type": "uuid", "versions": "0+",
                  "about": "The topic ID." },
                { "name": "Partitions", "type": "[]int32", "versions": "0+",
                  "about": "The partitions." }
              ]},
            { "name": "AssignmentMetadataVersionVersion", "type": "int32", "versions": "0+",
              "about": "The assignor metadata version." }
            { "name": "AssignmentMetadataBytesMetadata", "type": "bytes", "versions": "0+",
              "about": "The assignor metadata bytes." }
          ]},
          { "name": "TargetAssignment", "type": "Assignment", "versions": "0+",
            "about": "The target assignment.",
            "fields": [
            { "name": "AssignmentPartitionsPartitions", "type": "[]TopicPartition", "versions": "0+",
              "about": "The assigned topic-partitions to the member.",
              "fields": [
                { "name": "TopicId", "type": "uuid", "versions": "0+",
                  "about": "The topic ID." },
                { "name": "Partitions", "type": "[]int32", "versions": "0+",
                  "about": "The partitions." }
              ]},
            { "name": "AssignmentMetadataVersionVersion", "type": "int32", "versions": "0+",
              "about": "The assignor metadata version." }
            { "name": "AssignmentMetadataBytesMetadata", "type": "bytes", "versions": "0+",
              "about": "The assignor metadata bytes." }
          ]}
      ]},
      { "name": "AuthorizedOperations", "type": "int32", "versions": "3+", "default": "-2147483648",
        "about": "32-bit bitfield to represent authorized operations for this group." }
    ]}
  ]
}

...

Code Block
{
  "apiKey": 16,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "ListGroupsRequest",
  // Version 1 and 2 are the same as version 0.
  //
  // Version 3 is the first flexible version.
  //
  // Version 4 adds the StatesFilter field (KIP-518).
  //
  // Version 5 adds the TypesFilter field (KIP-848).
  "validVersions": "0-5",
  "flexibleVersions": "3+",
  "fields": [
    { "name": "StatesFilter", "type": "[]string", "versions": "4+",
      "about": "The states of the groups we want to list. If empty all groups are returned with their state."
    },
    { "name": "TypesFilter", "type": "[]string", "versions": "5+",
      "about": "The types of the groups we want to list. If empty all groups are returned"
    }
  ]
}

Required ACL

  • Read Group

...

Code Block
{
    "type": "data",
    "name": "ConsumerGroupTargetAssignmentValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "AssignmentEpoch", "versions": "0+", "type": "int32" },
        { "name": "Members", "versions": "0+", "type": "[]Member", "fields": [
        	{ "name": "MemberId", "versions": "0+", "type": "string" },
        	            { "name": "TopicPartitionsError", "versions": "0+",
          	  "type": "[]TopicPartitionint8" },
 "fields": [
            	{           { "name": "TopicIdTopicPartitions", "versions": "0+",
          	  "type": "[]TopicPartition", "uuidfields": },[
            	{ "name": "PartitionsTopicId", "versions": "0+", "type": "[]int32uuid" }
        	]},
                    	{ "name": "MetadataErrorPartitions", "versions": "0+", "type": "int8[]int32" }
        	]},
        	{ "name": "MetadataVersionVersion", "versions": "0+", "type": "int16" },
        	{ "name": "MetadataBytesMetadata", "versions": "0+", "type": "bytes" }
        ]
    ]
}

...

The change is mainly backward compatible. The current protocol is still supported without any changes. The consumers and streams applications should be able to upgrade to the new protocol without any issues. The only part that may require adaptations is the regex based subscriptions. At the moment, those are validated on the client side and that validation is language specific. We plan to use Google RE2/J (https://github.com/google/re2j) on the server side, so clients may have to adapt their regex based subscriptions when they migrate to the new protocol. We believe that the transition should be frictionless for most users.

...

Rejected Alternatives

An epoch per partition

We started this design by using an epoch per partition instead of relying on an epoch per member. Moving a partition from A to B would have required the following steps: 1) revoke the partition from A; 2) bump the partition epoch; and 3) assign the partition to B. While this was very appealing at first, it was impractical in the end for two reasons: 1) migrating from the current protocol is much more difficult without a member ID; and 2) the metadata associated with the assignment (e.g. Streams metadata) is not tied to a particular partition; the partitions and their metadata bytes must be treated as a unit. We ended up using a member epoch with an incremental reassignment algorithm which is pretty close to this.

Not reusing the current coordinator

We considered not reusing the current group coordinator and instead implementing a brand new group coordinator dedicated to the new rebalance protocol. The main benefit is that we could have moved away from the __consumer_offsets storage and used something more appropriate, perhaps closer to the KRaft metadata topic. This was rejected because it would have made the migration from the old protocol to the new protocol much more difficult.

No more client-side assignors, even for Kafka Streams

We considered removing the client-side assignor feature entirely. From a consumer perspective, it is rarely used nowadays; Kafka Streams is its primary user, so we thought about using a server side assignor in this case as well. We abandoned this for two reasons: 1) the Streams assignor needs to know the entire Streams topology, so each member would have had to send it to the server. The topology can be rather big (megabytes), so this is not very practical; and 2) it would have introduced a strong dependency between the server version and the Streams version. Using new features in Streams would not be possible without upgrading the servers first.

Future Work

Eventually, we aim to deprecate the current membership/rebalance API. In order to get to this point, we would first need to move all the use cases away from it.

...