Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Transactions and metrics updates

...

Note that because the share groups are all consuming from the same log, the retention behavior for a topic applies to all of the share groups consuming from that topic.

Reading transactional records

Each consumer in a consumer group has its own isolation level which controls how it handles records which were produced in transactions. For a share group, the concept of isolation level applies to the entire group, not each consumer.

The isolation level of a share group is controlled by the group configuration group.share.isolation.level.

For the read_uncommitted isolation level, which is the default, the share group consumes all transactional and non-transactional records.

For the read_committed isolation level, the share group only consumes committed records. The share-partition leader itself is responsible for keeping track of the commit and abort markers and filtering out transactional records which have been aborted. So, the set of records which are eligible to become in-flight records are non-transactional records and committed transactional records only. The SPEO can only move up to the last stable offset.

In-flight records example

...

Here are some examples of how the cumulative state from the previous table would be represented in SHARE_CHECKPOINT records:

Cumulative stateSHARE_CHECKPOINT
SPSO=100, SPEO=100


Code Block
{
  "GroupId": "G1",
  "CheckpointEpoch": 1,
  "StartOffset": 100,
  "EndOffset": 100,
  "States": []
}


SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1)


Code Block
{
  "GroupId": "G1",
  "CheckpointEpoch": 1,
  "StartOffset": 110,
  "EndOffset": 121,
  "States": [
    {
      "BaseOffset": 110,
      "LastOffset": 110,
      "State": 0 (Available),
      "DeliveryCount": 1
    },
    {
      "BaseOffset": 111,
      "LastOffset": 112,
      "State": 0 (Available),
      "DeliveryCount": 1
    },
    {
      "BaseOffset": 113,
      "LastOffset": 118,
      "State": 0 (Available),
      "DeliveryCount": 0
    },
    {
      "BaseOffset": 119,
      "LastOffset": 119,
      "State": 2 (Acknowledged),
      "DeliveryCount": 1 (whatever it was when it was acknowledged)
    },
    {
      "BaseOffset": 120,
      "LastOffset": 120,
      "State": 0 (Available),
      "DeliveryCount": 0
    }
  ]
}


Note that the Acquired state is not recorded because it’s transient. As a result, an Acquired record with a delivery count of 1 is recorded as Available with a delivery count of 0. In the unlikely event a share-partition leader crash, memory of the in-flight delivery will be lost.

...

Here are the previous examples, showing the control records which record the cumulative state durably. Note that any SHARE_DELTA could be replaced with a SHARE_CHECKPOINT.

Operation

State changes

Cumulative state

Control records

Starting state of topic-partition with latest offset 100

SPSO=100, SPEO=100

SPSO=100, SPEO=100


Code Block
SHARE_CHECKPOINT offset 130:
{
  "GroupId": "G1",
  "CheckpointEpoch": 1,
  "StartOffset": 110,
  "EndOffset": 110,
  "States": []
}


In the batched case with successful processing, there’s a state change per batch to move the SPSO forwards


Fetch records 100-109

SPEO=110, records 100-109 (acquired, delivery count 1)

SPSO=100, SPEO=110, records 100-109 (acquired, delivery count 1)


Acknowledge 100-109

SPSO=110

SPSO=110, SPEO=110


Code Block
SHARE_DELTA offset 131:
{
  "GroupId": "G1",
  "CheckpointEpoch": 1,
  "BackOffset": 130,
  "States": [
    {
      "BaseOffset": 100,
      "LastOffset": 109,
      "State": 2 (Acknowledged),
      "DeliveryCount": 1
    }
  ]
}


With a messier sequence of release and acknowledge, there’s a state change for each operation which can act on multiple records


Fetch records 110-119

SPEO=120, records 110-119 (acquired, delivery count 1)

SPSO=110, SPEO=120, records 110-119 (acquired, delivery count 1)


Release 110

record 110 (available, delivery count 1)

SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-119 (acquired, delivery count 1)


Code Block
SHARE_DELTA offset 132:
{
  "GroupId": "G1",
  "CheckpointEpoch": 1,
  "BackOffset": 131,
  "States": [
    {
      "BaseOffset": 110,
      "LastOffset": 110,
      "State": 0 (Available),
      "DeliveryCount": 1
    }
  ]
}

