

A share-partition is a topic-partition with a subscription in a share group. For a topic-partition subscribed in more than one share group, each share group has its own share-partition.


Two new control record types are introduced: SHARE_CHECKPOINT (5) and SHARE_DELTA (6). They are written into separate message sets with the Control flag set. This flag indicates that the records are not intended for application consumption. Indeed, these message sets are not returned to consumers at all, since they are intended only for the share-partition leader.

In order to recover the share-partition state, the share-partition leader has to read a SHARE_CHECKPOINT and zero or more SHARE_DELTA records which chain backwards to the SHARE_CHECKPOINT with the same checkpoint epoch. By applying the records in order, from earliest to latest, the state can be rebuilt.

To avoid having to scan the topic in order to find these records, the share-partition leader keeps a share snapshot file which lets it locate the control records more efficiently.
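The recovery procedure can be sketched as follows. This is a minimal illustration only, not the broker implementation: the `ControlRecord` class, the `collect_chain` and `rebuild` helpers, and the `log` dictionary (which stands in for both the topic-partition and the snapshot-file lookup) are all hypothetical names introduced here. It also ignores the start and end offsets carried by a checkpoint and rebuilds only the per-record states.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple

# One [BaseOffset, LastOffset, State, DeliveryCount] tuple.
StateBatch = Tuple[int, int, int, int]


@dataclass
class ControlRecord:
    kind: str                          # "SHARE_CHECKPOINT" or "SHARE_DELTA"
    checkpoint_epoch: int
    back_offset: Optional[int] = None  # set on SHARE_DELTA records only
    states: List[StateBatch] = field(default_factory=list)


def collect_chain(log: Dict[int, ControlRecord], latest_offset: int):
    """Follow back-offsets from the latest control record to its checkpoint."""
    chain = []
    offset = latest_offset
    epoch = log[latest_offset].checkpoint_epoch
    while True:
        record = log[offset]
        if record.checkpoint_epoch != epoch:
            raise ValueError("chain crosses checkpoint epochs")
        chain.append((offset, record))
        if record.kind == "SHARE_CHECKPOINT":
            break
        offset = record.back_offset
    chain.reverse()  # earliest (the checkpoint) first
    return chain


def rebuild(chain):
    """Apply records earliest to latest; later records overwrite earlier ones."""
    state = {}
    for _, record in chain:
        for base, last, st, count in record.states:
            for off in range(base, last + 1):
                state[off] = (st, count)
    return state


# Hypothetical log contents, keyed by offset (assumption for illustration).
log = {
    130: ControlRecord("SHARE_CHECKPOINT", 1),
    131: ControlRecord("SHARE_DELTA", 1, 130, [(100, 109, 2, 1)]),
    132: ControlRecord("SHARE_DELTA", 1, 131, [(110, 110, 0, 1)]),
}
chain = collect_chain(log, 132)
recovered = rebuild(chain)
```

Walking backwards first and then reversing keeps the apply step strictly earliest-to-latest, which is what makes later deltas override earlier ones for the same offsets.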


A SHARE_CHECKPOINT record contains a complete checkpoint of the share-partition state. It contains:


Note that the Acquired state is not recorded because it’s transient. As a result, an Acquired record with a delivery count of 1 is recorded as Available with a delivery count of 0. In the unlikely event of a share-partition leader crash, memory of the in-flight delivery will be lost.
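As a small sketch of the rule in the note above, the function below maps an in-memory record state to the form that would be written to a control record. The function name and the string constants are hypothetical. Extending the rule to delivery counts above 1 by subtracting one is an assumption, generalised from the single case the text describes.

```python
AVAILABLE = "Available"
ACQUIRED = "Acquired"
ACKNOWLEDGED = "Acknowledged"


def persisted_form(state, delivery_count):
    """Return the (state, delivery_count) pair as it would be recorded durably.

    Acquired is transient, so the in-flight delivery attempt is not counted:
    the record is written as Available with one fewer delivery
    (assumption: the same rule applies for any delivery count, not only 1).
    """
    if state == ACQUIRED:
        return (AVAILABLE, delivery_count - 1)
    return (state, delivery_count)


first_attempt = persisted_form(ACQUIRED, 1)
acked = persisted_form(ACKNOWLEDGED, 1)
```

After a leader crash, a record recovered in this form is simply offered for delivery again, which is why the in-flight attempt is forgotten.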


  • The group ID
  • The checkpoint epoch of the SHARE_CHECKPOINT it applies to
  • The offset of the preceding control record with the same checkpoint epoch
  • An array of [BaseOffset, LastOffset, State, DeliveryCount]  tuples



State changes

Cumulative state

Control records

Starting state of topic-partition with latest offset 100

SPSO=100, SPEO=100

SPSO=100, SPEO=100

Code Block
SHARE_CHECKPOINT offset 130:
{
  "GroupId": "G1",
  "CheckpointEpoch": 1,
  "StartOffset": 100,
  "EndOffset": 100,
  "States": []
}

In the batched case with successful processing, there’s a state change per batch to move the SPSO forwards

Fetch records 100-109

SPEO=110, records 100-109 (acquired, delivery count 1)

SPSO=100, SPEO=110, records 100-109 (acquired, delivery count 1)

Acknowledge 100-109


SPSO=110, SPEO=110

Code Block
SHARE_DELTA offset 131:
{
  "GroupId": "G1",
  "CheckpointEpoch": 1,
  "BackOffset": 130,
  "States": [
    {
      "BaseOffset": 100,
      "LastOffset": 109,
      "State": 2 (Acknowledged),
      "DeliveryCount": 1
    }
  ]
}

With a messier sequence of release and acknowledge, there’s a state change for each operation which can act on multiple records

Fetch records 110-119

SPEO=120, records 110-119 (acquired, delivery count 1)

SPSO=110, SPEO=120, records 110-119 (acquired, delivery count 1)

Release 110

record 110 (available, delivery count 1)

SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-119 (acquired, delivery count 1)

Code Block
SHARE_DELTA offset 132:
{
  "GroupId": "G1",
  "CheckpointEpoch": 1,
  "BackOffset": 131,
  "States": [
    {
      "BaseOffset": 110,
      "LastOffset": 110,
      "State": 0 (Available),
      "DeliveryCount": 1
    }
  ]
}

Note that the SPEO in the control records is 111 at this point. All records after this are in their first delivery attempt so this is an acceptable situation.

Acknowledge 119

record 110 (available, delivery count 1), records 111-118 acquired, record 119 acknowledged

SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-118 (acquired, delivery count 1), record 119 acknowledged

Code Block
SHARE_DELTA offset 133:
{
  "GroupId": "G1",
  "CheckpointEpoch": 1,
  "BackOffset": 132,
  "States": [
    {
      "BaseOffset": 111,
      "LastOffset": 118,
      "State": 0 (Available),
      "DeliveryCount": 0
    },
    {
      "BaseOffset": 119,
      "LastOffset": 119,
      "State": 2 (Acknowledged),
      "DeliveryCount": 1
    }
  ]
}

Fetch records 110, 120

SPEO=121, record 110 (acquired, delivery count 2), record 120 (acquired, delivery count 1)

SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1)

Lock timeout elapsed 111, 112

records 111-112 (available, delivery count 1)

SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1)

Code Block
SHARE_DELTA offset 134:
{
  "GroupId": "G1",
  "CheckpointEpoch": 1,
  "BackOffset": 133,
  "States": [
    {
      "BaseOffset": 111,
      "LastOffset": 112,
      "State": 0 (Available),
      "DeliveryCount": 1
    }
  ]
}

Acknowledge 113-118

records 113-118 acknowledged

SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-119 acknowledged, record 120 (acquired, delivery count 1)

Code Block
SHARE_DELTA offset 135:
{
  "GroupId": "G1",
  "CheckpointEpoch": 1,
  "BackOffset": 134,
  "States": [
    {
      "BaseOffset": 113,
      "LastOffset": 118,
      "State": 2 (Acknowledged),
      "DeliveryCount": 1
    }
  ]
}

Fetch records 111,112

records 111-112 (acquired, delivery count 2)

SPSO=110, SPEO=121, records 110-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1)

Acknowledge 110


SPSO=111, SPEO=121, records 111-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1)

Code Block
SHARE_DELTA offset 136:
{
  "GroupId": "G1",
  "CheckpointEpoch": 1,
  "BackOffset": 135,
  "States": [
    {
      "BaseOffset": 110,
      "LastOffset": 110,
      "State": 2 (Acknowledged),
      "DeliveryCount": 2
    }
  ]
}

Acknowledge 111,112


SPSO=120, SPEO=121, record 120 (acquired, delivery count 1)

Code Block
SHARE_DELTA offset 137:
{
  "GroupId": "G1",
  "CheckpointEpoch": 1,
  "BackOffset": 136,
  "States": [
    {
      "BaseOffset": 111,
      "LastOffset": 112,
      "State": 2 (Acknowledged),
      "DeliveryCount": 2
    }
  ]
}

or alternatively, taking a new checkpoint:

Code Block
{
  "GroupId": "G1",
  "CheckpointEpoch": 2,
  "StartOffset": 120,
  "EndOffset": 120,
  "States": []
}


Note that the delivery of 120 has not been recorded yet because it is the first delivery attempt and it is safe to recover the SPEO back to offset 120 and repeat the attempt.
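As a cross-check of the worked example, the sketch below replays the seven SHARE_DELTA records at offsets 131 to 137 on top of the starting state SPSO=100, SPEO=100. The `replay` function and its rules are assumptions made for illustration: the recorded SPEO is taken to be one past the highest offset mentioned in any control record, and the SPSO is taken to advance past every leading record in the Acknowledged state, which matches the SPSO movements shown in the example.

```python
ACKNOWLEDGED = 2  # state code used in the example control records

# States arrays of the SHARE_DELTA records at offsets 131..137, in log order.
# Each tuple is (BaseOffset, LastOffset, State, DeliveryCount).
deltas = [
    [(100, 109, 2, 1)],
    [(110, 110, 0, 1)],
    [(111, 118, 0, 0), (119, 119, 2, 1)],
    [(111, 112, 0, 1)],
    [(113, 118, 2, 1)],
    [(110, 110, 2, 2)],
    [(111, 112, 2, 2)],
]


def replay(start, end, delta_list):
    """Apply deltas earliest to latest and return (SPSO, SPEO, remaining states)."""
    states = {}
    for delta in delta_list:
        for base, last, state, count in delta:
            for offset in range(base, last + 1):
                states[offset] = (state, count)
            end = max(end, last + 1)
    # Advance the start offset past leading acknowledged records.
    while start < end and states.get(start, (None, 0))[0] == ACKNOWLEDGED:
        del states[start]
        start += 1
    return start, end, states


after_release = replay(100, 100, deltas[:2])
final = replay(100, 100, deltas)
```

Replaying only the first two deltas gives a recorded SPEO of 111, as the note after the release of record 110 says. Replaying all seven gives SPSO=120 and SPEO=120 with no remaining per-record state, which is exactly what the alternative checkpoint with epoch 2 records.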

Recovering share-partition state and interactions with log cleaning

A share-partition is a topic-partition with a subscription in a share group. The share-partition is essentially a view of the topic-partition, managed by the share-partition leader, with durable state stored on the topic-partition in SHARE_CHECKPOINT and SHARE_DELTA control records.

In order to recreate the share-partition state when a broker becomes the leader of a share-partition, it must read the most recent SHARE_CHECKPOINT and any subsequent SHARE_DELTA control records, which will all have the same checkpoint epoch. In order to minimise the amount of log scanning required, it’s important to write SHARE_CHECKPOINT records frequently, and also to have an efficient way of finding the most recent SHARE_CHECKPOINT record.

For each share-partition, the offset of the most recent SHARE_CHECKPOINT record is called the Share Checkpoint Offset (SCO). The Earliest Share Offset (ESO) is the earliest of the share checkpoint offsets across all of the share groups with a subscription to the topic-partition.

  • The log cleaner can clean all SHARE_CHECKPOINT and SHARE_DELTA records before the SCO.
  • The log cleaner must not clean SHARE_CHECKPOINT and SHARE_DELTA records after the SCO.

In practice, the ESO is used as the cut-off point for cleaning of these control records.
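The relationship between the SCO and the ESO can be illustrated with a short sketch. The function names and the example offsets are hypothetical, and the sketch assumes at least one share group is subscribed to the topic-partition.

```python
def earliest_share_offset(share_checkpoint_offsets):
    """Return the ESO: the smallest SCO across the subscribed share groups.

    share_checkpoint_offsets maps a share group ID to that group's SCO.
    """
    return min(share_checkpoint_offsets.values())


def can_clean(control_record_offset, eso):
    """A share control record is eligible for cleaning only before the ESO."""
    return control_record_offset < eso


# Hypothetical SCOs for two share groups on the same topic-partition.
scos = {"G1": 137, "G2": 130}
eso = earliest_share_offset(scos)
```

Using the ESO as a single cut-off is conservative: in this example a control record for G1 at offset 131 lies before G1's own SCO, yet it is retained because it is not before the ESO of 130.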

Public Interfaces

This KIP introduces extensive additions to the public interfaces.


The new org.apache.kafka.clients.consumer.AcknowledgeCommitCallback can be implemented by the user to execute when acknowledgement completes. It is called on the application thread and is not permitted to call the methods of KafkaShareConsumer with the exception of KafkaShareConsumer.wakeup().

Method signature: void onCompletion(Map<TopicIdPartition, Optional<KafkaException>> results)

Description: A callback method the user can implement to provide asynchronous handling of request completion.


The new org.apache.kafka.clients.consumer.AcknowledgeType enum distinguishes between the types of acknowledgement for a record consumed using a share group.


group.share.enable

Whether to enable share groups on the broker.

Default false while the feature is being developed. Will become true in a future release.

The maximum number of delivery attempts for a record delivered to a share group.

Default 5, minimum 2, maximum 10

group.share.record.lock.duration.ms

Share-group record acquisition lock duration in milliseconds.

Default 30000 (30 seconds), minimum 1000 (1 second), maximum 60000 (60 seconds)

group.share.record.lock.duration.max.ms

Share-group record acquisition lock maximum duration in milliseconds.

Default 60000 (60 seconds), minimum 1000 (1 second), maximum 3600000 (1 hour)

group.share.record.lock.partition.limit

Share-group record lock limit per share-partition.

Default 200, minimum 100, maximum 10000

The timeout to detect client failures when using the group protocol.

Default 45000 (45 seconds) 

The minimum session timeout.

Default 45000 (45 seconds) 

The maximum session timeout.

Default 60000 (60 seconds) 

The heartbeat interval given to the members.

Default 5000 (5 seconds) 

The minimum heartbeat interval.

Default 5000 (5 seconds) 

The maximum heartbeat interval.

Default 15000 (15 seconds)

The maximum number of share groups.

Default 10, minimum 1, maximum 100

The maximum number of consumers that a single share group can accommodate.

Default 200, minimum 10, maximum 1000

The server-side assignors as a list of full class names. In the initial delivery, only the first one in the list is used.

A list of class names. Default ""
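The default, minimum and maximum values quoted above for the three named record-lock settings can be expressed as a small range check. This is only an illustrative sketch: the `LIMITS` table and the `resolve` helper are hypothetical and are not part of the broker's configuration code.

```python
# (default, minimum, maximum) for each setting, as quoted in this section.
LIMITS = {
    "group.share.record.lock.duration.ms": (30000, 1000, 60000),
    "group.share.record.lock.duration.max.ms": (60000, 1000, 3600000),
    "group.share.record.lock.partition.limit": (200, 100, 10000),
}


def resolve(name, overrides):
    """Return the effective value of a setting, rejecting out-of-range overrides."""
    default, minimum, maximum = LIMITS[name]
    value = overrides.get(name, default)
    if not minimum <= value <= maximum:
        raise ValueError(f"{name}={value} is outside [{minimum}, {maximum}]")
    return value


effective_default = resolve("group.share.record.lock.duration.ms", {})
effective_override = resolve(
    "group.share.record.lock.partition.limit",
    {"group.share.record.lock.partition.limit": 500},
)
```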


Code Block
{
  "type": "data",
  "name": "ShareDeltaValue",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
   { "name": "GroupId", "type": "string", "versions": "0",
     "about": "The group identifier." },
   { "name": "CheckpointEpoch", "type": "uint16", "versions": "0",
     "about": "The checkpoint epoch, increments with each checkpoint." },
   { "name": "BackOffset", "type": "int64", "versions": "0",
     "about": "The offset of the previous ShareCheckpoint or ShareDelta." },
   { "name": "States", "type": "[]State", "versions": "0", "fields": [
      { "name": "BaseOffset", "type": "int64", "versions": "0",
        "about": "The base offset of this state batch." },
      { "name": "LastOffset", "type": "int64", "versions": "0",
        "about": "The last offset of this state batch." },
      { "name": "State", "type": "int8", "versions": "0",
        "about": "The state - 0:Available,2:Acked,4:Archived." },
      { "name": "DeliveryCount", "type": "int16", "versions": "0",
        "about": "The delivery count." }
   ]}
  ]
}
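To make the field types in the ShareDeltaValue schema concrete, the sketch below mirrors them as Python dataclasses with range checks for uint16, int64, int8 and int16. The class names `StateBatch` and `ShareDeltaValue` and the helper `_check_range` are hypothetical and unrelated to Kafka's generated message classes. The example instance reproduces the SHARE_DELTA record at offset 131 from the earlier walkthrough.

```python
from dataclasses import dataclass, asdict
from typing import List

VALID_STATES = {0, 2, 4}  # 0:Available, 2:Acked, 4:Archived


def _check_range(name, value, bits, signed=True):
    """Raise ValueError if value does not fit the given integer width."""
    low = -(1 << (bits - 1)) if signed else 0
    high = (1 << (bits - 1)) - 1 if signed else (1 << bits) - 1
    if not low <= value <= high:
        raise ValueError(f"{name}={value} is outside [{low}, {high}]")


@dataclass
class StateBatch:
    BaseOffset: int     # int64
    LastOffset: int     # int64
    State: int          # int8, one of VALID_STATES
    DeliveryCount: int  # int16

    def __post_init__(self):
        _check_range("BaseOffset", self.BaseOffset, 64)
        _check_range("LastOffset", self.LastOffset, 64)
        _check_range("DeliveryCount", self.DeliveryCount, 16)
        if self.State not in VALID_STATES:
            raise ValueError(f"unknown State {self.State}")


@dataclass
class ShareDeltaValue:
    GroupId: str
    CheckpointEpoch: int  # uint16
    BackOffset: int       # int64
    States: List[StateBatch]

    def __post_init__(self):
        _check_range("CheckpointEpoch", self.CheckpointEpoch, 16, signed=False)
        _check_range("BackOffset", self.BackOffset, 64)


delta = ShareDeltaValue("G1", 1, 130, [StateBatch(100, 109, 2, 1)])
as_dict = asdict(delta)
```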
