...
Status
Current state: Under Discussion
Discussion thread: https://lists.apache.org/thread/9wdxthfsbm5xf01y4xvq6qtlg0gq96lq
...
For each share-partition, the share group adds some state management for the records being consumed. The starting offset of records which are eligible for consumption is known as the share-partition start offset (SPSO), and the last offset of records which are eligible for consumption is known as the share-partition end offset (SPEO). The records starting at the SPSO and up to the SPEO are known as the in-flight records. So, a share-partition is essentially managing the consumption of the in-flight records.
The SPEO is not necessarily always at the end of the topic-partition; it advances freely as records are fetched beyond this point. The segment of the topic-partition between the SPSO and the SPEO is a sliding window that moves as records are consumed. The share-partition leader limits the distance between the SPSO and the SPEO. The upper bound is controlled by the broker configuration share.record.lock.partition.limit. Unlike existing queuing systems, there’s no “maximum queue depth”, but there is a limit to the number of in-flight records at any point in time.
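To make the sliding window concrete, here is a minimal sketch that tracks one share-partition's SPSO and SPEO and refuses to grow the window past the in-flight limit. The class name `InFlightWindow`, the `logEndOffset` parameter and the treatment of the SPEO as exclusive (as in the state-change table later in this section, where fetching records 100-109 sets SPEO=110) are assumptions for illustration, not the broker's implementation.

```java
// Hypothetical sketch of the SPSO/SPEO sliding window; not the broker implementation.
// Assumes the SPEO is exclusive: the first offset that is not yet in flight.
final class InFlightWindow {
    private final int partitionLimit; // stands in for share.record.lock.partition.limit
    private long spso;
    private long speo;

    InFlightWindow(long initialOffset, int partitionLimit) {
        this.spso = initialOffset;
        this.speo = initialOffset;
        this.partitionLimit = partitionLimit;
    }

    long spso() { return spso; }
    long speo() { return speo; }

    long inFlightCount() { return speo - spso; }

    // Brings up to `requested` new offsets into flight, bounded by the partition limit
    // and by the end of the log. Returns how many offsets were actually added.
    long extend(long requested, long logEndOffset) {
        long room = partitionLimit - inFlightCount();
        long available = logEndOffset - speo;
        long granted = Math.max(0, Math.min(requested, Math.min(room, available)));
        speo += granted;
        return granted;
    }

    // Moves the start of the window forwards, e.g. once leading records are acknowledged.
    void advanceSpso(long newSpso) {
        if (newSpso < spso || newSpso > speo) {
            throw new IllegalArgumentException("SPSO must stay within [SPSO, SPEO]");
        }
        spso = newSpso;
    }
}
```

With a limit of 200, a window starting at offset 100 can grow to SPEO=300; further growth only becomes possible after the SPSO advances.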
...
State | Description |
---|---|
Available | The record is available for a consumer |
Acquired | The record has been acquired for a specific consumer, with a time-limited acquisition lock |
Acknowledged | The record has been processed and acknowledged by a consumer |
Archived | The record is not available for a consumer |
All records before the SPSO are in Archived state. All records after the SPEO are in Available state, but not yet being delivered to consumers.
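Because only the window between the SPSO and the SPEO needs per-record tracking, the state of any other offset is implied by its position. The sketch below captures that rule; the enum and method names are hypothetical, and it again assumes an exclusive SPEO.

```java
// Hypothetical sketch: the four record states, and the state implied for offsets
// that lie outside the in-flight window.
enum RecordState { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

final class WindowStates {
    private WindowStates() {}

    // Offsets before the SPSO are Archived; offsets at or after the (exclusive) SPEO are
    // Available but not yet delivered. In-flight offsets need their tracked state instead.
    static RecordState impliedState(long offset, long spso, long speo) {
        if (offset < spso) return RecordState.ARCHIVED;
        if (offset >= speo) return RecordState.AVAILABLE;
        throw new IllegalArgumentException("offset " + offset + " is in flight; look up its tracked state");
    }
}
```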
...
```
+--------------+
|  Available   |<------------------+
+--------------+                   |
       |                           |
       | acquired                  | - if (delivery count < share.delivery.count.limit)
       | for consumer              |     - released by consumer
       | (delivery count++)        |     - acquisition lock elapsed
       V                           |
+--------------+                   |
|   Acquired   |-------------------+
+--------------+                   |
       |                           |
       | accepted                  | - if (delivery count == share.delivery.count.limit)
       | by consumer               |     - released by consumer
       |                           |     - acquisition lock elapsed
       V                           | OR
+--------------+                   | - rejected by consumer as unprocessable
| Acknowledged |                   |
+--------------+                   |
       |                           |
       | SPSO moves                |
       | past record               |
       |                           |
       V                           |
+--------------+                   |
|   Archived   |<------------------+
+--------------+
```
When records are fetched for a consumer, the share-partition leader starts at the SPSO and finds Available records. For each record it finds, it moves it into Acquired state, bumps its delivery count and adds it to a batch of acquired records to return to the consumer. The consumer then processes the records and acknowledges their consumption. The delivery attempt completes successfully and the records move into Acknowledged state.
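The acquisition step in the paragraph above can be sketched as a scan over the in-flight records in offset order. This is an illustrative model only: `Acquirer`, `InFlightRecord` and the `maxRecords` batch bound are hypothetical, records beyond the SPEO are assumed to have already been added to the map, and the time-limited acquisition lock is not modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of acquiring in-flight records for a fetch; names are illustrative.
enum RecordState { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

final class InFlightRecord {
    RecordState state = RecordState.AVAILABLE;
    int deliveryCount = 0;
}

final class Acquirer {
    // In-flight records keyed by offset; a TreeMap iterates in increasing offset order.
    final TreeMap<Long, InFlightRecord> inFlight = new TreeMap<>();

    // Starting at the SPSO, moves up to maxRecords Available records into Acquired,
    // bumps each one's delivery count and returns their offsets in increasing order.
    // Acquisition locks (timers) are not modeled in this sketch.
    List<Long> acquire(long spso, int maxRecords) {
        List<Long> batch = new ArrayList<>();
        for (Map.Entry<Long, InFlightRecord> entry : inFlight.tailMap(spso, true).entrySet()) {
            if (batch.size() >= maxRecords) break;
            InFlightRecord rec = entry.getValue();
            if (rec.state == RecordState.AVAILABLE) {
                rec.state = RecordState.ACQUIRED;
                rec.deliveryCount++;
                batch.add(entry.getKey());
            }
        }
        return batch;
    }
}
```

Iterating a sorted map is what gives each returned batch its increasing-offset order; records in other states are simply skipped.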
Alternatively, if the consumer cannot process a record or its acquisition lock elapses, the delivery attempt completes unsuccessfully and the record’s next state depends on the delivery count. If the delivery count has reached the cluster’s delivery count limit, share.delivery.count.limit (5 by default), the record moves into Archived state and is not eligible for additional delivery attempts. If the delivery count has not reached the limit, the record moves back into Available state and can be delivered again.
This means that the delivery behavior is at-least-once.
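The outcome of a completed delivery attempt, as described in the state diagram and the two paragraphs above, reduces to a small decision function. The `Outcome` enum and method name are hypothetical; `deliveryCount` is assumed to already include the attempt that just finished, and the sketch uses `>=` rather than `==` against the limit as a defensive choice.

```java
// Hypothetical sketch of the next state of an Acquired record once its delivery attempt ends.
enum RecordState { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

enum Outcome { ACCEPTED, RELEASED, LOCK_ELAPSED, REJECTED }

final class DeliveryAttempt {
    private DeliveryAttempt() {}

    // deliveryCount includes the attempt that just completed;
    // deliveryCountLimit stands in for share.delivery.count.limit.
    static RecordState complete(Outcome outcome, int deliveryCount, int deliveryCountLimit) {
        switch (outcome) {
            case ACCEPTED:
                return RecordState.ACKNOWLEDGED;
            case REJECTED:
                return RecordState.ARCHIVED; // unprocessable: never redelivered
            case RELEASED:
            case LOCK_ELAPSED:
                // Unsuccessful attempt: redeliver unless the limit has been reached.
                return deliveryCount >= deliveryCountLimit ? RecordState.ARCHIVED : RecordState.AVAILABLE;
            default:
                throw new IllegalArgumentException("unknown outcome " + outcome);
        }
    }
}
```

Because a released or timed-out record returns to Available, the same record can reach a consumer more than once, which is why delivery is at-least-once.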
Ordering
Share groups focus primarily on sharing to allow consumers to be scaled independently of partitions. The records in a share-partition can be delivered out of order to a consumer, in particular when redeliveries occur.
...
The records returned in a batch for a particular share-partition are guaranteed to be in order of increasing offset. There are no guarantees about the ordering of offsets between different batches.
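A consumer that wants to rely on the ordering guarantee can only check it within a single batch. The helper below is a hypothetical consumer-side sanity check, not part of any proposed API.

```java
// Hypothetical consumer-side check of the per-batch ordering guarantee.
final class BatchOrdering {
    private BatchOrdering() {}

    // True if the offsets within one batch for one share-partition are strictly increasing.
    static boolean isInOffsetOrder(long[] batchOffsets) {
        for (int i = 1; i < batchOffsets.length; i++) {
            if (batchOffsets[i] <= batchOffsets[i - 1]) return false;
        }
        return true;
    }
}
```

For example, a batch of offsets 110 and 120 followed by a later batch of 111 and 112 (as in the state-change table later in this section) passes the check batch by batch, even though 111 arrives after 120 overall.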
Managing the SPSO and SPEO
The consumer group concepts of seeking and position do not apply to share groups. The SPSO for each share-partition can be initialized for an empty share group, and the SPEO naturally moves forwards as records are consumed.
When a topic subscription is added to a share group for the first time, the SPSO is initialized for each share-partition. By default, the SPSO for each share-partition is initialized to the latest offset for the corresponding topic-partition.
Alternatively, there is an administrative action available using either AdminClient.alterShareGroupOffsets or the kafka-share-groups.sh tool to reset the SPSO for an empty share group with no active members. This can be used to “reset” a share group to the start of a topic, a particular timestamp or the end of a topic. It can also be used to initialize the share group to the start of a topic. Resetting the SPSO discards all of the in-flight record state and delivery counts.
...
If the number of partitions is increased for a topic with a subscription in a share group, the SPSO for the newly created share-partitions is initialized to 0 (which is of course both the earliest and latest offset for an empty topic-partition). This means there is no doubt about what happens when the number of partitions is increased.
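The three ways the SPSO gets set (first subscription, administrative reset and partition addition) can be summarized in one small class. Everything here is a hypothetical model: the names are invented, the `deliveryCounts` map stands in for all in-flight record state, and the `groupIsEmpty` flag stands in for whatever membership check the real tooling performs.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of SPSO initialization and reset; names are illustrative only.
final class SharePartitionStart {
    long spso;
    long speo;
    // Stand-in for the in-flight record state and delivery counts, keyed by offset.
    final Map<Long, Integer> deliveryCounts = new HashMap<>();

    // First subscription: by default, start at the latest offset of the topic-partition.
    static SharePartitionStart initialize(long latestOffset) {
        SharePartitionStart s = new SharePartitionStart();
        s.spso = latestOffset;
        s.speo = latestOffset;
        return s;
    }

    // A partition added to a subscribed topic starts at offset 0.
    static SharePartitionStart forNewPartition() {
        return initialize(0L);
    }

    // Administrative reset: only for a share group with no active members,
    // and it discards all in-flight state and delivery counts.
    void reset(long newSpso, boolean groupIsEmpty) {
        if (!groupIsEmpty) {
            throw new IllegalStateException("SPSO can only be reset for a share group with no active members");
        }
        deliveryCounts.clear();
        spso = newSpso;
        speo = newSpso;
    }
}
```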
If the SPSO is reset to an offset that has been tiered to remote storage (KIP-405: Kafka Tiered Storage), there will be a performance impact just as for existing consumers fetching records from remote storage.
In-flight records example
...
```
+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|   0   |   1   |   2   |   3   |   4   |   5   |   6   |   7   |   8   |   9   |  ...  | <- offset
| Archv | Archv | Acqrd | Avail | Acqrd | Acked | Archv | Avail | Avail | Avail | Avail | <- state
|       |       |   1   |   2   |   1   |       |       |       |       |       |       | <- delivery count
+-------+-------+---^---+-------+-------+-------+-------+-------+-------+---^---+-------+
                    |                                                       |
                    +-- Share-partition start offset (SPSO)                 +-- Share-partition end offset (SPEO)
```
The share group is currently managing the consumption of the in-flight records, which have offsets 2 to 8 inclusive.
...
The cluster records this information durably. In this example, the durable state contains the SPSO position, the non-zero delivery count for offset 3, the Acknowledged state of offset 5, and the Archived state of offset 6.
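The rule implied by this example can be sketched as a snapshot function: keep the SPSO, the delivery counts of records that failed delivery and are Available again, and the final states of records inside the window. This is an inference from the example rather than a stated algorithm; notably, the Acquired records at offsets 2 and 4 are not in the example's durable state, so the sketch skips them. All class names are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: deriving the durable part of a share-partition's state.
enum RecordState { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

final class InFlight {
    final RecordState state;
    final int deliveryCount;

    InFlight(RecordState state, int deliveryCount) {
        this.state = state;
        this.deliveryCount = deliveryCount;
    }
}

final class DurableState {
    long spso;
    // Delivery counts of records whose delivery failed and which are Available again.
    final Map<Long, Integer> failedDeliveryCounts = new TreeMap<>();
    // Records inside the window that are already Acknowledged or Archived.
    final Map<Long, RecordState> finalStates = new TreeMap<>();

    static DurableState snapshot(long spso, Map<Long, InFlight> inFlight) {
        DurableState d = new DurableState();
        d.spso = spso;
        for (Map.Entry<Long, InFlight> e : inFlight.entrySet()) {
            InFlight r = e.getValue();
            switch (r.state) {
                case AVAILABLE:
                    if (r.deliveryCount > 0) d.failedDeliveryCounts.put(e.getKey(), r.deliveryCount);
                    break;
                case ACKNOWLEDGED:
                case ARCHIVED:
                    d.finalStates.put(e.getKey(), r.state);
                    break;
                case ACQUIRED:
                    // Not persisted, matching the example (offsets 2 and 4).
                    break;
            }
        }
        return d;
    }
}
```

Feeding in offsets 2 to 8 from the diagram yields SPSO 2, a delivery count of 2 for offset 3, and final states for offsets 5 and 6, which is the durable state described above.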
...
The share-partition leader is responsible for recording the durable state for the share-partitions it leads. For each share-partition, we need to be able to recover:
- The Share-Partition Start Offset (SPSO)
- The state of the in-flight records
- The delivery counts of records whose delivery failed
...
Operation | State changes | Cumulative state |
---|---|---|
Starting state of topic-partition with latest offset 100 | SPSO=100, SPEO=100 | SPSO=100, SPEO=100 |
In the batched case with successful processing, there’s a state change per batch to move the SPSO forwards | | |
Fetch records 100-109 | SPEO=110, records 100-109 (acquired, delivery count 1) | SPSO=100, SPEO=110, records 100-109 (acquired, delivery count 1) |
Acknowledge 100-109 | SPSO=110 | SPSO=110, SPEO=110 |
With a messier sequence of release and acknowledge, there’s a state change for each operation, which can act on multiple records | | |
Fetch records 110-119 | SPEO=120, records 110-119 (acquired, delivery count 1) | SPSO=110, SPEO=120, records 110-119 (acquired, delivery count 1) |
Release 110 | record 110 (available, delivery count 1) | SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-119 (acquired, delivery count 1) |
Acknowledge 119 | record 110 (available, delivery count 1), records 111-118 acquired, record 119 acknowledged | SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-118 (acquired, delivery count 1), record 119 acknowledged |
Fetch records 110, 120 | SPEO=121, record 110 (acquired, delivery count 2), record 120 (acquired, delivery count 1) | SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1) |
Lock timeout elapsed 111, 112 | records 111-112 (available, delivery count 1) | SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1) |
Acknowledge 113-118 | records 113-118 acknowledged | SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-119 acknowledged, record 120 (acquired, delivery count 1) |
Fetch records 111, 112 | records 111-112 (acquired, delivery count 2) | SPSO=110, SPEO=121, records 110-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1) |
Acknowledge 110 | SPSO=111 | SPSO=111, SPEO=121, records 111-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1) |
Acknowledge 111, 112 | SPSO=120 | SPSO=120, SPEO=121, record 120 (acquired, delivery count 1) |
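The way the SPSO jumps in the last rows of the table (to 111, then to 120) follows from one rule: the SPSO moves past every leading record that is Acknowledged or Archived, and the state of those records can then be forgotten. The sketch below models only record states (not delivery counts or locks) and applies the rule after every change; the class and method names are hypothetical.

```java
import java.util.TreeMap;

// Hypothetical sketch of how the SPSO advances; tracks record states only.
enum RecordState { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

final class SpsoTracker {
    long spso;
    long speo; // exclusive, as in the table: fetching 100-109 sets SPEO=110
    final TreeMap<Long, RecordState> states = new TreeMap<>();

    SpsoTracker(long start) {
        spso = start;
        speo = start;
    }

    // Acquires `count` new records beyond the current SPEO.
    void fetchNew(int count) {
        for (int i = 0; i < count; i++) states.put(speo++, RecordState.ACQUIRED);
    }

    // Applies a state change to an in-flight record, then tries to move the SPSO.
    void setState(long offset, RecordState state) {
        states.put(offset, state);
        advance();
    }

    // Moves the SPSO past every leading record that is Acknowledged or Archived,
    // dropping the per-record state it no longer needs.
    private void advance() {
        while (spso < speo) {
            RecordState s = states.get(spso);
            if (s != RecordState.ACKNOWLEDGED && s != RecordState.ARCHIVED) break;
            states.remove(spso);
            spso++;
        }
    }
}
```

Replaying the messier sequence from the table with this model ends at SPSO=120 and SPEO=121, with only record 120 still tracked.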
...
Configuration | Description | Values |
---|---|---|
share.group.enable | Whether to enable share groups on the broker. | Default false while the feature is being developed. Will become true in a future release. |
share.delivery.count.limit | The maximum number of delivery attempts for a record delivered to a share group. | Default 5, minimum 2, maximum 10 |
share.record.lock.duration.ms | Share-group record acquisition lock duration in milliseconds. | Default 30000 (30 seconds), minimum 1000 (1 second), maximum 60000 (60 seconds) |
share.record.lock.duration.max.ms | Share-group record acquisition lock maximum duration in milliseconds. | Default 60000 (60 seconds), minimum 1000 (1 second), maximum 3600000 (1 hour) |
share.record.lock.partition.limit | Share-group record lock limit per share-partition. | Default 200, minimum 100, maximum 10000 |
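As an illustration of the bounds in the table above, the sketch below reads the proposed broker settings from a `java.util.Properties` object, applies the documented defaults and rejects out-of-range values. It deliberately does not use Kafka's own configuration machinery; the class and method names are hypothetical, and only the property names, defaults and bounds come from the table.

```java
import java.util.Properties;

// Hypothetical sketch of range-checking the proposed share-group broker configurations.
// Mirrors only the defaults and bounds listed in the table; not Kafka's config framework.
final class ShareBrokerConfigCheck {
    private ShareBrokerConfigCheck() {}

    static boolean shareGroupsEnabled(Properties props) {
        return Boolean.parseBoolean(props.getProperty("share.group.enable", "false"));
    }

    static long valueInRange(Properties props, String name, long defaultValue, long min, long max) {
        String raw = props.getProperty(name);
        long value = raw == null ? defaultValue : Long.parseLong(raw.trim());
        if (value < min || value > max) {
            throw new IllegalArgumentException(name + "=" + value + " is outside [" + min + ", " + max + "]");
        }
        return value;
    }

    static void validate(Properties props) {
        valueInRange(props, "share.delivery.count.limit", 5, 2, 10);
        valueInRange(props, "share.record.lock.duration.ms", 30_000, 1_000, 60_000);
        valueInRange(props, "share.record.lock.duration.max.ms", 60_000, 1_000, 3_600_000);
        valueInRange(props, "share.record.lock.partition.limit", 200, 100, 10_000);
    }
}
```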
...
Configuration | Description | Values |
---|---|---|
group.type | Type of the group: "consumer" or "share". | Default "consumer" |
record.lock.duration.ms | Record acquisition lock duration in milliseconds. | null, which uses the cluster configuration share.record.lock.duration.ms; minimum 1000, maximum limited by the cluster configuration share.record.lock.duration.max.ms |
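The interaction between the consumer's record.lock.duration.ms and the two broker settings can be sketched as a resolution function. Whether an over-limit value is rejected or clamped is not specified here; this sketch assumes rejection. The class and method names are hypothetical, and the example properties show only the two settings from the table above (other required consumer settings, such as the bootstrap servers, are omitted).

```java
import java.util.Properties;

// Hypothetical sketch of resolving a share consumer's effective record lock duration.
final class LockDurationResolver {
    private LockDurationResolver() {}

    // consumerValueMs is the consumer's record.lock.duration.ms, or null to inherit the
    // broker's share.record.lock.duration.ms. Assumption: out-of-range values are rejected.
    static long effectiveLockDurationMs(Long consumerValueMs, long brokerDefaultMs, long brokerMaxMs) {
        long value = consumerValueMs == null ? brokerDefaultMs : consumerValueMs;
        if (value < 1_000 || value > brokerMaxMs) {
            throw new IllegalArgumentException(
                "record.lock.duration.ms=" + value + " is outside [1000, " + brokerMaxMs + "]");
        }
        return value;
    }

    // The two consumer settings proposed in the table; other required settings are omitted.
    static Properties exampleShareConsumerConfig() {
        Properties props = new Properties();
        props.setProperty("group.type", "share");              // "consumer" by default
        props.setProperty("record.lock.duration.ms", "45000"); // omit to inherit the broker value
        return props;
    }
}
```

With the broker defaults (30000 ms, capped at 60000 ms), a consumer that sets 45000 ms gets 45000 ms, while one that leaves the setting unset gets 30000 ms.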
Kafka protocol changes
Further details to follow as the design progresses.
...
A “browsing” consumer, which does not modify the share group state or take acquisition locks, could be supported with a lesser permission (DESCRIBE) on the group than a proper consumer needs (READ). This is a little more complicated because it needs a position independent of the SPSO so that it can traverse along the queue.
...