Note that the SPEO in the control records is 111 at this point. All records after this are in their first delivery attempt so this is an acceptable situation.

Acknowledge 119

record 110 (available, delivery count 1), records 111-118 acquired, record 119 acknowledged

SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-118 (acquired, delivery count 1), record 119 acknowledged


Code Block
SHARE_DELTA offset 133:
{
  "GroupId": "G1",
  "CheckpointEpoch": 1,
  "BackOffset": 132,
  "States": [
    {
      "BaseOffset": 111,
      "LastOffset": 118,
      "State": 0 (Available),
      "DeliveryCount": 0
    },
    {
      "BaseOffset": 119,
      "LastOffset": 119,
      "State": 2 (Acknowledged),
      "DeliveryCount": 1
    }
  ]
}


Fetch records 110, 120

SPEO=121, record 110 (acquired, delivery count 2), record 120 (acquired, delivery count 1)

SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1)


Lock timeout elapsed 111, 112

records 111-112 (available, delivery count 1)

SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1)


Code Block
SHARE_DELTA offset 134:
{
  "GroupId": "G1",
  "CheckpointEpoch": 1,
  "BackOffset": 133,
  "States": [
    {
      "BaseOffset": 111,
      "LastOffset": 112,
      "State": 0 (Available),
      "DeliveryCount": 1
    }
  ]
}


Acknowledge 113-118

records 113-118 acknowledged

SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-119 acknowledged, record 120 (acquired, delivery count 1)


Code Block
SHARE_DELTA offset 135:
{
  "GroupId": "G1",
  "CheckpointEpoch": 1,
  "BackOffset": 134,
  "States": [
    {
      "BaseOffset": 113,
      "LastOffset": 118,
      "State": 2 (Acknowledged),
      "DeliveryCount": 1
    }
  ]
}


Fetch records 111,112

records 111-112 (acquired, delivery count 2)

SPSO=110, SPEO=121, record 110-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1)


Acknowledge 110

SPSO=111

SPSO=111, SPEO=121, record 111-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1)


Code Block
SHARE_DELTA offset 136:
{
  "GroupId": "G1",
  "CheckpointEpoch": 1,
  "BackOffset": 135,
  "States": [
    {
      "BaseOffset": 110,
      "LastOffset": 110,
      "State": 2 (Acknowledged),
      "DeliveryCount": 2
    }
  ]
}


Acknowledge 111,112

SPSO=120

SPSO=120, SPEO=121, record 120 (acquired, delivery count 1)


Code Block
SHARE_DELTA offset 137:
{
  "GroupId": "G1",
  "CheckpointEpoch": 1,
  "BackOffset": 136,
  "States": [
    {
      "BaseOffset": 111,
      "LastOffset": 112,
      "State": 2 (Acknowledged),
      "DeliveryCount": 2
    }
  ]
}

or alternatively, taking a new checkpoint:

Code Block
SHARE_CHECKPOINT offset 137:
{
  "GroupId": "G1",
  "CheckpointEpoch": 2,
  "StartOffset": 120,
  "EndOffset": 120,
  "States": []
}

Note that the delivery of 120 has not been recorded yet because it is the first delivery attempt and it is safe to recover the SPEO back to offset 120 and repeat the attempt.

Public Interfaces

This KIP introduces extensive additions to the public interfaces.

...

