...
Note that because the share groups are all consuming from the same log, the retention behavior for a topic applies to all of the share groups consuming from that topic.
Reading transactional records
Each consumer in a consumer group has its own isolation level, which controls how it handles records that were produced in transactions. For a share group, the isolation level applies to the entire group, not to each individual consumer.
The isolation level of a share group is controlled by the group configuration group.share.isolation.level.
For the read_uncommitted isolation level, which is the default, the share group consumes all transactional and non-transactional records.
For the read_committed isolation level, the share group only consumes committed records. The share-partition leader itself is responsible for keeping track of the commit and abort markers and for filtering out transactional records which have been aborted. So, only non-transactional records and committed transactional records are eligible to become in-flight records. The SPEO can only move up to the last stable offset.
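To make the read_committed behaviour concrete, here is an illustrative sketch (not Kafka code; the record and marker shapes are invented) of how a share-partition leader could filter out aborted transactional records, so that only non-transactional records and committed transactional records below the last stable offset become eligible:

```python
# Illustrative sketch only: record/marker shapes are invented for this example.
from dataclasses import dataclass
from typing import Optional

@dataclass
class Record:
    offset: int
    producer_id: Optional[int]       # None => non-transactional record
    control: Optional[str] = None    # "COMMIT" or "ABORT" marker, else None

def eligible_offsets(log, last_stable_offset):
    """Offsets eligible to become in-flight records under read_committed."""
    aborted_ranges = []   # (producer_id, first_offset, marker_offset)
    open_txn_start = {}   # producer_id -> first offset of its open transaction
    for r in log:
        if r.offset >= last_stable_offset:
            break  # the SPEO can only move up to the last stable offset
        if r.control == "ABORT":
            start = open_txn_start.pop(r.producer_id, r.offset)
            aborted_ranges.append((r.producer_id, start, r.offset))
        elif r.control == "COMMIT":
            open_txn_start.pop(r.producer_id, None)
        elif r.producer_id is not None:
            open_txn_start.setdefault(r.producer_id, r.offset)

    def aborted(r):
        return any(pid == r.producer_id and start <= r.offset < marker
                   for pid, start, marker in aborted_ranges)

    # Non-transactional records and committed transactional records only;
    # the control markers themselves are never delivered.
    return [r.offset for r in log
            if r.offset < last_stable_offset
            and r.control is None and not aborted(r)]
```

For example, with producer 1's transaction committed and producer 2's aborted, only producer 2's records are filtered out of the eligible set.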
In-flight records example
...
Here are some examples of how the cumulative state from the previous table would be represented in SHARE_CHECKPOINT records:
| Cumulative state | SHARE_CHECKPOINT |
|---|---|
| SPSO=100, SPEO=100 | |
| SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1) | |
Note that the Acquired state is not recorded because it is transient. As a result, an Acquired record with a delivery count of 1 is recorded as Available with a delivery count of 0. In the unlikely event of a share-partition leader crash, the memory of the in-flight delivery will be lost.
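A minimal sketch of that persistence rule (the function and state names are invented for illustration):

```python
# Illustrative sketch only; state names follow the text, the function is invented.
def persisted_state(in_flight):
    """Map transient in-memory state to what a durable control record captures.

    in_flight: dict mapping offset -> (state, delivery_count)
    """
    out = {}
    for offset, (state, delivery_count) in in_flight.items():
        if state == "ACQUIRED":
            # Acquired is transient and never recorded: persist as Available,
            # with the delivery count rolled back because this delivery
            # attempt would be lost if the share-partition leader crashed.
            out[offset] = ("AVAILABLE", delivery_count - 1)
        else:
            out[offset] = (state, delivery_count)
    return out
```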
...
Here are the previous examples, showing the control records which record the cumulative state durably. Note that any SHARE_DELTA could be replaced with a SHARE_CHECKPOINT.
| Operation | State changes | Cumulative state | Control records |
|---|---|---|---|
| Starting state of topic-partition with latest offset 100 | SPSO=100, SPEO=100 | SPSO=100, SPEO=100 | |

In the batched case with successful processing, there's a state change per batch to move the SPSO forwards:

| Operation | State changes | Cumulative state | Control records |
|---|---|---|---|
| Fetch records 100-109 | SPEO=110, records 100-109 (acquired, delivery count 1) | SPSO=100, SPEO=110, records 100-109 (acquired, delivery count 1) | |
| Acknowledge 100-109 | SPSO=110 | SPSO=110, SPEO=110 | |

With a messier sequence of release and acknowledge, there's a state change for each operation, which can act on multiple records:

| Operation | State changes | Cumulative state | Control records |
|---|---|---|---|
| Fetch records 110-119 | SPEO=120, records 110-119 (acquired, delivery count 1) | SPSO=110, SPEO=120, records 110-119 (acquired, delivery count 1) | |
| Release 110 | record 110 (available, delivery count 1) | SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-119 (acquired, delivery count 1) | Note that the SPEO in the control records is 111 at this point. All records after this are in their first delivery attempt, so this is an acceptable situation. |
| Acknowledge 119 | record 110 (available, delivery count 1), records 111-118 acquired, record 119 acknowledged | SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-118 (acquired, delivery count 1), record 119 acknowledged | |
| Fetch records 110, 120 | SPEO=121, record 110 (acquired, delivery count 2), record 120 (acquired, delivery count 1) | SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1) | |
| Lock timeout elapsed 111, 112 | records 111-112 (available, delivery count 1) | SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1) | |
| Acknowledge 113-118 | records 113-118 acknowledged | SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-119 acknowledged, record 120 (acquired, delivery count 1) | |
| Fetch records 111, 112 | records 111-112 (acquired, delivery count 2) | SPSO=110, SPEO=121, records 110-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1) | |
| Acknowledge 110 | SPSO=111 | SPSO=111, SPEO=121, records 111-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1) | |
| Acknowledge 111, 112 | SPSO=120 | SPSO=120, SPEO=121, record 120 (acquired, delivery count 1) | or alternatively, taking a new checkpoint: Note that the delivery of 120 has not been recorded yet because it is the first delivery attempt, and it is safe to recover the SPEO back to offset 120 and repeat the attempt. |
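The sequence of operations traced in the table above can be reproduced with a toy model of the in-flight state machine. This is an illustrative sketch only, not Kafka code; the class and method names are invented, and fetch re-acquires available records before extending past the SPEO:

```python
# Toy model of the share-partition in-flight state machine described above.
# State names mirror the text; everything else is invented for illustration.
class SharePartition:
    def __init__(self, start_offset):
        self.spso = start_offset   # share-partition start offset
        self.speo = start_offset   # share-partition end offset
        self.state = {}            # offset -> (state, delivery_count)

    def fetch(self, count):
        """Acquire Available records first, then extend past the SPEO."""
        acquired = []
        for off, (st, dc) in sorted(self.state.items()):
            if st == "AVAILABLE" and len(acquired) < count:
                self.state[off] = ("ACQUIRED", dc + 1)  # redelivery
                acquired.append(off)
        while len(acquired) < count:
            off = self.speo
            self.state[off] = ("ACQUIRED", 1)  # first delivery attempt
            self.speo += 1
            acquired.append(off)
        return acquired

    def acknowledge(self, offsets):
        for off in offsets:
            self.state[off] = ("ACKNOWLEDGED", self.state[off][1])
        # The SPSO moves past a contiguous run of acknowledged records.
        while self.state.get(self.spso, ("", 0))[0] == "ACKNOWLEDGED":
            del self.state[self.spso]
            self.spso += 1

    def release(self, offsets):
        for off in offsets:
            _, dc = self.state[off]
            self.state[off] = ("AVAILABLE", dc)

    # A lock timeout performs the same state transition as an explicit release.
    lock_timeout = release
```

Running the table's operations against this model ends, as in the final row, with SPSO=120, SPEO=121 and only record 120 in flight.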
Public Interfaces
This KIP introduces extensive additions to the public interfaces.
...
The ShareFetch API is used by share group consumers to fetch acquired records from share-partition leaders. It is also possible to piggyback acknowledgements in this request to reduce the number of round trips.
Request schema
```json
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareFetchRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "groupId",
      "about": "null if not provided or if it didn't change since the last fetch; the group identifier otherwise." },
    { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if not provided or if it didn't change since the last fetch; the member id generated by the coordinator otherwise." },
    { "name": "AcquisitionTimeoutMs", "type": "int32", "versions": "0+", "default": -1,
      "about": "-1 if it didn't change since the last fetch; the maximum time in milliseconds that the fetched records are acquired for the consumer." },
    { "name": "MaxWaitMs", "type": "int32", "versions": "0+",
      "about": "The maximum time in milliseconds to wait for the response." },
    { "name": "MinBytes", "type": "int32", "versions": "0+",
      "about": "The minimum bytes to accumulate in the response." },
    { "name": "MaxBytes", "type": "int32", "versions": "0+", "default": "0x7fffffff", "ignorable": true,
      "about": "The maximum bytes to fetch. See KIP-74 for cases where this limit may not be honored." },
    { "name": "SessionId", "type": "int32", "versions": "0+", "default": "0", "ignorable": true,
      "about": "The share session ID." },
    { "name": "SessionEpoch", "type": "int32", "versions": "0+", "default": "-1", "ignorable": true,
      "about": "The share session epoch, which is used for ordering requests in a session." },
    { "name": "Topics", "type": "[]FetchTopic", "versions": "0+", "about": "The topics to fetch.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID." },
      { "name": "Partitions", "type": "[]FetchPartition", "versions": "0+", "about": "The partitions to fetch.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+", "about": "The partition index." },
        { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "0+", "default": "-1", "ignorable": true,
          "about": "The current leader epoch of the partition." },
        { "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",
          "about": "The maximum bytes to fetch from this partition. See KIP-74 for cases where this limit may not be honored." },
        { "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+", "about": "Record batches to acknowledge.", "fields": [
          { "name": "StartOffset", "type": "int64", "versions": "0+", "about": "Start offset of batch of records to acknowledge." },
          { "name": "LastOffset", "type": "int64", "versions": "0+", "about": "Last offset (inclusive) of batch of records to acknowledge." },
          { "name": "GapOffsets", "type": "[]int64", "versions": "0+", "about": "Array of offsets in this range which do not correspond to records." },
          { "name": "AcknowledgeType", "type": "int8", "versions": "0+", "default": "0", "about": "The type of acknowledgement - 0:Accept,1:Release,2:Reject." }
        ]}
      ]}
    ]},
    { "name": "ForgottenTopicsData", "type": "[]ForgottenTopic", "versions": "0+", "ignorable": false, "about": "In an incremental fetch request, the partitions to remove.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+", "about": "The partitions indexes to forget." }
    ]}
  ]
}
```
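As an illustration of piggybacking acknowledgements, here is a hypothetical ShareFetchRequest payload expressed as a plain dict following the schema; the group name, topic id, session values and offsets are all made up:

```python
# Hypothetical example payload (not a real wire format): a fetch that also
# accepts offsets 100-109, where offset 104 is a gap with no record.
share_fetch_request = {
    "GroupId": "my-group",        # null if unchanged since the last fetch
    "MemberId": "member-1",
    "AcquisitionTimeoutMs": -1,   # -1: unchanged since the last fetch
    "MaxWaitMs": 500,
    "MinBytes": 1,
    "MaxBytes": 0x7FFFFFFF,
    "SessionId": 42,
    "SessionEpoch": 3,
    "Topics": [{
        "TopicId": "topic-1-uuid",
        "Partitions": [{
            "PartitionIndex": 0,
            "CurrentLeaderEpoch": -1,
            "PartitionMaxBytes": 1048576,
            "AcknowledgementBatches": [{
                "StartOffset": 100,
                "LastOffset": 109,
                "GapOffsets": [104],     # no record at this offset
                "AcknowledgeType": 0,    # 0:Accept, 1:Release, 2:Reject
            }],
        }],
    }],
    "ForgottenTopicsData": [],
}
```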
...
| Metric Name | Type | Group | Tags | Description | JMX Bean |
|---|---|---|---|---|---|
| group-count | Gauge | group-coordinator-metrics | | The total number of share groups managed by the group coordinator. | |
| rebalance (rebalance-rate and rebalance-count) | Meter | group-coordinator-metrics | | The total number and rate of share group rebalances. | |
| num-partitions | Gauge | group-coordinator-metrics | | The number of share partitions managed by the group coordinator. | |
| group-count | Gauge | group-coordinator-metrics | | The number of share groups in each state. | kafka.server:type=group-coordinator-metrics,name=group-count,protocol=share,state={empty\|stable\|dead} |
| share-acknowledgement (share-acknowledgement-rate and share-acknowledgement-count) | Meter | group-coordinator-metrics | | The total number of offsets acknowledged for share groups. | |
| record-acknowledgement (record-acknowledgement-rate and record-acknowledgement-count) | Meter | group-coordinator-metrics | | The number of records acknowledged per acknowledgement type. | |
| partition-load-time (partition-load-time-avg and partition-load-time-max) | Meter | group-coordinator-metrics | | The time taken to load the share partitions. | |
Future Work
There are some obvious extensions to this idea which are not included in this KIP in order to keep the scope manageable.
...