Comment: Added information about share group protocol

...

Share group membership is controlled by the group coordinator. Consumers in a share group use the heartbeat mechanism to join, leave and confirm continued membership of the share group, using the new ShareGroupHeartbeat RPC. Share-partition assignment is also piggybacked on the heartbeat mechanism. Share groups only support server-side assignors. This KIP introduces just one assignor, org.apache.kafka.server.group.share.SimpleAssignor, which assigns all partitions of all subscribed topics to all members.
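The assignment rule described above can be illustrated with a minimal sketch. It is not the SimpleAssignor implementation; the function name and the dictionary-based inputs are assumptions made for illustration only.

```python
from typing import Dict, List, Set, Tuple


def simple_assign(
    subscriptions: Dict[str, Set[str]],   # member ID -> subscribed topic names
    partition_counts: Dict[str, int],     # topic name -> number of partitions
) -> Dict[str, List[Tuple[str, int]]]:
    """Give every member all partitions of all topics it subscribes to.

    Hypothetical helper: partitions are deliberately shared, so the same
    (topic, partition) pair can appear in the assignment of several members.
    """
    assignment: Dict[str, List[Tuple[str, int]]] = {}
    for member_id, topics in subscriptions.items():
        assignment[member_id] = [
            (topic, partition)
            for topic in sorted(topics)
            for partition in range(partition_counts.get(topic, 0))
        ]
    return assignment


example = simple_assign(
    {"m1": {"orders"}, "m2": {"orders", "payments"}},
    {"orders": 2, "payments": 1},
)
```

In this sketch both members receive both partitions of "orders", which is the sharing behaviour that distinguishes a share group from a consumer group.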

...

The reconciliation process for a share group is very simple because there is no fencing - the group coordinator revokes the partitions which are no longer in the target assignment of the member and assigns the new partitions to the member at the same time. There’s no need for the revocations to be acknowledged before new partitions are assigned. The member acknowledges changes to its assignment, but the group coordinator does not depend upon receiving the acknowledgement to proceed.

Data model

This is the data model maintained by the group coordinator for share groups.

Share Group and Member

The group and members represent the current state of a share group. This is reminiscent of a simplified consumer group.

Share Group
Name | Type | Description
Group ID | string | The group ID as configured by the consumer. The ID uniquely identifies the group.
Group Epoch | int32 | The current epoch of the group. The epoch is incremented by the group coordinator when a new assignment is required for the group.
Members | []Member | The set of members in the group.
Partitions Metadata | []PartitionMetadata | The metadata of the partitions that the group is subscribed to. This is used to detect partition metadata changes.

Member
Name | Type | Description
Member ID | string | The unique identifier of the member. It is generated by the coordinator upon the first heartbeat request and must be used throughout the lifetime of the member.
Rack ID | string | The rack ID configured by the consumer.
Client ID | string | The client ID configured by the consumer.
Client Host | string | The client host of the consumer.
Subscribed Topic Names | []string | The current set of subscribed topic names configured by the consumer.
Server Assignor | string | The server-side assignor used by the group.

Target Assignment

The target assignment of the group. This represents the assignment that all the members of the group will eventually converge to. It is a declarative assignment which is generated by the assignor based on the group state.

Target Assignment
Name | Type | Description
Group ID | string | The group ID as configured by the consumer. The ID uniquely identifies the group.
Assignment Epoch | int32 | The epoch of the assignment. It represents the epoch of the group used to generate the assignment. It will eventually match the group epoch.
Assignment Error | int8 | The error reported by the assignor.
Members | []Member | The assignment for each member.

Member
Name | Type | Description
Member ID | string | The unique identifier of the member.
Partitions | []TopicIdPartition | The set of partitions assigned to this member.
Metadata | bytes | The metadata assigned to this member.

Current Assignment

The current assignment of a member.

Current Assignment
Name | Type | Description
Group ID | string | The group ID as configured by the consumer. The ID uniquely identifies the group.
Member ID | string | The member ID of this member.
Member Epoch | int32 | The current epoch of this member. The epoch is the assignment epoch of the assignment currently used by this member.
Error | int8 | The error reported by the assignor.
Partitions | []TopicIdPartition | The current partitions used by the member.
Version | int16 | The version used to encode the metadata.
Metadata | bytes | The current metadata used by the member.

Rebalance process

The rebalance process is driven by the group coordinator and revolves around three kinds of epochs: the group epoch, the assignment epoch and the member epoch. This is intentionally very similar to how the process works for consumer groups in KIP-848.

Group epoch - Trigger a rebalance

The group coordinator is responsible for triggering a rebalance of the group when the metadata of the group changes. The group epoch represents the generation of the group metadata. It is incremented whenever the group metadata is updated. This happens in the following cases:

  • A member joins or leaves the group.
  • A member updates its subscriptions.
  • A member is removed from the group by the group coordinator.
  • The partition metadata is updated, such as when a new partition is added or a topic is created or deleted.

In all these cases, a new version of the group metadata is calculated by the group coordinator with an incremented group epoch. For a share group, the group coordinator does not persist the group metadata. The new version of the group metadata signals that a new assignment is required for the group.

Assignment epoch - Compute the group assignment

Whenever the group epoch is larger than the target assignment epoch, the group coordinator triggers the computation of a new target assignment based on the latest group metadata using a server-side assignor. For a share group, the group coordinator does not persist the assignment. The assignment epoch becomes the group epoch of the group metadata used to compute the assignment.
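The relationship between the group epoch and the assignment epoch can be sketched as follows. The class and method names are hypothetical and the assignor call is only indicated by a comment; this is an illustration of the rule, not the group coordinator's code.

```python
from dataclasses import dataclass


@dataclass
class GroupEpochs:
    """Hypothetical in-memory view of the two group-level epochs."""
    group_epoch: int = 0
    assignment_epoch: int = 0

    def on_group_metadata_change(self) -> None:
        # A member joined, left, changed subscriptions or was removed,
        # or the partition metadata changed: bump the group epoch.
        self.group_epoch += 1

    def maybe_compute_target_assignment(self) -> bool:
        """Recompute the target assignment only when it is out of date."""
        if self.group_epoch > self.assignment_epoch:
            # A server-side assignor would run here on the latest metadata.
            self.assignment_epoch = self.group_epoch
            return True
        return False


epochs = GroupEpochs()
epochs.on_group_metadata_change()
first = epochs.maybe_compute_target_assignment()
second = epochs.maybe_compute_target_assignment()
```

After one metadata change the first call recomputes the assignment and the second call is a no-op, because the assignment epoch has caught up with the group epoch.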

Member epoch - Reconciliation of the group

Each member independently reconciles its current assignment with its new target assignment, converging with the target epoch and assignment.

The group coordinator revokes the partitions which are no longer in the target assignment of the member, and assigns the partitions which have been added to the target assignment of the member. It provides the new assignment to the member in its heartbeat response until the member acknowledges the assignment change in a heartbeat request.

By assigning and revoking partitions for the members of the group, the group coordinator can balance the partitions across the members of the group.
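As a rough illustration of the reconciliation step, the revoked and newly assigned partitions are simply the two set differences between the current and target assignments. The function name and the tuple representation of a partition are assumptions for this sketch.

```python
from typing import Set, Tuple

Partition = Tuple[str, int]  # (topic ID as a string, partition index); illustrative only


def reconcile(current: Set[Partition], target: Set[Partition]):
    """Return (revoked, newly_assigned, new_current) for one member."""
    revoked = current - target          # no longer in the target assignment
    newly_assigned = target - current   # added to the target assignment
    # There is no fencing in a share group, so both changes are applied in
    # the same step without waiting for revocations to be acknowledged.
    return revoked, newly_assigned, set(target)


revoked, added, now = reconcile(
    {("t1", 0), ("t1", 1)},
    {("t1", 1), ("t1", 2)},
)
```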

The member provides the rebalance timeout to the group coordinator when it joins the group. This is the timeout for the group coordinator waiting for the member to acknowledge that it has adopted the target assignment. If the member does not confirm the target assignment within the rebalance timeout, the group coordinator removes the member from the group.

Member ID

Every member is uniquely identified by a UUID called the member ID. This UUID is generated by the group coordinator and given to the member when it joins the group. It is used in all communication with the group coordinator and must be kept during the entire lifespan of the member.

Heartbeat and session

The member uses the new ShareGroupHeartbeat API to establish a session with the group coordinator. The member is expected to heartbeat every group.share.heartbeat.interval.ms in order to keep its session open. If it does not heartbeat at least once within the group.share.session.timeout.ms, the group coordinator will remove the member from the group. The member is told the heartbeat interval in the response to the ShareGroupHeartbeat API.

If a member is removed from the group because it fails to heartbeat, the consumer does not lose the ability to fetch and acknowledge records at the protocol level, because there is intentionally no fencing. A failure to heartbeat is most likely because the consumer has died. If the consumer just failed to heartbeat due to a temporary pause, it could in theory continue to fetch and acknowledge records. When it finally sends a heartbeat and realises it has been removed from the group, it should stop fetching records because its assignment has been revoked, and rejoin the group.
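The session rule can be sketched with a small tracker that records the last heartbeat time per member and removes members whose session has lapsed. The class is hypothetical, and the strict "greater than" comparison against the timeout is an assumption about the boundary case.

```python
from typing import Dict, List


class SessionTracker:
    """Hypothetical tracker for heartbeat sessions of share group members."""

    def __init__(self, session_timeout_ms: int) -> None:
        # Corresponds to group.share.session.timeout.ms in the text.
        self.session_timeout_ms = session_timeout_ms
        self.last_heartbeat_ms: Dict[str, int] = {}

    def heartbeat(self, member_id: str, now_ms: int) -> None:
        self.last_heartbeat_ms[member_id] = now_ms

    def expire(self, now_ms: int) -> List[str]:
        """Remove and return members that have not heartbeated in time."""
        expired = sorted(
            member_id
            for member_id, last in self.last_heartbeat_ms.items()
            if now_ms - last > self.session_timeout_ms
        )
        for member_id in expired:
            del self.last_heartbeat_ms[member_id]
        return expired


tracker = SessionTracker(session_timeout_ms=45_000)
tracker.heartbeat("m1", now_ms=0)
tracker.heartbeat("m2", now_ms=40_000)
removed = tracker.expire(now_ms=50_000)
```

Only "m1" is removed here. Because there is no fencing, removal in this sketch affects membership only; it does not model any blocking of fetches or acknowledgements.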

...

  • EMPTY - When a share group is created or the last member leaves the group, the share group is EMPTY.
  • STABLE - When a share group has active members, the share group is STABLE.
  • DEAD - When the share group remains EMPTY for a configured period, the group coordinator transitions it to DEAD to delete it.
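The three states listed above can be expressed as a small function of the member count and the time the group has been empty. The parameter names, including the name of the configured period, are placeholders rather than actual configuration names.

```python
def share_group_state(member_count: int, empty_for_ms: int, empty_retention_ms: int) -> str:
    """Derive the share group state from hypothetical inputs.

    empty_retention_ms stands in for the "configured period" in the text;
    its real configuration name is not given here.
    """
    if member_count > 0:
        return "STABLE"
    if empty_for_ms >= empty_retention_ms:
        return "DEAD"
    return "EMPTY"


states = [
    share_group_state(2, 0, 60_000),
    share_group_state(0, 1_000, 60_000),
    share_group_state(0, 60_000, 60_000),
]
```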

Persistence and Fail-Over

For a share group, the group coordinator only persists a single record which essentially reserves the group's ID as a share group in the namespace of groups.

When the group coordinator fails over, the newly elected coordinator loads the state from the __consumer_offsets partition. This means a share group will remain in existence across the fail-over. However, the members of the group and their assignments are not persisted. This means that existing members will have to rejoin the share group following a coordinator fail-over.

In-flight records

For each share-partition, the share group adds some state management for the records being consumed. The starting offset of records which are eligible for consumption is known as the share-partition start offset (SPSO), and the last offset of records which are eligible for consumption is known as the share-partition end offset (SPEO). The records starting at the SPSO and up to the SPEO are known as the in-flight records. So, a share-partition is essentially managing the consumption of the in-flight records.
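A minimal sketch of the in-flight window follows. It assumes that both the SPSO and the SPEO are inclusive bounds, which is one reading of "the last offset of records which are eligible for consumption"; if the SPEO is in fact exclusive, the comparisons would need adjusting. The function names are hypothetical.

```python
def is_in_flight(offset: int, spso: int, speo: int) -> bool:
    """True if the offset lies in the in-flight window.

    Assumption: both SPSO and SPEO are treated as inclusive bounds.
    """
    return spso <= offset <= speo


def in_flight_count(spso: int, speo: int) -> int:
    """Number of offsets in the window under the same inclusive assumption."""
    return max(0, speo - spso + 1)
```

For example, with an SPSO of 100 and an SPEO of 104, offsets 100 through 104 are in flight under this assumption, and offset 105 is not.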

...

  • Altering the offsets for a share group resets the Share-Partition Start Offset for topic-partitions in the share group (share-partitions)
  • The members of a share group are not assigned partitions
  • A share group has only three states - EMPTY, STABLE and DEAD

...

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareFetchRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "groupId",
      "about": "null if not provided or if it didn't change since the last fetch; the group identifier otherwise." },
    { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "groupId",
      "about": "null if not provided or if it didn't change since the last fetch; the member id generated by the coordinator otherwise." },
    { "name": "AcquisitionTimeoutMs", "type": "int32", "versions": "0+", "default": -1,
      "about": "-1 if it didn't change since the last fetch; the maximum time in milliseconds that the fetched records are acquired for the consumer." },
    { "name": "MaxWaitMs", "type": "int32", "versions": "0+",
      "about": "The maximum time in milliseconds to wait for the response." },
    { "name": "MinBytes", "type": "int32", "versions": "0+",
      "about": "The minimum bytes to accumulate in the response." },
    { "name": "MaxBytes", "type": "int32", "versions": "0+", "default": "0x7fffffff", "ignorable": true,
      "about": "The maximum bytes to fetch.  See KIP-74 for cases where this limit may not be honored." },
    { "name": "SessionId", "type": "int32", "versions": "0+", "default": "0", "ignorable": true,
      "about": "The share session ID." },
    { "name": "SessionEpoch", "type": "int32", "versions": "0+", "default": "-1", "ignorable": true,
      "about": "The share session epoch, which is used for ordering requests in a session." },
    { "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true,
        "about": "The unique topic ID." },
      { "name": "Partitions", "type": "[]FetchPartition", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "0+", "default": "-1", "ignorable": true,
          "about": "The current leader epoch of the partition." },
        { "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",
          "about": "The maximum bytes to fetch from this partition.  See KIP-74 for cases where this limit may not be honored." }
      ]}
    ]},
    { "name": "ForgottenTopicsData", "type": "[]ForgottenTopic", "versions": "0+", "ignorable": false,
      "about": "In an incremental fetch request, the partitions to remove.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true,
        "about": "The unique topic ID." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions indexes to forget." }
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "ShareFetchResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": true,
      "about": "The top level response error code." },
    { "name": "SessionId", "type": "int32", "versions": "0+", "default": "0", "ignorable": false,
      "about": "The share session ID." },
    { "name": "Responses", "type": "[]ShareFetchableTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true,
        "about": "The unique topic ID." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no fetch error." },
        { "name": "LastStableOffset", "type": "int64", "versions": "0+", "default": "-1", "ignorable": true,
          "about": "The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)" },
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The ID of the current leader or -1 if the leader is unknown." },
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch." }
        ]},
        { "name": "AbortedTransactions", "type": "[]AbortedTransaction", "versions": "0+", "nullableVersions": "0+", "ignorable": true,
          "about": "The aborted transactions.", "fields": [
          { "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId",
            "about": "The producer id associated with the aborted transaction." },
          { "name": "FirstOffset", "type": "int64", "versions": "0+",
            "about": "The first offset in the aborted transaction." }
        ]},
        { "name": "Records", "type": "records", "versions": "0+", "nullableVersions": "0+",
          "about": "The record data." },
        { "name": "AcquiredRecords", "type": "[]AcquiredRecords", "versions": "0+",
          "about": "The acquired records.", "fields": [
          { "name": "BaseOffset", "type": "int64", "versions": "0+",
            "about": "The earliest offset in this batch of acquired records." },
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "The last offset of this batch of acquired records." },
          { "name": "DeliveryCount", "type": "int16", "versions": "0+",
            "about": "The delivery count of this batch of acquired records." }
        ]}
      ]}
    ]},
    { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "16+", "taggedVersions": "0+", "tag": 0,
      "about": "Endpoints for all current leaders enumerated in PartitionData with error NOT_LEADER_OR_FOLLOWER.", "fields": [
      { "name": "NodeId", "type": "int32", "versions": "0+",
        "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node." },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The node's hostname." },
      { "name": "Port", "type": "int32", "versions": "0+",
        "about": "The node's port." },
      { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "The rack of the node, or null if it has not been assigned to a rack." }
    ]}
  ]
}
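To show how a client might read the AcquiredRecords field of the response, the sketch below expands each batch into a per-offset delivery count. It assumes the response has already been decoded into plain dictionaries keyed by the schema field names, and that LastOffset is inclusive; the function itself is hypothetical.

```python
from typing import Dict, List


def delivery_counts(acquired_records: List[dict]) -> Dict[int, int]:
    """Map each acquired offset to the delivery count of its batch.

    Assumption: each entry is a dict with BaseOffset, LastOffset (inclusive)
    and DeliveryCount, mirroring the AcquiredRecords schema fields.
    """
    counts: Dict[int, int] = {}
    for batch in acquired_records:
        for offset in range(batch["BaseOffset"], batch["LastOffset"] + 1):
            counts[offset] = batch["DeliveryCount"]
    return counts


counts = delivery_counts([
    {"BaseOffset": 10, "LastOffset": 12, "DeliveryCount": 1},
    {"BaseOffset": 13, "LastOffset": 13, "DeliveryCount": 2},
])
```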

ShareAcknowledge API

The ShareAcknowledge API is used by share group consumers to acknowledge delivery of records with share-partition leaders.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareAcknowledgeRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "SessionId", "type": "int32", "versions": "0+",
      "about": "The share session ID." },
    { "name": "SessionEpoch", "type": "int32", "versions": "0+",
      "about": "The share session epoch, which is used for ordering requests in a session." },
    { "name": "Topics", "type": "[]AcknowledgeTopic", "versions": "0+",
      "about": "The topics containing records to acknowledge.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The unique topic ID." },
      { "name": "Partitions", "type": "[]AcknowledgePartition", "versions": "0+",
        "about": "The partitions containing records to acknowledge.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
          "about": "Record batches to acknowledge.", "fields": [
          { "name": "StartOffset", "type": "int64", "versions": "0+",
            "about": "Start offset of batch of records to acknowledge." },
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "Last offset (inclusive) of batch of records to acknowledge." },
          { "name": "GapOffsets", "type": "[]int64", "versions": "0+",
            "about": "Array of offsets in this range which do not correspond to records." },
          { "name": "AcknowledgeType", "type": "string", "versions": "0+", "default": "accept",
            "about": "The type of acknowledgement, such as accept or release." }
        ]}
      ]}
    ]}
  ]
}
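The relationship between StartOffset, LastOffset and GapOffsets in an acknowledgement batch can be illustrated with a small helper that builds one batch from the offsets of the records a consumer actually received. The helper and its dictionary output are assumptions for illustration; they are not part of any client API.

```python
from typing import Iterable


def build_acknowledgement_batch(record_offsets: Iterable[int], acknowledge_type: str = "accept") -> dict:
    """Build one AcknowledgementBatch-shaped dict from received record offsets.

    Offsets between the first and last record that were not received as
    records are reported in GapOffsets.
    """
    present = set(record_offsets)
    if not present:
        raise ValueError("at least one record offset is required")
    start, last = min(present), max(present)
    gaps = [offset for offset in range(start, last + 1) if offset not in present]
    return {
        "StartOffset": start,
        "LastOffset": last,            # inclusive, as in the schema
        "GapOffsets": gaps,
        "AcknowledgeType": acknowledge_type,
    }


batch = build_acknowledgement_batch([10, 11, 13], acknowledge_type="release")
```

Here offset 12 lies inside the range but has no record, so it is the only entry in GapOffsets.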

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "ShareAcknowledgeResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": true,
      "about": "The top level response error code." },
    { "name": "SessionId", "type": "int32", "versions": "0+", "default": "0", "ignorable": false,
      "about": "The share session ID." },
    { "name": "Responses", "type": "[]ShareAcknowledgeTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true,
        "about": "The unique topic ID." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The ID of the current leader or -1 if the leader is unknown." },
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch." }
        ]}
      ]}
    ]},
    { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "16+", "taggedVersions": "0+", "tag": 0,
      "about": "Endpoints for all current leaders enumerated in PartitionData with error NOT_LEADER_OR_FOLLOWER.", "fields": [
      { "name": "NodeId", "type": "int32", "versions": "0+",
        "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node." },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The node's hostname." },
      { "name": "Port", "type": "int32", "versions": "0+",
        "about": "The node's port." },
      { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "The rack of the node, or null if it has not been assigned to a rack." }
    ]}
  ]
}

Records

This section describes the new record types.

Group Metadata

In order to coexist properly with consumer groups, the group metadata records for share groups are persisted by the group coordinator to the compacted __consumer_offsets topic.

For each share group, a single ConsumerGroupMetadata record is written. When the group is deleted, a tombstone record is written.

ConsumerGroupMetadataKey

This is included for completeness. There is no change to this record.

Code Block
{
  "type": "data",
  "name": "ConsumerGroupMetadataKey",
  "validVersions": "3",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "3",
      "about": "The group id." }
  ]
}

ConsumerGroupMetadataValue

A new version of the record value is introduced which contains the Type field. For a share group, the type will be "share". For a consumer group, the type can be omitted (null) or "consumer".

Code Block
{
  "type": "data",
  "name": "ConsumerGroupMetadataValue",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Epoch", "type": "int32", "versions": "0+",
      "about": "The group epoch." },
    // Version 1 adds Type field
    { "name": "Type", "type": "string", "versions": "1+", "nullableVersions": "1+", "default": "null",
      "about": "The group type - null indicates consumer group." }
  ]
}
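The interpretation of the Type field can be sketched as below, assuming the record value has already been decoded into a dictionary. The function is hypothetical and only encodes the rule that a null or absent type indicates a consumer group.

```python
from typing import Optional


def group_type(record_value: dict) -> str:
    """Return the group type for a decoded ConsumerGroupMetadataValue-like dict.

    A missing or null Type (for example, a version 0 record) is treated as
    a consumer group, matching the description of the field.
    """
    type_field: Optional[str] = record_value.get("Type")
    return "consumer" if type_field is None else type_field


types = [
    group_type({"Epoch": 5}),                       # version 0 style record
    group_type({"Epoch": 5, "Type": None}),
    group_type({"Epoch": 7, "Type": "share"}),
]
```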

Share-partition State

Information to be added.

Metrics

Further details to follow as the design progresses.

...