The ShareFetch API is used by share group consumers to fetch acquired records from share-partition leaders. It is also possible to piggyback acknowledgements in this request to reduce the number of round trips.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareFetchRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "groupId",
      "about": "null if not provided or if it didn't change since the last fetch; the group identifier otherwise." },
    { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if not provided or if it didn't change since the last fetch; the member id generated by the coordinator otherwise." },
    { "name": "AcquisitionTimeoutMs", "type": "int32", "versions": "0+", "default": -1,
      "about": "-1 if it didn't chance since the last fetch; the maximum time in milliseconds that the fetched records are acquired for the consumer." },
    { "name": "MaxWaitMs", "type": "int32", "versions": "0+",
      "about": "The maximum time in milliseconds to wait for the response." },
    { "name": "MinBytes", "type": "int32", "versions": "0+",
      "about": "The minimum bytes to accumulate in the response." },
    { "name": "MaxBytes", "type": "int32", "versions": "0+", "default": "0x7fffffff", "ignorable": true,
      "about": "The maximum bytes to fetch.  See KIP-74 for cases where this limit may not be honored." },
    { "name": "SessionId", "type": "int32", "versions": "0+", "default": "0", "ignorable": true,
      "about": "The share session ID." },
    { "name": "SessionEpoch", "type": "int32", "versions": "0+", "default": "-1", "ignorable": true,
      "about": "The share session epoch, which is used for ordering requests in a session." },
    { "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID"},
      { "name": "Partitions", "type": "[]FetchPartition", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "0+", "default": "-1", "ignorable": true,
          "about": "The current leader epoch of the partition." },
        { "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",
          "about": "The maximum bytes to fetch from this partition.  See KIP-74 for cases where this limit may not be honored." },
      ]}
    { "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
          "about": "Record batches to acknowledge.", "fields": [
          { "name": "StartOffset", "type": "int64", "versions": "0+",
            "about": "Start offset of batch of records to acknowledge."},
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "Last offset (inclusive) of batch of records to acknowledge."},
          { "name": "GapOffsets", "type": "[]int64", "versions": "0+",
            "about": "Array of offsets in this range which do not correspond to records."},
          { "name": "AcknowledgeType", "type": "int8", "versions": "0+", "default": "0",
            "about": "The type of acknowledgement - 0:Accept,1:Release,2:Reject."}
        ]}
    ]},
    { "name": "ForgottenTopicsData", "type": "[]ForgottenTopic", "versions": "0+", "ignorable": false,
      "about": "In an incremental fetch request, the partitions to remove.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID"},
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions indexes to forget." }
    ]}
  ]
}

...

Metric Name

Type

Group

Tags

Description

JMX Bean

group-count

Gauge

group-coordinator-metrics

protocol: share

The total number of share groups managed by group coordinator.

kafka.server:type=group-coordinator-metrics,name=group-count,protocol=share 

rebalance (rebalance-rate and rebalance-count)

Meter

group-coordinator-metrics

protocol: share

The total number of share group rebalances count and rate.

kafka.server:type=group-coordinator-metrics,name=rebalance-rate,protocol=share 


kafka.server:type=group-coordinator-metrics,name=rebalance-count,protocol=share 

num-partitions

Gauge

group-coordinator-metrics

protocol: share

The number of share partitions managed by group coordinator. 

kafka.server:type=group-coordinator-metrics,name=num-partitions,protocol=share 

group-countGaugegroup-coordinator-metrics

protocol: share

state: {empty|stable|dead} 

The number of share groups in respective state.kafka.server:type=group-coordinator-metrics,name=group-count,protocol=share,state={empty|stable|dead} 

offsetshare-commit acknowledgement (offsetshare-commitacknowledgement-rate and offsetshare-commitacknowledgement-count)

Meter

group-coordinator-metrics

protocol: share

The total number of committed offsets acknowledged for share groups.

kafka.server:type=group-coordinator-metrics,name=offsetshare-commitacknowledgement-rate,protocol=share 


kafka.server:type=group-coordinator-metrics,name=offsetshare-commitacknowledgement-count,protocol=share 

record-acknowledgement (record-acknowledgement-rate and record-acknowledgement-count)

Meter

group-coordinator-metrics

protocol: share

ack-type:{accept,release,reject} 


The number of records acknowledged per acknowledgement type.

kafka.server:type=group-coordinator-metrics,name=record-acknowledgement-rate,protocol=share,ack-type={accept,release,reject} 


kafka.server:type=group-coordinator-metrics,name=record-acknowledgement-count,protocol=share,ack-type={accept,release,reject} 

partition-load-time (partition-load-time-avg and partition-load-time-max)

Meter

group-coordinator-metrics

protocol: share 

The time taken to load the share partitions.

kafka.server:type=group-coordinator-metrics,name=partition-load-time-avg,protocol=share 


kafka.server:type=group-coordinator-metrics,name=partition-load-time-max,protocol=share  

Future Work

There are some obvious extensions to this idea which are not included in this KIP in order to keep the scope manageable.

...