  • The consumers in a share group cooperatively consume records with partitions that may be assigned to multiple consumers

  • The number of consumers in a share group can exceed the number of partitions

  • Records are acknowledged on an individual basis, although the system is optimized to work in batches for improved efficiency

  • Delivery attempts to consumers in a share group are counted to enable automated handling of unprocessable records
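The delivery-attempt counting in the last bullet can be illustrated with a minimal Java sketch. It is not Kafka code: the class `DeliveryCountSketch`, its methods and its state names are hypothetical, and it assumes that an attempt is counted each time the record is acquired, and that a record released after its final permitted attempt is set aside rather than redelivered.

```java
// Hypothetical model of delivery counting for one record; not Kafka code.
final class DeliveryCountSketch {
    enum State { AVAILABLE, ACQUIRED, ARCHIVED }

    private final int maxDeliveryAttempts; // assumed to come from broker configuration
    private int deliveryCount = 0;
    private State state = State.AVAILABLE;

    DeliveryCountSketch(int maxDeliveryAttempts) {
        this.maxDeliveryAttempts = maxDeliveryAttempts;
    }

    // Delivering the record to a consumer counts as one attempt (assumption).
    boolean acquire() {
        if (state != State.AVAILABLE) {
            return false;
        }
        deliveryCount++;
        state = State.ACQUIRED;
        return true;
    }

    // The consumer gives the record back, or its lock times out.
    void release() {
        if (state != State.ACQUIRED) {
            throw new IllegalStateException("record is not acquired");
        }
        // Once the limit is reached the record is treated as unprocessable.
        state = deliveryCount >= maxDeliveryAttempts ? State.ARCHIVED : State.AVAILABLE;
    }

    int deliveryCount() { return deliveryCount; }

    State state() { return state; }
}
```

With a limit of 2, a record that is released twice ends in the `ARCHIVED` state and can no longer be acquired.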


When a consumer in a share group fetches records, it receives available records from any of the topic-partitions that match its subscriptions. Records are acquired for delivery to this consumer with a time-limited acquisition lock. While a record is acquired, it is not available for another consumer. By default, the lock duration is 30s, but it can also be controlled using a group configuration parameter. The idea is that the lock is automatically released once the lock duration has elapsed, and then the record is available to be given to another consumer. The consumer which holds the lock can deal with it in the following ways:


There are some concepts being introduced to Kafka to support share groups.

The group coordinator is now responsible for coordination of share groups as well as consumer groups. The responsibility for being group coordinator for the cluster’s share groups is distributed among the brokers, exactly as for consumer groups. For share groups, the group coordinator has the following responsibilities:

  • It maintains the list of share-group members.

  • It manages the topic-partition assignments for the share-group members using a server-side partition assignor. An initial, trivial implementation would be to give each member the list of all topic-partitions which match its subscriptions and then use the pull-based protocol to fetch records from all partitions. A more sophisticated implementation could use topic-partition load and lag metrics to distribute partitions among the consumers as a kind of autonomous, self-balancing partition assignment, steering more consumers to busier partitions, for example. Alternatively, a push-based fetching scheme could be used.
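The trivial assignment strategy described above could look roughly like the following Java sketch. It does not use the real Kafka assignor interface: the class `TrivialShareAssignorSketch`, the `assign` method and the `"topic-partition"` string encoding are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch of the trivial strategy; not the real Kafka assignor interface.
final class TrivialShareAssignorSketch {

    // subscriptions:   member ID  -> subscribed topic names
    // partitionCounts: topic name -> number of partitions
    // Returns member ID -> every partition of every subscribed topic, as "topic-N".
    static Map<String, List<String>> assign(Map<String, Set<String>> subscriptions,
                                            Map<String, Integer> partitionCounts) {
        Map<String, List<String>> assignment = new TreeMap<>();
        for (Map.Entry<String, Set<String>> member : subscriptions.entrySet()) {
            List<String> partitions = new ArrayList<>();
            // Sort the topics so that the result is deterministic.
            for (String topic : new TreeSet<>(member.getValue())) {
                Integer count = partitionCounts.get(topic);
                if (count == null) {
                    continue; // subscribed topic does not exist
                }
                for (int p = 0; p < count; p++) {
                    partitions.add(topic + "-" + p);
                }
            }
            assignment.put(member.getKey(), partitions);
        }
        return assignment;
    }
}
```

Because every member receives every matching partition, the same partition appears in several members' assignments, which is exactly what distinguishes a share group from a consumer group.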


  • EMPTY - When a share group is created or the last member leaves the group, the share group is EMPTY.
  • STABLE - When a share group has active members, the share group is STABLE.
  • DEAD - When the share group remains EMPTY for a configured period, the group coordinator transitions it to DEAD to delete it.
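The three states above form a small state machine, sketched here in Java. This is only an illustration: the class `ShareGroupStateSketch`, the explicit `tick` call and the `emptyRetentionMs` parameter (standing in for the "configured period") are hypothetical names, and time is passed in explicitly to keep the example deterministic.

```java
// Hypothetical model of the share group lifecycle; not Kafka code.
final class ShareGroupStateSketch {
    enum State { EMPTY, STABLE, DEAD }

    private final long emptyRetentionMs; // stands in for the "configured period"
    private int memberCount = 0;
    private long emptySinceMs;
    private State state = State.EMPTY;   // a newly created group is EMPTY

    ShareGroupStateSketch(long emptyRetentionMs, long nowMs) {
        this.emptyRetentionMs = emptyRetentionMs;
        this.emptySinceMs = nowMs;
    }

    void memberJoined() {
        if (state == State.DEAD) {
            throw new IllegalStateException("group is being deleted");
        }
        memberCount++;
        state = State.STABLE;
    }

    void memberLeft(long nowMs) {
        if (memberCount == 0) {
            throw new IllegalStateException("group has no members");
        }
        memberCount--;
        if (memberCount == 0) {
            state = State.EMPTY;
            emptySinceMs = nowMs;
        }
    }

    // Periodic check by the coordinator: an EMPTY group eventually becomes DEAD.
    void tick(long nowMs) {
        if (state == State.EMPTY && nowMs - emptySinceMs >= emptyRetentionMs) {
            state = State.DEAD;
        }
    }

    State state() { return state; }
}
```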

For a share group, the group coordinator only persists a single record which essentially reserves the group's ID as a share group in the namespace of groups.


When the SPSO advances because of the LSO moving, the in-flight records past which the SPSO moves logically move into Archived state. The exception is that records which are already Acquired for delivery to consumers can be acknowledged with any AcknowledgeType, at which point they logically transition into Archived state too; there's no need to throw an exception for a consumer which has just processed a record which is about to become Archived.
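A rough Java sketch of this rule follows. The class `SpsoAdvanceSketch` and its methods are hypothetical, and the model is deliberately simplified: for records at or beyond the SPSO it only models an accepting acknowledgement, whereas behind the SPSO any acknowledgement leads to Archived, as described above.

```java
import java.util.ArrayList;
import java.util.TreeMap;

// Hypothetical model of in-flight records on one share-partition; not Kafka code.
final class SpsoAdvanceSketch {
    enum State { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

    private final TreeMap<Long, State> inFlight = new TreeMap<>();
    private long spso;

    SpsoAdvanceSketch(long spso) {
        this.spso = spso;
    }

    void track(long offset, State state) {
        inFlight.put(offset, state);
    }

    // Called when the log start offset (LSO) moves forward.
    void onLogStartOffsetAdvanced(long newLso) {
        if (newLso <= spso) {
            return; // the SPSO is already at or beyond the new LSO
        }
        spso = newLso;
        // Records the SPSO has moved past become Archived, except Acquired ones,
        // which stay Acquired until their consumer acknowledges them.
        for (Long offset : new ArrayList<>(inFlight.headMap(newLso, false).keySet())) {
            if (inFlight.get(offset) != State.ACQUIRED) {
                inFlight.put(offset, State.ARCHIVED);
            }
        }
    }

    // Acknowledgement from the consumer holding the record. Behind the SPSO the
    // record becomes Archived and no error is raised. At or beyond the SPSO this
    // sketch models only the accept case.
    State acknowledge(long offset) {
        if (inFlight.get(offset) != State.ACQUIRED) {
            throw new IllegalStateException("record " + offset + " is not acquired");
        }
        State next = offset < spso ? State.ARCHIVED : State.ACKNOWLEDGED;
        inFlight.put(offset, next);
        return next;
    }

    State stateOf(long offset) { return inFlight.get(offset); }
}
```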

Note that because the share groups are all consuming from the same log, the retention behavior for a topic applies to all of the share groups consuming from that topic.


Cooperative consumption is inherently record-based, but the expectation is that batching is used to maximise performance. For example, in the case where all records in a batch are processed successfully:

  • When a consumer fetches records, the share-partition leader prefers to return complete record batches.

  • In the usual and optimal case, all of the records in a batch will be in Available state and can all be moved to Acquired state with the same acquisition lock time-out.

  • When the consumer has processed the fetched records, it can acknowledge delivery of all of the records as a single batch, transitioning them all into Acknowledged state.
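The batch-oriented happy path in the list above can be sketched in Java as follows. This is an illustrative model rather than broker code: `InFlightBatchSketch` and its methods are hypothetical, the clock is passed in explicitly, and a single lock deadline is kept for the whole batch to mirror "the same acquisition lock time-out".

```java
import java.util.Arrays;

// Hypothetical model of one record batch on a share-partition; not Kafka code.
final class InFlightBatchSketch {
    enum State { AVAILABLE, ACQUIRED, ACKNOWLEDGED }

    private final long firstOffset;
    private final State[] states;
    private long lockDeadlineMs = -1L; // one lock time-out shared by the whole batch

    InFlightBatchSketch(long firstOffset, int recordCount) {
        this.firstOffset = firstOffset;
        this.states = new State[recordCount];
        Arrays.fill(states, State.AVAILABLE);
    }

    private boolean allIn(State expected) {
        for (State s : states) {
            if (s != expected) {
                return false;
            }
        }
        return true;
    }

    // Optimal case: every record is Available, so the whole batch is acquired at once.
    boolean acquireAll(long nowMs, long lockDurationMs) {
        expireLock(nowMs);
        if (!allIn(State.AVAILABLE)) {
            return false;
        }
        Arrays.fill(states, State.ACQUIRED);
        lockDeadlineMs = nowMs + lockDurationMs;
        return true;
    }

    // The consumer acknowledges the whole batch in one step.
    boolean acknowledgeAll(long nowMs) {
        expireLock(nowMs);
        if (!allIn(State.ACQUIRED)) {
            return false;
        }
        Arrays.fill(states, State.ACKNOWLEDGED);
        lockDeadlineMs = -1L;
        return true;
    }

    // When the lock duration has elapsed, Acquired records become Available again.
    void expireLock(long nowMs) {
        if (lockDeadlineMs < 0 || nowMs < lockDeadlineMs) {
            return;
        }
        for (int i = 0; i < states.length; i++) {
            if (states[i] == State.ACQUIRED) {
                states[i] = State.AVAILABLE;
            }
        }
        lockDeadlineMs = -1L;
    }

    State stateOf(long offset) { return states[(int) (offset - firstOffset)]; }
}
```

If the acknowledgement arrives after the lock has expired, `acknowledgeAll` returns `false` and the records are Available for another consumer to acquire.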


group.share.enable

Whether to enable share groups on the broker.

Default false while the feature is being developed. Will become true in a future release.

The maximum number of delivery attempts for a record delivered to a share group.

Default 5, minimum 2, maximum 10

group.share.record.lock.duration.ms

Share-group record acquisition lock duration in milliseconds.

Default 30000 (30 seconds), minimum 1000 (1 second), maximum 60000 (60 seconds)

group.share.record.lock.duration.max.ms

Share-group record acquisition lock maximum duration in milliseconds.

Default 60000 (60 seconds), minimum 1000 (1 second), maximum 3600000 (1 hour)

group.share.record.lock.partition.limit

Share-group record lock limit per share-partition.

Default 200, minimum 100, maximum 10000

The timeout to detect client failures when using the group protocol.

Default 45000 (45 seconds) 

The minimum session timeout.

Default 45000 (45 seconds) 

The maximum session timeout.

Default 60000 (60 seconds) 

The heartbeat interval given to the members.

Default 5000 (5 seconds) 

The minimum heartbeat interval.

Default 5000 (5 seconds) 

The maximum heartbeat interval.

Default 15000 (15 seconds)

The maximum number of consumers that a single share group can accommodate.

Default 200

The server-side assignors as a list of full class names. In the initial delivery, only the first one in the list is used.

A list of class names. Default ""

The maximum number of share sessions that the broker will maintain.

Default 1000

Group configuration

The following dynamic group configuration properties are added. These are properties for which it would be problematic to have consumers in the same share group using different behavior if the properties were specified in the consumer clients themselves.


Controls how to read records written transactionally. If set to "read_committed", the share group will only deliver transactional records which have been committed. If set to "read_uncommitted", the share group will return all messages, even transactional messages which have been aborted. Non-transactional records will be returned unconditionally in either mode.

Valid values "read_committed"  and "read_uncommitted" (default) 

What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server:

  • "earliest" : automatically reset the offset to the earliest offset

  • "latest" : automatically reset the offset to the latest offset

Valid values "latest"  (default) and "earliest" 

Consumer configuration

The following new configuration properties are added for the consumer.


Record acquisition lock duration in milliseconds.

Default null, which uses the cluster configuration, minimum 1000, maximum limited by the cluster configuration
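How a consumer-supplied lock duration might be resolved against the cluster settings is sketched below in Java. Several points are assumptions rather than statements from this document: the helper `LockDurationSketch.effectiveLockDurationMs` is hypothetical, the cluster maximum is taken to be the value of `group.share.record.lock.duration.max.ms`, and out-of-range values are rejected rather than silently clamped.

```java
// Hypothetical helper for resolving the record acquisition lock duration; not Kafka code.
final class LockDurationSketch {
    static final long MIN_LOCK_DURATION_MS = 1000L;

    // consumerValueMs:  the consumer's setting, or null if unset
    // clusterDefaultMs: e.g. group.share.record.lock.duration.ms (default 30000)
    // clusterMaxMs:     assumed to be group.share.record.lock.duration.max.ms (default 60000)
    static long effectiveLockDurationMs(Long consumerValueMs, long clusterDefaultMs, long clusterMaxMs) {
        if (consumerValueMs == null) {
            return clusterDefaultMs; // null means "use the cluster configuration"
        }
        if (consumerValueMs < MIN_LOCK_DURATION_MS || consumerValueMs > clusterMaxMs) {
            // Assumption: invalid values are rejected, not clamped.
            throw new IllegalArgumentException(
                "lock duration must be between " + MIN_LOCK_DURATION_MS + " and " + clusterMaxMs + " ms");
        }
        return consumerValueMs;
    }
}
```

For example, with the broker defaults listed earlier, an unset consumer value resolves to 30000 ms, while a consumer value of 120000 ms is rejected because it exceeds the 60000 ms maximum.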

Consumer configuration

The existing consumer configurations apply for share groups with the following exceptions:


  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareAcknowledgeRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "SessionId", "type": "int32", "versions": "0+",
      "about": "The share session ID." },
    { "name": "SessionEpoch", "type": "int32", "versions": "0+",
      "about": "The share session epoch, which is used for ordering requests in a session." },
    { "name": "Topics", "type": "[]AcknowledgeTopic", "versions": "0+",
      "about": "The topics containing records to acknowledge.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The unique topic ID"},
      { "name": "Partitions", "type": "[]AcknowledgePartition", "versions": "0+",
        "about": "The partitions containing records to acknowledge.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
          "about": "Record batches to acknowledge.", "fields": [
          { "name": "StartOffset", "type": "int64", "versions": "0+",
            "about": "Start offset of batch of records to acknowledge."},
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "Last offset (inclusive) of batch of records to acknowledge."},
          { "name": "GapOffsets", "type": "[]int64", "versions": "0+",
            "about": "Array of offsets in this range which do not correspond to records."},
          { "name": "AcknowledgeType", "type": "int8", "versions": "0+", "default": "0",
            "about": "The type of acknowledgement - 0:Accept,1:Release,2:Reject."}

Response schema


The following new broker metrics should be added:

Tags: protocol: share
Description: The total number of share groups managed by group coordinator.

Metric Name: rebalance (rebalance-rate and rebalance-count)
Tags: protocol: share
Description: The count and rate of share group rebalances.

Tags: protocol: share
Description: The number of share partitions managed by group coordinator.

Tags: protocol: share, state: {empty|stable|dead}
Description: The number of share groups in respective state.
JMX Bean: kafka.server:type=group-coordinator-metrics,name=group-count,protocol=share,state={empty|stable|dead}

Metric Name: offset-commit (offset-commit-rate and offset-commit-count)
Tags: protocol: share
Description: The total number of committed offsets for share groups.

Metric Name: record-acknowledgement (record-acknowledgement-rate and record-acknowledgement-count)
Tags: protocol: share
Description: The number of records acknowledged per acknowledgement type.


Future Work

There are some obvious extensions to this idea which are not included in this KIP in order to keep the scope manageable.
