...

Consumer Group

Name | Type | Description
Group ID | string | The group ID as configured by the consumer. The ID uniquely identifies the group.
Group Epoch | int32 | The current epoch of the group. The group coordinator increments the epoch when a new assignment is required for the group.
Members | []Member | The set of members in the group.
Partitions Metadata | []PartitionMetadata | The metadata of the partitions that the group is subscribed to. This is used to detect partition metadata changes.

Member

Name | Type | Description
Member ID | string | The unique identifier of the member. It is generated once by the client and must be used during the member's whole lifetime. The ID is similar to an incarnation ID.
Instance ID | string | The instance ID configured by the consumer.
Client ID | string | The client ID configured by the consumer.
Client Host | string | The client host configured by the consumer.
Subscribed Topic Names | []string | The current set of subscribed topic names configured by the consumer.
Subscribed Topic Regex | string | The current subscription regular expression configured by the consumer.
Server Assignor | string | The server-side assignor used by the group.
Client Assignors | []Assignor | The list of client-side assignors supported by the member. The order of this list defines the priority.

Assignor

Name | Type | Description
Name | string | The unique name of the assignor.
Reason | int8 | The reason why the metadata was updated.
Minimum Version | int16 | The minimum version of the metadata schema supported by this assignor.
Maximum Version | int16 | The maximum version of the metadata schema supported by this assignor.
Version | int16 | The version used to encode the metadata.
Metadata | bytes | The metadata provided by the consumer for this assignor.
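As a rough illustration only, the three tables above can be modelled with Python dataclasses. The class and attribute names are assumptions that simply mirror the table rows (they are not the coordinator's actual classes), Members is keyed by member id for convenience, and the partition metadata is reduced to a hypothetical topic-to-partition-count map.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class Assignor:
    # Mirrors the Assignor table; attribute names are illustrative.
    name: str
    reason: int = 0            # int8: why the metadata was updated
    minimum_version: int = 0   # int16
    maximum_version: int = 0   # int16
    version: int = 0           # int16: version used to encode `metadata`
    metadata: bytes = b""


@dataclass
class Member:
    # Mirrors the Member table.
    member_id: str
    instance_id: Optional[str] = None
    client_id: str = ""
    client_host: str = ""
    subscribed_topic_names: List[str] = field(default_factory=list)
    subscribed_topic_regex: Optional[str] = None
    server_assignor: Optional[str] = None
    # Ordered by priority: the first entry is the preferred assignor.
    client_assignors: List[Assignor] = field(default_factory=list)

    def preferred_client_assignor(self) -> Optional[Assignor]:
        return self.client_assignors[0] if self.client_assignors else None


@dataclass
class ConsumerGroup:
    # Mirrors the Consumer Group table.
    group_id: str
    group_epoch: int = 0
    members: Dict[str, Member] = field(default_factory=dict)
    # Hypothetical shape: topic name -> number of partitions.
    partitions_metadata: Dict[str, int] = field(default_factory=dict)

    def bump_epoch(self) -> int:
        # The coordinator increments the epoch when a new assignment is required.
        self.group_epoch += 1
        return self.group_epoch
```

Keeping client_assignors as an ordered list is what lets the list order carry the priority.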

...

  • The new protocol does not really fit in the current interface and its semantics are different; and
  • It seems preferable to let us evolve the current protocol independently if the need arises.

Topic ID and Topic Recreation

With KIP-516, the Consumer already uses topic ids on the fetch path and on the metadata path. However, it does not use them for the assignment and the committed offsets. This KIP closes that gap. The new consumer rebalance protocol works only with topic ids, and the OffsetFetch and OffsetCommit APIs are updated to use topic ids as well. This strengthens the semantics of the Consumer: it will be able to correctly handle topic recreation. When a topic is deleted and recreated, the Consumer will re-resolve the name to get the new topic id and then update its subscriptions with the new topic id. The rebalance protocol will unassign the old topic id and assign the new one to members, so members will consider the topic as a new, different topic. Adding the topic id to OffsetFetch and OffsetCommit also ensures that the Consumer won't use the old committed offsets with the new topic. Processing of the new topic will start fresh based on the offset reset policy.
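A minimal Python sketch of why keying committed offsets by topic id handles recreation. SubscriptionState, on_metadata and committed_offset are hypothetical names chosen for illustration; the real Consumer internals are not described here.

```python
import uuid
from typing import Dict, Optional, Tuple

# Committed offsets keyed by (topic_id, partition) rather than (topic_name, partition).
OffsetKey = Tuple[uuid.UUID, int]


class SubscriptionState:
    """Hypothetical client-side state illustrating topic-id based subscriptions."""

    def __init__(self) -> None:
        self.topic_ids: Dict[str, uuid.UUID] = {}
        self.committed: Dict[OffsetKey, int] = {}

    def on_metadata(self, name: str, topic_id: uuid.UUID) -> bool:
        """Record the latest id for `name`; return True if the topic was recreated."""
        previous = self.topic_ids.get(name)
        self.topic_ids[name] = topic_id
        return previous is not None and previous != topic_id

    def committed_offset(self, name: str, partition: int) -> Optional[int]:
        # Looking offsets up by the *current* topic id means offsets committed
        # against a deleted incarnation of the topic are never reused.
        topic_id = self.topic_ids.get(name)
        if topic_id is None:
            return None
        return self.committed.get((topic_id, partition))
```

When committed_offset returns None for a recreated topic, the consumer would fall back to its offset reset policy, which matches the behaviour described above.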

Note that we don't plan to update the Consumer's public API to expose topic ids at this stage so end users won't be able to detect the recreations yet. We might consider doing this in a future KIP.

Deprecate Enforcing Rebalances

Consumer#enforceRebalance will be deprecated and will throw an IllegalStateException if used when the new protocol is enabled. Enforcing a rebalance with the new protocol does not make any sense. Instead, power users will have the ability to trigger a reassignment by either providing a non-zero reason or by updating the assignor metadata.
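The sketch below shows one way the coordinator could decide that a member asked for a reassignment, assuming it compares the member's previous and current assignor state. AssignorState and requires_reassignment are hypothetical, and treating a version change like a metadata change is also an assumption.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AssignorState:
    # Illustrative subset of the Assignor fields.
    name: str
    reason: int
    version: int
    metadata: bytes


def requires_reassignment(previous: AssignorState, current: AssignorState) -> bool:
    """Hypothetical coordinator check: a non-zero reason or changed metadata
    asks for a new assignment (and therefore a new group epoch)."""
    if current.reason != 0:
        return True
    return (current.version, current.metadata) != (previous.version, previous.metadata)
```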

Streams

Kafka Streams remains a power user of the consumer so it will continue extending the consumer by providing an implementation of the new assignor interface. Streams will also rely on a feature flag to enable the new rebalance protocol.

Member Metadata & Assignment Metadata

Member Metadata refers to the metadata provided by a given member from its assignor. Assignment Metadata refers to the metadata computed by the assignor for the member. The Version, MinimumVersion, MaximumVersion, Reason and Error fields are now first-class citizens in the rebalance protocol, so Streams no longer has to specify them in its metadata. The schemas of the assignor metadata and of the assignment metadata are detailed in the Public Interfaces section.

Note that Streams may take this opportunity to do further changes to its metadata. We may extend this KIP or do a follow-up KIP in the future for this.

Assignor Behavior

The assignor behaves similarly to the existing assignor. The major difference is that the assignor must serialize the assignment metadata of each member with the version used by that member. Another difference is that the new assignor must also be able to handle the old metadata format during the upgrade from the old to the new protocol. This upgrade path is detailed in the upgrade section of this document.
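A minimal sketch of per-member serialization, assuming members that joined through the old protocol carry version -1 (as in the JoinGroup Handling section). The JSON encoding is a placeholder chosen for brevity; the actual Streams metadata schemas are defined in the Public Interfaces section, and encode_assignment and encode_all are hypothetical helpers.

```python
import json
from typing import Dict, List

LEGACY_VERSION = -1  # Version given to members still using the old protocol.


def encode_assignment(tasks: List[str], version: int) -> bytes:
    """Serialize one member's assignment metadata with the version that member uses.

    JSON is a stand-in for the real (binary) Streams schemas.
    """
    if version == LEGACY_VERSION:
        # Old-protocol members expect the legacy format during the upgrade.
        return json.dumps({"legacy": True, "tasks": tasks}).encode()
    return json.dumps({"version": version, "tasks": tasks}).encode()


def encode_all(assignments: Dict[str, List[str]], versions: Dict[str, int]) -> Dict[str, bytes]:
    # Each member gets its metadata in *its own* version, not a group-wide one.
    return {m: encode_assignment(t, versions[m]) for m, t in assignments.items()}
```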

Member Behavior

Upon receiving the assignment, each member creates, closes, or recycles tasks as indicated and updates the global assignment information, as it does today. We explained earlier that partitions are incrementally assigned to a member as other members revoke them. This means that the assignment metadata may already reference partitions that are not assigned to the member yet. In this case, the Streams assignor must treat the assigned partitions as the source of truth.
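As a sketch of treating assigned partitions as the source of truth, a member could start an active task only once all of its input partitions have actually been assigned. The function name and the all-partitions policy are assumptions for illustration, not the Streams implementation.

```python
from typing import Dict, Set, Tuple

TopicPartition = Tuple[str, int]


def runnable_active_tasks(
    task_partitions: Dict[str, Set[TopicPartition]],
    assigned: Set[TopicPartition],
) -> Set[str]:
    """Return the tasks whose input partitions are all actually assigned.

    `task_partitions` is what the assignment metadata says each task consumes;
    `assigned` is the set of partitions the coordinator has really handed over.
    """
    return {task for task, parts in task_partitions.items() if parts <= assigned}
```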

Each member encodes the lag of its standby tasks in its metadata. We cannot update the lag in every heartbeat request because that would constantly trigger reassignments in the group. Instead, when a) the task lag has been reduced within the acceptable.recovery.lag threshold or b) the task lag has been consistently increasing for some time, the member should consider triggering a rebalance by sending its next heartbeat with the appropriate encoded reason and the updated task lags.
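A hedged sketch of the two triggers described above. growth_window is a hypothetical knob standing in for "for some time"; only acceptable.recovery.lag comes from the text. Triggering on the crossing of the threshold, rather than whenever the lag is below it, is what keeps the member from requesting reassignments on every heartbeat.

```python
from typing import Sequence


def should_request_reassignment(
    lag_history: Sequence[int],
    acceptable_recovery_lag: int,
    growth_window: int = 3,
) -> bool:
    """Decide whether a standby task's lag warrants a heartbeat with a new reason.

    `lag_history` holds the most recent lag samples, oldest first.
    `growth_window` (hypothetical) is how many consecutive increases count as
    "consistently increasing for some time".
    """
    if not lag_history:
        return False
    latest = lag_history[-1]
    # a) The lag just dropped within acceptable.recovery.lag (it was above before).
    if (latest <= acceptable_recovery_lag and len(lag_history) >= 2
            and lag_history[-2] > acceptable_recovery_lag):
        return True
    # b) The lag strictly increased over the last `growth_window` samples.
    recent = lag_history[-(growth_window + 1):]
    if len(recent) == growth_window + 1 and all(b > a for a, b in zip(recent, recent[1:])):
        return True
    return False
```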

Supporting Online Consumer Group Upgrade

Upgrading to the new protocol or downgrading from it is possible by rolling the consumers with the appropriate group.protocol, assuming that the new protocol is enabled on the server side. When the first consumer using the new rebalance protocol joins the group, the group is converted from a generic group to a consumer group. When the last consumer using the new rebalance protocol leaves the group, the group is converted back to a generic group. Note that the group epoch starts at the current group generation. During the migration, all the JoinGroup, SyncGroup and Heartbeat calls from the non-upgraded consumers are translated to the new protocol. How? The idea is to reconcile each member individually through the old protocol APIs.
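A minimal sketch of the conversion rules, assuming the coordinator tracks which members use the new protocol. Group, on_new_protocol_join and on_new_protocol_leave are hypothetical names, and what happens to the generation when converting back is left out because the text does not specify it.

```python
from dataclasses import dataclass, field
from typing import Set


@dataclass
class Group:
    kind: str = "generic"        # "generic" (old protocol) or "consumer" (new protocol)
    generation: int = 0          # generation of the generic group
    group_epoch: int = 0         # epoch of the consumer group
    new_protocol_members: Set[str] = field(default_factory=set)


def on_new_protocol_join(group: Group, member_id: str) -> None:
    if group.kind == "generic":
        # First new-protocol member: convert, starting the epoch at the current generation.
        group.kind = "consumer"
        group.group_epoch = group.generation
    group.new_protocol_members.add(member_id)


def on_new_protocol_leave(group: Group, member_id: str) -> None:
    group.new_protocol_members.discard(member_id)
    if group.kind == "consumer" and not group.new_protocol_members:
        # Last new-protocol member left: convert back to a generic group.
        group.kind = "generic"
```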

Before explaining how that will work, let's recapitulate how the current protocol works. First, the members join or rejoin the group with the JoinGroup API. The JoinGroup request contains the subscriptions, the owned partitions, etc. When all the members have joined, the group coordinator picks a leader for the group and sends back the JoinGroup response to all the members. The leader is responsible for computing the assignment. Second, all the members collect their assignment, computed by the leader, by using the SyncGroup API. In parallel, the members heartbeat with the Heartbeat API in order to maintain their session. The group coordinator also uses the Heartbeat API to inform the members about an ongoing rebalance. All those interactions are synchronized on the generation of the group. It is important to note that the consumer does not make any assumption about the generation id: it simply uses what it receives from the group coordinator. At least, this is how the Java client works.

The current rebalance protocol supports two modes: Eager and Cooperative. In the eager mode, the consumer revokes all its partitions before rejoining the group during a rebalance. In the cooperative mode, the consumer does not revoke any partitions before rejoining the group. However, it revokes the partitions that it no longer owns when it receives its new assignment, and it rejoins immediately if it had to revoke any partitions.
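The difference between the two modes can be summarized with two small Python functions. They are illustrative only (partitions are plain strings and the function names are hypothetical), not the Java client's code.

```python
from typing import Set, Tuple


def revoke_before_rejoin(owned: Set[str], mode: str) -> Set[str]:
    """Partitions revoked *before* sending JoinGroup."""
    if mode == "eager":
        return set(owned)      # eager: give everything up first
    if mode == "cooperative":
        return set()           # cooperative: keep everything while rejoining
    raise ValueError(f"unknown mode: {mode}")


def on_cooperative_assignment(owned: Set[str], assigned: Set[str]) -> Tuple[Set[str], bool]:
    """Cooperative mode, on receiving a new assignment.

    Returns the partitions to revoke and whether the consumer must rejoin
    immediately (it must if it revoked anything).
    """
    revoked = owned - assigned
    return revoked, bool(revoked)
```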

The new protocol relies on the ConsumerGroupHeartbeat API to do all the above. Concretely, the API updates the group state, provides the partitions owned by the consumer, gets back the assignment, and updates the session. We can map those responsibilities to the old protocol as follows: the JoinGroup API updates the group state and provides the owned partitions, the SyncGroup API gets back the assignment, and the Heartbeat API updates the session. The main difference is that JoinGroup and SyncGroup do not run continuously. The group coordinator has to trigger them when needed by returning the REBALANCE_IN_PROGRESS error in the heartbeat response.
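The sketch below writes the mapping above as data and adds the coordinator-side signal that makes an old-protocol member rerun JoinGroup and SyncGroup. The enum and function names are hypothetical; "NONE" stands for a heartbeat response without error.

```python
from enum import Enum


class Responsibility(Enum):
    UPDATE_GROUP_STATE = "update group state"
    PROVIDE_OWNED_PARTITIONS = "provide owned partitions"
    GET_ASSIGNMENT = "get assignment"
    UPDATE_SESSION = "update session"


# ConsumerGroupHeartbeat covers all four; the old protocol splits them.
OLD_PROTOCOL_API = {
    Responsibility.UPDATE_GROUP_STATE: "JoinGroup",
    Responsibility.PROVIDE_OWNED_PARTITIONS: "JoinGroup",
    Responsibility.GET_ASSIGNMENT: "SyncGroup",
    Responsibility.UPDATE_SESSION: "Heartbeat",
}


def heartbeat_error(rebalance_required: bool) -> str:
    # JoinGroup/SyncGroup do not run continuously, so the coordinator asks an
    # old-protocol member to run them through the Heartbeat response.
    return "REBALANCE_IN_PROGRESS" if rebalance_required else "NONE"
```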

The idea is to manage each member individually while relying on the new engine to synchronize them. Each member uses rebalance loops to update the group coordinator and collect its assignment. The group coordinator ensures that a rebalance is triggered when a member needs to update its assignment. This process is detailed in the following subsections:

Member States

Let's start by defining a state machine for each member:

  • PrepareRebalance - The member enters this state when a rebalance is required. The member must rejoin within the rebalance timeout or the coordinator kicks it out from the group. We will discuss later when a rebalance is required.
  • CompletingRebalance - The member enters this state when its JoinGroup request has been received. The member must sync within the rebalance timeout or the coordinator kicks it out from the group.
  • Stable - The member enters this state when it has completed the rebalance loop.
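The member state machine can be sketched as follows. Event names are hypothetical, and letting a JoinGroup request move a member to CompletingRebalance from any state is an assumption based on the JoinGroup handling rules, which end with that transition unconditionally.

```python
from enum import Enum


class MemberState(Enum):
    PREPARE_REBALANCE = "PrepareRebalance"
    COMPLETING_REBALANCE = "CompletingRebalance"
    STABLE = "Stable"


def next_state(state: MemberState, event: str) -> MemberState:
    if event == "join_group":
        return MemberState.COMPLETING_REBALANCE
    if event == "sync_group" and state is MemberState.COMPLETING_REBALANCE:
        return MemberState.STABLE
    if event == "rebalance_required" and state is MemberState.STABLE:
        return MemberState.PREPARE_REBALANCE
    raise ValueError(f"{event!r} not allowed in {state.value}")


def must_be_removed(state: MemberState, ms_in_state: int, rebalance_timeout_ms: int) -> bool:
    # In both rebalance states the member must progress within the rebalance timeout.
    return state is not MemberState.STABLE and ms_in_state > rebalance_timeout_ms
```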

We intentionally use the terminology of the current protocol here in order to facilitate the reasoning.

JoinGroup Handling

The JoinGroup API is handled as follows:

  • The basic validations are applied. The request is rejected with the appropriate error if the validation fails.
    • The group must exist.
    • The member must exist or be created.
    • The embedded protocol must be on version >= 3.
    • The generation id must match the current member epoch. The generation id is provided in the embedded protocol.
  • The group state is updated if needed.
    • The group epoch is incremented if a new assignment is required.
    • The Topics provided in the embedded consumer protocol are used to update the SubscribedTopicNames.
    • The Protocols are converted to Assignors where the UserData becomes the metadata and the version is set to -1.
  • The current member assignment is updated if needed:
    • If the member has revoked all its partitions or the required partitions, the member can transition to its next epoch. The target assignment becomes the current assignment. The group coordinator replies with the new member epoch as the generation id.
    • If the member has to revoke partitions, the group coordinator replies with the current member epoch as the generation id.
  • The member transitions to CompletingRebalance state.
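A simplified sketch of the JoinGroup handling steps. Several assumptions keep it short: the group is assumed to exist, a new member is created with the epoch it presents, the error names for failed validation are guesses (the text only says "the appropriate error"), and the group epoch bump and target assignment computation happen elsewhere.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set


class CoordinatorError(Exception):
    """Carries the (assumed) error name returned to the client."""


@dataclass
class MemberView:
    member_epoch: int
    current_assignment: Set[str]
    owned: Set[str] = field(default_factory=set)
    subscribed_topic_names: List[str] = field(default_factory=list)
    state: str = "Stable"


def handle_join_group(
    members: Dict[str, MemberView],
    member_id: str,
    generation_id: int,
    protocol_version: int,
    topics: List[str],
    owned: Set[str],
    target_epoch: int,
    target_assignment: Set[str],
) -> int:
    """Apply the JoinGroup steps and return the generation id for the response."""
    # The embedded consumer protocol must be on version 3 or higher.
    if protocol_version < 3:
        raise CoordinatorError("UNSUPPORTED_VERSION")
    # The member must exist or be created (assumption: created with the presented epoch).
    member = members.setdefault(
        member_id, MemberView(member_epoch=generation_id, current_assignment=set())
    )
    # The generation id must match the current member epoch.
    if generation_id != member.member_epoch:
        raise CoordinatorError("ILLEGAL_GENERATION")
    # Update the group state from the embedded protocol.
    member.subscribed_topic_names = list(topics)
    member.owned = set(owned)
    # Nothing left to revoke: move to the next epoch and adopt the target assignment.
    if not (member.owned - target_assignment):
        member.member_epoch = target_epoch
        member.current_assignment = set(target_assignment)
    # Otherwise the member keeps its current epoch until it revokes.
    member.state = "CompletingRebalance"
    return member.member_epoch
```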

SyncGroup Handling

The SyncGroup API is handled as follows:

  • The basic validations are applied.
    • The group must exist.
    • The member must exist.
    • The generation id must match the current member epoch. The generation id is provided in the embedded protocol.
  • The group coordinator replies with the current assignment. There are two cases to consider here:
    • If the member epoch is smaller than the target epoch, the coordinator replies with the intersection between the current assignment and the target assignment. This is used to revoke the partitions that the member no longer owns. In this case, we know that the member will rejoin the group if it revokes partitions. That automatically triggers another rebalance to collect the newly assigned partitions.
    • If the member epoch is equal to the target epoch, the coordinator replies with the current assignment, which excludes the partitions that other members have not revoked yet.
  • The group coordinator serializes the assignment with the embedded consumer protocol.
  • The member transitions to Stable state.
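The two SyncGroup cases reduce to a small set computation, sketched here with partitions as strings. Request validation is omitted and the function name is hypothetical.

```python
from typing import Set


def sync_group_assignment(member_epoch: int, target_epoch: int,
                          current: Set[str], target: Set[str]) -> Set[str]:
    """Partitions returned in the SyncGroup response."""
    if member_epoch < target_epoch:
        # Keep only what survives in the target: the rest is implicitly revoked,
        # and the member's rejoin triggers the next rebalance.
        return current & target
    # member_epoch == target_epoch: the current assignment, which only contains
    # partitions already released by other members.
    return set(current)
```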

...

The version of the API is bumped to 9 to add support for topic ids. The request can either use topic ids or topic names. The consumer will only use topic ids when they are available whereas the admin client will continue to use topic names as per its API.

Request Schema

We propose to rename GenerationId to GenerationIdOrMemberEpoch.

Required ACL

  • Read Group

Request Handling

When the group id corresponds to a consumer group using the new rebalance protocol, the provided member epoch must match the expected member epoch.
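A minimal sketch of the member epoch check, assuming the coordinator keeps a map from member id to member epoch. The UNKNOWN_MEMBER_ID branch reflects the errors listed for version 9 of the OffsetCommit response; the function itself is hypothetical.

```python
from typing import Dict, Optional


def validate_offset_commit(member_epochs: Dict[str, int], member_id: str,
                           generation_id_or_member_epoch: int) -> Optional[str]:
    """Return an error name, or None if the commit may proceed.

    `member_epochs` is a hypothetical view of the consumer group:
    member id -> current member epoch.
    """
    expected = member_epochs.get(member_id)
    if expected is None:
        return "UNKNOWN_MEMBER_ID"
    if generation_id_or_member_epoch != expected:
        return "FENCED_MEMBER_EPOCH"
    return None
```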

Response Schema

The response can return FENCED_MEMBER_EPOCH.

OffsetFetch API

The version of the API is bumped to 9.

Request Schema

{
  "apiKey": 9,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "OffsetFetchRequest",
  // In version 0, the request read offsets from ZK.
  //
  // Starting in version 1, the broker supports fetching offsets from the internal __consumer_offsets topic.
  //
  // Starting in version 2, the request can contain a null topics array to indicate that offsets
  // for all topics should be fetched. It also returns a top level error code
  // for group or coordinator level errors.
  //
  // Version 3, 4, and 5 are the same as version 2.
  //
  // Version 6 is the first flexible version.
  //
  // Version 7 is adding the require stable flag.
  //
  // Version 8 is adding support for fetching offsets for multiple groups at a time
  //
  // Version 9 adds GenerationIdOrMemberEpoch and MemberId fields (KIP-848).
  "validVersions": "0-9",
  "flexibleVersions": "6+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0-7", "entityType": "groupId",
      "about": "The group to fetch offsets for." },
    // New fields.
    { "name": "GenerationIdOrMemberEpoch", "type": "int32", "versions": "9+", "default": "-1", "ignorable": true,
      "about": "The generation of the group if the generic group protocol or the member epoch if the consumer protocol." },
    { "name": "MemberId", "type": "string", "versions": "9+", "ignorable": true,
      "about": "The member ID assigned by the group coordinator." },
    // End of new fields.
    { "name": "Topics", "type": "[]OffsetFetchRequestTopic", "versions": "0-7", "nullableVersions": "2-7",
      "about": "Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.", "fields": [
      { "name": "Name", "type": "string", "versions": "0-7", "entityType": "topicName",
        "about": "The topic name."},
      { "name": "PartitionIndexes", "type": "[]int32", "versions": "0-7",
        "about": "The partition indexes we would like to fetch offsets for." }
    ]},
    { "name": "Groups", "type": "[]OffsetFetchRequestGroup", "versions": "8+",
      "about": "Each group we would like to fetch offsets for", "fields": [
      { "name": "groupId", "type": "string", "versions": "8+", "entityType": "groupId",
        "about": "The group ID."},
      { "name": "Topics", "type": "[]OffsetFetchRequestTopics", "versions": "8+", "nullableVersions": "8+",
        "about": "Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.", "fields": [
        { "name": "Name", "type": "string", "versions": "8+", "entityType": "topicName",
          "about": "The topic name."},
        { "name": "PartitionIndexes", "type": "[]int32", "versions": "8+",
          "about": "The partition indexes we would like to fetch offsets for." }
      ]}
    ]},
    { "name": "RequireStable", "type": "bool", "versions": "7+", "default": "false",
      "about": "Whether broker should hold on returning unstable offsets but set a retriable error code for the partitions."}
  ]
}

Required ACL

  • Describe Group

Request Handling

The MemberId and the GenerationIdOrMemberEpoch are verified. FENCED_MEMBER_EPOCH or UNKNOWN_MEMBER_ID is returned if the verification fails.

Response Schema

The response schema is unchanged. The only difference is that the new FENCED_MEMBER_EPOCH and UNKNOWN_MEMBER_ID errors can be returned.

Response Handling

Upon receiving the FENCED_MEMBER_EPOCH error, the consumer retries the request once its next heartbeat response provides its member epoch.
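A hedged sketch of that client behaviour: the fenced request is parked and resent with the epoch carried by the next heartbeat response. OffsetFetchRetrier and the injected send callable are hypothetical stand-ins for the consumer's internals.

```python
from typing import Callable, Optional


class OffsetFetchRetrier:
    """Hypothetical helper: park OffsetFetch requests fenced by an old member
    epoch and resend them once the next heartbeat delivers a new epoch."""

    def __init__(self, send: Callable[[int], Optional[str]]) -> None:
        self._send = send          # sends OffsetFetch with an epoch, returns an error name or None
        self._pending = False
        self.member_epoch = 0

    def fetch(self) -> None:
        error = self._send(self.member_epoch)
        if error == "FENCED_MEMBER_EPOCH":
            self._pending = True   # wait for the next heartbeat response

    def on_heartbeat_response(self, member_epoch: int) -> None:
        self.member_epoch = member_epoch
        if self._pending:
            self._pending = False
            self.fetch()
```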

DescribeConfigs API

Request Schema

The schema is the same but the ResourceType field can be set to GROUP (16).

Required ACL

  • Describe Config on the group.

Response Schema

No changes.

IncrementalAlterConfigs API

The API is the same but supports a new resource type: GROUP (16). When GROUP is used, the resource name corresponds to the group id.

Request Schema

The schema is the same but the ResourceType field can be set to GROUP (16).

Required ACL

  • Alter Config on the group.

Response Schema

No changes.

Records

This section describes the new record types required for the new protocol. The storage layout is based on the data model described earlier in this document.

As explained earlier, they will be persisted in the __consumer_offsets compacted topic. Storage based on a compacted topic requires a dedicated key type per record type in order for the compaction to work. The current protocol already uses versions 0 to 2 (inclusive) for the keys.

Group Metadata

Groups can be rather large, so we propose to use several records to store a group in order to not be limited by the maximum batch size (1MB by default). Therefore, we propose to store group metadata with three record types: the ConsumerGroupMetadata, the ConsumerGroupPartitionMetadata, and the ConsumerGroupMemberMetadata.

A group with X members will be stored with X+2 records. One ConsumerGroupMemberMetadata per member, one ConsumerGroupPartitionMetadata, and one ConsumerGroupMetadata for the group at the end. Atomicity is not a concern here. All the records can be applied independently.

Moreover, the whole group does not necessarily have to be written for every epoch. Members who have not changed could be omitted as the compacted topic will retain their previous state anyway.

When a member is deleted, a tombstone for it is written to the partition.
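The layout can be illustrated with a small Python model in which keys are tuples and None values are tombstones. The key shapes and value contents are assumptions (the real keys are versioned schemas), and compact mimics only what log compaction guarantees, namely that the latest value per key wins.

```python
from typing import Dict, List, Optional, Sequence, Tuple

# (key, value) pairs; a value of None is a tombstone.
Record = Tuple[Tuple[str, ...], Optional[dict]]


def group_records(group_id: str, epoch: int, members: Dict[str, dict],
                  partitions: Dict[str, int], removed: Sequence[str] = ()) -> List[Record]:
    records: List[Record] = []
    # One record per (changed) member; unchanged members may be omitted.
    for member_id, meta in members.items():
        records.append((("ConsumerGroupMemberMetadata", group_id, member_id), meta))
    for member_id in removed:
        records.append((("ConsumerGroupMemberMetadata", group_id, member_id), None))
    records.append((("ConsumerGroupPartitionMetadata", group_id), {"partitions": partitions}))
    # The group record is written last.
    records.append((("ConsumerGroupMetadata", group_id), {"epoch": epoch}))
    return records


def compact(log: List[Record]) -> Dict[Tuple[str, ...], dict]:
    state: Dict[Tuple[str, ...], dict] = {}
    for key, value in log:
        if value is None:
            state.pop(key, None)
        else:
            state[key] = value
    return state
```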

...

of this partition." },
        // CommitTimestamp has been removed from v2 and later.
        { "name": "CommitTimestamp", "type": "int64", "versions": "1", "default": "-1",
          "about": "The timestamp of the commit." },
        { "name": "CommittedMetadata", "type": "string", "versions": "0+", "nullableVersions": "0+",
          "about": "Any associated metadata the client wants to keep." }
      ]}
    ]}
  ]
}

Required ACL

  • Read Group

Request Validation

INVALID_REQUEST is returned if the request violates the following invariant:

  • A topic must not have both a name and an ID set.
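A minimal sketch of this validation. Topics are modelled as dicts, and treating the all-zero UUID as an unset topic id is an assumption of the sketch.

```python
import uuid
from typing import List, Optional


def validate_topics(topics: List[dict]) -> Optional[str]:
    """Return INVALID_REQUEST if any topic sets both a name and a topic id."""
    zero = uuid.UUID(int=0)  # assumed "unset" topic id
    for topic in topics:
        has_name = topic.get("name") is not None
        has_id = topic.get("topic_id", zero) != zero
        if has_name and has_id:
            return "INVALID_REQUEST"
    return None
```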

Request Handling

The MemberId and the GenerationIdOrMemberEpoch are verified. FENCED_MEMBER_EPOCH or UNKNOWN_MEMBER_ID is returned accordingly.

Response Schema

{
  "apiKey": 8,
  "type": "response",
  "name": "OffsetCommitResponse",
  // Versions 1 and 2 are the same as version 0.
  //
  // Version 3 adds the throttle time to the response.
  //
  // Starting in version 4, on quota violation, brokers send out responses before throttling.
  //
  // Versions 5 and 6 are the same as version 4.
  //
  // Version 7 offsetCommitRequest supports a new field called groupInstanceId to indicate member identity across restarts.
  //
  // Version 8 is the first flexible version.
  //
  // Version 9 adds TopicId field and can return FENCED_MEMBER_EPOCH and UNKNOWN_MEMBER_ID errors (KIP-848).
  "validVersions": "0-9",
  "flexibleVersions": "8+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Topics", "type": "[]OffsetCommitResponseTopic", "versions": "0+",
      "about": "The responses for each topic.", "fields": [
      // Updated field.
      { "name": "Name", "type": "string", "versions": "0+", "nullableVersions": "9+", "default": "null", "entityType": "topicName",
        "about": "The topic name."},
      // New field.
      { "name": "TopicId", "type": "uuid", "versions": "9+", "about": "The unique topic ID" },
      { "name": "Partitions", "type": "[]OffsetCommitResponsePartition", "versions": "0+",
        "about": "The responses for each partition in the topic.",  "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." }
      ]}
    ]}
  ]
}

OffsetFetch API

The version of the API is bumped to 9 to add support for topic ids. The request can either use topic ids or topic names. The consumer will only use topic ids when they are available whereas the admin client will continue to use topic names as per its API.

Request Schema

{
  "apiKey": 9,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "OffsetFetchRequest",
  // In version 0, the request read offsets from ZK.
  //
  // Starting in version 1, the broker supports fetching offsets from the internal __consumer_offsets topic.
  //
  // Starting in version 2, the request can contain a null topics array to indicate that offsets
  // for all topics should be fetched. It also returns a top level error code
  // for group or coordinator level errors.
  //
  // Version 3, 4, and 5 are the same as version 2.
  //
  // Version 6 is the first flexible version.
  //
  // Version 7 is adding the require stable flag.
  //
  // Version 8 is adding support for fetching offsets for multiple groups at a time
  //
  // Version 9 adds GenerationIdOrMemberEpoch, MemberId and TopicId fields (KIP-848).
  "validVersions": "0-9",
  "flexibleVersions": "6+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0-7", "entityType": "groupId",
      "about": "The group to fetch offsets for." },
    // New fields.
    { "name": "GenerationIdOrMemberEpoch", "type": "int32", "versions": "9+", "default": "-1", "ignorable": true,
      "about": "The generation of the group." },
    { "name": "MemberId", "type": "string", "versions": "9+", "ignorable": true,
      "about": "The member ID assigned by the group coordinator." },
    // End of new fields.
    { "name": "Topics", "type": "[]OffsetFetchRequestTopic", "versions": "0-7", "nullableVersions": "2-7",
      "about": "Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.", "fields": [
      { "name": "Name", "type": "string", "versions": "0-7", "entityType": "topicName",
        "about": "The topic name."},
      { "name": "PartitionIndexes", "type": "[]int32", "versions": "0-7",
        "about": "The partition indexes we would like to fetch offsets for." }
    ]},
    { "name": "Groups", "type": "[]OffsetFetchRequestGroup", "versions": "8+",
      "about": "Each group we would like to fetch offsets for", "fields": [
      { "name": "groupId", "type": "string", "versions": "8+", "entityType": "groupId",
        "about": "The group ID."},
      { "name": "Topics", "type": "[]OffsetFetchRequestTopics", "versions": "8+", "nullableVersions": "8+",
        "about": "Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.", "fields": [
        // Updated field.
        { "name": "Name", "type": "string", "versions": "8+", "nullableVersions": "9+", "default": "null", "entityType": "topicName",
          "about": "The topic name."},
        // New field.
        { "name": "TopicId", "type": "uuid", "versions": "9+", "about": "The unique topic ID" },
        { "name": "PartitionIndexes", "type": "[]int32", "versions": "8+",
          "about": "The partition indexes we would like to fetch offsets for." }
      ]}
    ]},
    { "name": "RequireStable", "type": "bool", "versions": "7+", "default": "false",
      "about": "Whether broker should hold on returning unstable offsets but set a retriable error code for the partitions."}
  ]
}

Required ACL

  • Describe Group

Request Validation

INVALID_REQUEST is returned if the request violates the following invariant:

  • A topic must not have both a name and an ID set.
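The check above can be sketched in Python as follows. The sketch assumes a hypothetical decoded form of the request:

- Each group is a dict with a `topics` list, or `None` to fetch all topics.
- Each topic is a dict with `name` and `topic_id` keys, where `None` means unset.

`validate_offset_fetch` is an illustrative name, not an existing broker function.

```python
from typing import List, Optional

INVALID_REQUEST = "INVALID_REQUEST"


def validate_offset_fetch(groups: List[dict]) -> Optional[str]:
    """Returns INVALID_REQUEST if any topic sets both a name and an ID, else None."""
    for group in groups:
        # A null topic list means "all topics", so there is nothing to check.
        for topic in group.get("topics") or []:
            if topic.get("name") is not None and topic.get("topic_id") is not None:
                return INVALID_REQUEST
    return None
```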

Request Handling

The group coordinator verifies the MemberId and the GenerationIdOrMemberEpoch and returns FENCED_MEMBER_EPOCH, UNKNOWN_MEMBER_ID, or ILLEGAL_GENERATION when the verification fails.
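This section does not spell out which condition maps to which error. The Python sketch below uses one plausible mapping, offered only as an assumption:

- An unknown member yields UNKNOWN_MEMBER_ID.
- A stale epoch in a group using the new protocol yields FENCED_MEMBER_EPOCH.
- A stale generation in a group using the current protocol yields ILLEGAL_GENERATION.

Skipping verification for requests that carry no member, such as those from the admin client, is also an assumption. The `Group` class and `verify_member` are hypothetical.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional

FENCED_MEMBER_EPOCH = "FENCED_MEMBER_EPOCH"
UNKNOWN_MEMBER_ID = "UNKNOWN_MEMBER_ID"
ILLEGAL_GENERATION = "ILLEGAL_GENERATION"


@dataclass
class Group:
    # Hypothetical view of a group held by the coordinator.
    uses_new_protocol: bool
    generation_id: int  # only meaningful for groups using the current protocol
    members: Dict[str, int] = field(default_factory=dict)  # member ID -> member epoch


def verify_member(group: Group, member_id: Optional[str],
                  generation_id_or_member_epoch: int) -> Optional[str]:
    """Returns an error name, or None if the request may proceed (assumed mapping)."""
    # Assumption: requests without a member ID and with the default -1 value
    # (for example from the admin client) are not verified.
    if not member_id and generation_id_or_member_epoch == -1:
        return None
    if member_id not in group.members:
        return UNKNOWN_MEMBER_ID
    if group.uses_new_protocol:
        if generation_id_or_member_epoch != group.members[member_id]:
            return FENCED_MEMBER_EPOCH
    elif generation_id_or_member_epoch != group.generation_id:
        return ILLEGAL_GENERATION
    return None
```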

Response Schema

Code Block
languagejs
linenumberstrue
{
  "apiKey": 9,
  "type": "response",
  "name": "OffsetFetchResponse",
  // Version 1 is the same as version 0.
  //
  // Version 2 adds a top-level error code.
  //
  // Version 3 adds the throttle time.
  //
  // Starting in version 4, on quota violation, brokers send out responses before throttling.
  //
  // Version 5 adds the leader epoch to the committed offset.
  //
  // Version 6 is the first flexible version.
  //
  // Version 7 adds pending offset commit as new error response on partition level.
  //
  // Version 8 is adding support for fetching offsets for multiple groups
  //
  // Version 9 adds TopicId field and can return FENCED_MEMBER_EPOCH, UNKNOWN_MEMBER_ID,
  // and ILLEGAL_GENERATION errors.
  "validVersions": "0-9",
  "flexibleVersions": "6+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Topics", "type": "[]OffsetFetchResponseTopic", "versions": "0-7",
      "about": "The responses per topic.", "fields": [
      { "name": "Name", "type": "string", "versions": "0-7", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]OffsetFetchResponsePartition", "versions": "0-7",
        "about": "The responses per partition", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0-7",
          "about": "The partition index." },
        { "name": "CommittedOffset", "type": "int64", "versions": "0-7",
          "about": "The committed message offset." },
        { "name": "CommittedLeaderEpoch", "type": "int32", "versions": "5-7", "default": "-1",
          "ignorable": true, "about": "The leader epoch." },
        { "name": "Metadata", "type": "string", "versions": "0-7", "nullableVersions": "0-7",
          "about": "The partition metadata." },
        { "name": "ErrorCode", "type": "int16", "versions": "0-7",
          "about": "The error code, or 0 if there was no error." }
      ]}
    ]},
    { "name": "ErrorCode", "type": "int16", "versions": "2-7", "default": "0", "ignorable": true,
      "about": "The top-level error code, or 0 if there was no error." },
    { "name": "Groups", "type": "[]OffsetFetchResponseGroup", "versions": "8+",
      "about": "The responses per group id.", "fields": [
      { "name": "groupId", "type": "string", "versions": "8+", "entityType": "groupId",
        "about": "The group ID." },
      { "name": "Topics", "type": "[]OffsetFetchResponseTopics", "versions": "8+",
        "about": "The responses per topic.", "fields": [
        { "name": "Name", "type": "string", "versions": "8+", "nullableVersions": "9+", "default": "null", "entityType": "topicName",
          "about": "The topic name."},
        { "name": "TopicId", "type": "uuid", "versions": "9+", "about": "The unique topic ID" },
        { "name": "Partitions", "type": "[]OffsetFetchResponsePartitions", "versions": "8+",
          "about": "The responses per partition", "fields": [
          { "name": "PartitionIndex", "type": "int32", "versions": "8+",
            "about": "The partition index." },
          { "name": "CommittedOffset", "type": "int64", "versions": "8+",
            "about": "The committed message offset." },
          { "name": "CommittedLeaderEpoch", "type": "int32", "versions": "8+", "default": "-1",
            "ignorable": true, "about": "The leader epoch." },
          { "name": "Metadata", "type": "string", "versions": "8+", "nullableVersions": "8+",
            "about": "The partition metadata." },
          { "name": "ErrorCode", "type": "int16", "versions": "8+",
            "about": "The partition-level error code, or 0 if there was no error." }
        ]}
      ]},
      { "name": "ErrorCode", "type": "int16", "versions": "8+", "default": "0",
        "about": "The group-level error code, or 0 if there was no error." }
    ]}
  ]
}

Response Handling

Upon receiving a FENCED_MEMBER_EPOCH error, the consumer does not retry immediately. It waits for its next heartbeat response, which carries its current member epoch, and then retries the request with that epoch.
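A minimal Python sketch of this behavior follows. `OffsetFetchRetrier` and its `send` callback are hypothetical stand-ins for the consumer's request machinery. The sketch only illustrates that a fenced request is parked and then re-sent with the epoch from the next heartbeat response, rather than retried at once with the same stale epoch.

```python
from typing import Callable, Optional

FENCED_MEMBER_EPOCH = "FENCED_MEMBER_EPOCH"


class OffsetFetchRetrier:
    """Hypothetical consumer-side helper for OffsetFetch retries."""

    def __init__(self, send: Callable[[int], Optional[str]]):
        # send(member_epoch) issues the OffsetFetch request and returns the
        # error name, or None on success.
        self._send = send
        self._waiting_for_heartbeat = False

    def fetch(self, member_epoch: int) -> Optional[str]:
        error = self._send(member_epoch)
        if error == FENCED_MEMBER_EPOCH:
            # Park the request instead of retrying with the same stale epoch.
            self._waiting_for_heartbeat = True
        return error

    def on_heartbeat_response(self, member_epoch: int) -> Optional[str]:
        """Re-sends a parked request with the epoch from the heartbeat response."""
        if not self._waiting_for_heartbeat:
            return None
        self._waiting_for_heartbeat = False
        return self.fetch(member_epoch)
```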

DescribeConfigs API

Request Schema

The schema is the same but the ResourceType field can be set to GROUP (16).

Required ACL

  • Describe Config on the group.

Request Validation

No changes.

Request Handling

The new GROUP resource type is handled.

Response Schema

No changes.

IncrementalAlterConfigs API

The API is the same but supports a new resource type: GROUP (16). When GROUP is used, the resource name corresponds to the group ID.

Request Schema

The schema is the same but the ResourceType field can be set to GROUP (16).

Required ACL

  • Alter Config on the group.

Request Validation

No changes.

Request Handling

The new GROUP resource type is handled.

Response Schema

No changes.

Records

This section describes the new record types required for the new protocol. The storage layout is based on the data model described earlier in this document.

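As explained earlier, these records are persisted in the compacted __consumer_offsets topic. Compaction keeps only the latest record per key, so each record type needs a dedicated key type. Otherwise, records of different types for the same group would share a key and overwrite each other. Key types are distinguished by their version. The current protocol already uses key versions 0 to 2 (inclusive), so the new keys use versions 3 to 7.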
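The sketch below shows why distinct key versions make this work. A reader of `__consumer_offsets` can tell the record type from the first two bytes of a key and route the record to its group. The sketch assumes the keys keep the existing encoding: a big-endian int16 version followed by the non-flexible key fields, where every key type starts with the group ID. `KEY_VERSION_TO_RECORD` and `parse_key_prefix` are hypothetical names.

```python
import struct
from typing import Tuple

# Key version -> record type. Versions 0-2 belong to the current protocol;
# versions 3-7 are the new keys defined in this section.
KEY_VERSION_TO_RECORD = {
    0: "OffsetCommit",
    1: "OffsetCommit",
    2: "GroupMetadata",
    3: "ConsumerGroupMetadata",
    4: "ConsumerGroupPartitionMetadata",
    5: "ConsumerGroupMemberMetadata",
    6: "ConsumerGroupTargetAssignment",
    7: "ConsumerGroupCurrentMemberAssignment",
}


def parse_key_prefix(key: bytes) -> Tuple[str, str]:
    """Returns (record type, group ID) read from the start of a record key."""
    # The key starts with a big-endian int16 version.
    (version,) = struct.unpack_from(">h", key, 0)
    record_type = KEY_VERSION_TO_RECORD.get(version)
    if record_type is None:
        raise ValueError(f"unknown key version: {version}")
    # Every key type starts with the group ID, encoded as an int16 length
    # followed by UTF-8 bytes (non-flexible string encoding).
    (length,) = struct.unpack_from(">h", key, 2)
    group_id = key[4:4 + length].decode("utf-8")
    return record_type, group_id
```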

Group Metadata

Groups can be rather large, so we propose to use several records to store a group in order not to be limited by the maximum batch size (1MB by default). Therefore, we propose to store group metadata with three record types: ConsumerGroupMetadata, ConsumerGroupPartitionMetadata, and ConsumerGroupMemberMetadata.

A group with X members is stored with X+2 records:

  • one ConsumerGroupMemberMetadata record per member;
  • one ConsumerGroupPartitionMetadata record;
  • one ConsumerGroupMetadata record for the group, written at the end.

Atomicity is not a concern here because all the records can be applied independently.

Moreover, the whole group does not necessarily have to be written for every epoch. Members that have not changed can be omitted because the compacted topic retains their previous state anyway.

When a member is deleted, a tombstone for it is written to the partition.
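The write and replay rules above can be sketched in Python as follows. The record shapes are simplified assumptions:

- A key is a tuple naming the record type and its identifiers.
- A value is a plain dict, and `None` is a tombstone.

For simplicity, the sketch rewrites the ConsumerGroupPartitionMetadata record on every update. `group_records` and `replay` are hypothetical helpers.

```python
from typing import Dict, List, Optional, Tuple

# (key, value) pair; a value of None is a tombstone.
Record = Tuple[tuple, Optional[dict]]


def group_records(group_id: str,
                  old_members: Dict[str, dict],
                  new_members: Dict[str, dict],
                  partition_metadata: dict,
                  epoch: int) -> List[Record]:
    """Builds the records persisting one update of a group."""
    records: List[Record] = []
    # One ConsumerGroupMemberMetadata record per new or changed member;
    # unchanged members are omitted since compaction keeps their last state.
    for member_id, member in new_members.items():
        if old_members.get(member_id) != member:
            records.append((("ConsumerGroupMemberMetadata", group_id, member_id), member))
    # Tombstones for deleted members.
    for member_id in sorted(old_members.keys() - new_members.keys()):
        records.append((("ConsumerGroupMemberMetadata", group_id, member_id), None))
    records.append((("ConsumerGroupPartitionMetadata", group_id), partition_metadata))
    # The ConsumerGroupMetadata record comes last.
    records.append((("ConsumerGroupMetadata", group_id), {"Epoch": epoch}))
    return records


def replay(state: Dict[tuple, dict], records: List[Record]) -> Dict[tuple, dict]:
    """Applies records one by one; each record stands on its own."""
    for key, value in records:
        if value is None:
            state.pop(key, None)
        else:
            state[key] = value
    return state
```

Writing a group from scratch, with an empty `old_members`, yields exactly X+2 records for X members.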

ConsumerGroupMetadataKey

Code Block
languagejs
linenumberstrue
{
    "type": "data",
    "name": "ConsumerGroupMetadataKey",
    "validVersions": "3",
    "flexibleVersions": "none",
    "fields": [
      	{ "name": "GroupId", "type": "string", "versions": "3" }
    ]
}

ConsumerGroupMetadataValue

Code Block
languagejs
linenumberstrue
{
    "type": "data",
    "name": "ConsumerGroupMetadataValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "Epoch", "versions": "0+", "type": "int32" }
    ]
}

ConsumerGroupPartitionMetadataKey

Code Block
languagejs
linenumberstrue
{
    "type": "data",
    "name": "ConsumerGroupPartitionMetadataKey",
    "validVersions": "4",
    "flexibleVersions": "none",
    "fields": [
        { "name": "GroupId", "type": "string", "versions": "4" }
    ]
}

...

ConsumerGroupPartitionMetadataValue

Code Block
languagejs
linenumberstrue
{
    "type": "data",
    "name": "ConsumerGroupPartitionMetadataValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "Epoch", "versions": "0+", "type": "int32" },
        { "name": "TopicPartitionMetadata", "versions": "0+",
          "type": "[]TopicPartition", "fields": [
            { "name": "TopicId", "versions": "0+", "type": "uuid" },
            { "name": "NumPartitions", "versions": "0+", "type": "int32" }
        ]}
    ]
}

...

ConsumerGroupMemberMetadataKey

Code Block
languagejs
linenumberstrue
{
    "type": "data",
    "name": "ConsumerGroupMemberMetadataKey",
    "validVersions": "5",
    "flexibleVersions": "none",
    "fields": [
        { "name": "GroupId", "type": "string", "versions": "5" },
        { "name": "MemberId", "type": "string", "versions": "5" }
    ]
}

...

ConsumerGroupMemberMetadataValue

Code Block
languagejs
linenumberstrue
{
    "type": "data",
    "name": "ConsumerGroupMemberMetadataValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "GroupEpoch", "versions": "0+", "type": "int32" },
        { "name": "InstanceId", "versions": "0+", "type": "string" },
        { "name": "ClientId", "versions": "0+", "type": "string" },
        { "name": "ClientHost", "versions": "0+", "type": "string" },
        { "name": "SubscribedTopicNames", "versions": "0+", "type": "[]string" },
        { "name": "SubscribedTopicRegex", "versions": "0+", "type": "string" },
        { "name": "Assignors", "versions": "0+",
          "type": "[]Assignor", "fields": [
            { "name": "Name", "versions": "0+", "type": "string" },
            { "name": "MinimumVersion", "versions": "0+", "type": "int16" },
            { "name": "MaximumVersion", "versions": "0+", "type": "int16" },
            { "name": "Reason", "versions": "0+", "type": "int8" },
            { "name": "Version", "versions": "0+", "type": "int16" },
            { "name": "Metadata", "versions": "0+", "type": "bytes" }
        ]}
    ]
}

Target Assignment

The target assignment is stored in a single record.

ConsumerGroupTargetAssignmentKey

Code Block
languagejs
linenumberstrue
{
    "type": "data",
    "name": "ConsumerGroupTargetAssignmentKey",
    "validVersions": "6",
    "flexibleVersions": "none",
    "fields": [
        { "name": "GroupId", "type": "string", "versions": "6" }
    ]
}

ConsumerGroupTargetAssignmentValue

Code Block
languagejs
linenumberstrue
{
    "type": "data",
    "name": "ConsumerGroupTargetAssignmentValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "AssignmentEpoch", "versions": "0+", "type": "int32" },
        { "name": "Members", "versions": "0+",
          "type": "[]Member", "fields": [
            { "name": "MemberId", "versions": "0+", "type": "string" },
            { "name": "Error", "versions": "0+", "type": "int8" },
            { "name": "TopicPartitions", "versions": "0+",
              "type": "[]TopicPartition", "fields": [
                { "name": "TopicId", "versions": "0+", "type": "uuid" },
                { "name": "Partitions", "versions": "0+", "type": "[]int32" }
            ]},
            { "name": "Version", "versions": "0+", "type": "int16" },
            { "name": "Metadata", "versions": "0+", "type": "bytes" }
        ]}
    ]
}
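To make the nesting concrete, here is a small Python sketch. It turns an in-memory assignment (member ID to topic ID to partitions) into a dict shaped like ConsumerGroupTargetAssignmentValue, with the whole group's assignment carried by one value. The `target_assignment_value` helper is hypothetical. Filling Error, Version, and Metadata with zero or empty defaults is an assumption, since their values are not defined here.

```python
import uuid
from typing import Dict, List


def target_assignment_value(epoch: int,
                            assignment: Dict[str, Dict[uuid.UUID, List[int]]]) -> dict:
    """Builds a ConsumerGroupTargetAssignmentValue-shaped dict.

    Error, Version and Metadata use placeholder defaults (an assumption).
    Members and topics are sorted only to make the output deterministic.
    """
    return {
        "AssignmentEpoch": epoch,
        "Members": [
            {
                "MemberId": member_id,
                "Error": 0,
                "TopicPartitions": [
                    {"TopicId": topic_id, "Partitions": sorted(partitions)}
                    for topic_id, partitions in sorted(topics.items())
                ],
                "Version": 0,
                "Metadata": b"",
            }
            for member_id, topics in sorted(assignment.items())
        ],
    }
```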

Current Member Assignment

The current member assignment represents, as the name suggests, the current assignment of a given member.

When a member is deleted from the group, a tombstone for it is written to the partition.

ConsumerGroupCurrentMemberAssignmentKey

Code Block
languagejs
linenumberstrue
{
    "type": "data",
    "name": "ConsumerGroupCurrentMemberAssignmentKey",
    "validVersions": "7",
    "flexibleVersions": "none",
    "fields": [
        { "name": "GroupId", "type": "string", "versions": "7" },
        { "name": "MemberId", "type": "string", "versions": "7" }
    ]
}

ConsumerGroupCurrentMemberAssignmentValue

Code Block
languagejs
linenumberstrue
{
    "type": "data",
    "name": "ConsumerGroupCurrentMemberAssignmentValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "MemberEpoch", "versions": "0+", "type": "int32" },
        { "name": "Error", "versions": "0+", "type": "int8" },
        { "name": "TopicPartitions", "versions": "0+",
          "type": "[]TopicPartition", "fields": [
            { "name": "TopicId", "versions": "0+", "type": "uuid" },
            { "name": "Partitions", "versions": "0+", "type": "[]int32" }
        ]},
        { "name": "Version", "versions": "0+", "type": "int16" },
        { "name": "Metadata", "versions": "0+", "type": "bytes" }
    ]
}

...

OffsetCommitValue

Code Block
languagejs
linenumberstrue
{
    "type": "data",
    "name": "OffsetCommitValue",
    "validVersions": "0-4",
    "flexibleVersions": "4+",
    "fields": [
        { "name": "offset", "type": "int64", "versions": "0+" },
        { "name": "leaderEpoch", "type": "int32", "versions": "3+", "default": -1, "ignorable": true },
        { "name": "metadata", "type": "string", "versions": "0+" },
        { "name": "commitTimestamp", "type": "int64", "versions": "0+" },
        { "name": "expireTimestamp", "type": "int64", "versions": "1", "default": -1, "ignorable": true },
        // Adds TopicId field.
        { "name": "topicId", "type": "uuid", "versions": "4", "ignorable": true }
    ]
}

Broker API

The new PartitionAssignor interface will be introduced on the server side. Two implementations will be provided out of the box: RangeAssignor (range) and UniformAssignor (uniform).
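The behavior of the server-side RangeAssignor is not specified in this section. As a point of reference, here is a Python sketch of the classic range strategy that Kafka's existing client-side RangeAssignor applies to each topic. Whether the server-side `range` assignor behaves identically is an assumption, and `range_assign` is a hypothetical helper, not the PartitionAssignor interface.

```python
from typing import Dict, List


def range_assign(member_ids: List[str], num_partitions: int) -> Dict[str, List[int]]:
    """Assigns the partitions of one topic using the classic range strategy.

    Members are sorted, each gets a contiguous block of partitions, and the
    first (num_partitions % number of members) members get one extra partition.
    """
    if not member_ids:
        return {}
    ordered = sorted(member_ids)
    per_member, extra = divmod(num_partitions, len(ordered))
    assignment: Dict[str, List[int]] = {}
    start = 0
    for index, member_id in enumerate(ordered):
        count = per_member + (1 if index < extra else 0)
        assignment[member_id] = list(range(start, start + count))
        start += count
    return assignment
```

For example, seven partitions over members a, b, and c yield [0, 1, 2], [3, 4], and [5, 6].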

...