Status
Current state: Accepted
Discussion thread: here
Vote thread: here
JIRA:
Jira | ||||||||
---|---|---|---|---|---|---|---|---|
|
Released: 2.1.0
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
The following table summarizes this:
Version of OffsetCommit | Commit Timestamp | Offset Retention | Expiration Timestamp |
---|---|---|---|
0 | [ZooKeeper based offset management - out of scope] | | |
1 - no explicit commit timestamp | Current timestamp | Broker's offsets.retention.minutes | Commit Timestamp + Offset Retention |
1 - with explicit commit timestamp | Partition-specific timestamp in the request | Broker's offsets.retention.minutes | Commit Timestamp + Offset Retention |
2, 3 | Current timestamp | Request's retention_time | Commit Timestamp + Offset Retention |
For versions 1-3, once the expiration timestamp is reached, the offset is removed from the offset cache (during the next cleanup) regardless of the group state. KAFKA-4682 reports an issue related to this offset expiration, where committed offsets are removed even when there are still active, but rarely committing, consumers in the (Stable) group.
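The pre-KIP rules in the table above can be sketched as a small function. This is only an illustration: the function name, the parameter names, and the use of milliseconds are assumptions, and it assumes the formula "Commit Timestamp + Offset Retention" applies to every row for versions 1-3.

```python
from typing import Optional


def expiration_timestamp_ms(
    version: int,
    now_ms: int,
    broker_retention_ms: int,
    request_commit_timestamp_ms: Optional[int] = None,
    request_retention_ms: Optional[int] = None,
) -> int:
    """Return commit timestamp + offset retention for OffsetCommit versions 1-3.

    Hypothetical helper: names and units are illustrative, not Kafka's API.
    """
    if version == 1:
        # v1: use the partition-specific timestamp from the request if present,
        # otherwise the current time; retention always comes from the broker.
        if request_commit_timestamp_ms is None:
            commit_ts = now_ms
        else:
            commit_ts = request_commit_timestamp_ms
        retention = broker_retention_ms
    elif version in (2, 3):
        # v2/v3: commit timestamp is the current time; retention comes from
        # the request's retention_time.
        if request_retention_ms is None:
            raise ValueError("versions 2 and 3 need the request's retention_time")
        commit_ts = now_ms
        retention = request_retention_ms
    else:
        # Version 0 (ZooKeeper based) is out of scope.
        raise ValueError(f"unsupported OffsetCommit version: {version}")
    return commit_ts + retention
```

Because the result depends only on the commit time and not on the group state, a consumer that stays in the group but commits rarely can still lose its offsets, which is the problem KAFKA-4682 describes.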
...
This proposed change has an impact on the existing offset commit value schema. There is an expire_timestamp field in this schema that, as a result of expiring all group offsets at the same time, would become redundant (as it would repeat the same value for each offset in the group).
Code Block |
---|
Offset Commit Value Schema (Version: 1) => offset => Long metadata => String commit_timestamp => Long expire_timestamp => Long |
...
To make up for the per-offset expiration timestamp we lose in the new version of the offset commit value schema, a new field is added to the group metadata value schema that indicates when the group last changed state.
Code Block |
---|
Group Metadata Value Schema (Version: 1) => protocol_type => String generation => Int protocol => String leader => String members => [member] ... |
...
Code Block |
---|
Group Metadata Value Schema (Version: 2) => protocol_type => String generation => Int protocol => String leader => String current_state_timestamp => Long members => [member] ... |
...
The offsets in a group expire offsets.retention.minutes after the group becomes Empty (assuming the group does not become active again during that time). Whenever the group transitions to Empty state, current_state_timestamp is set to the current timestamp. Then, during each scheduled offset cleanup task, if "current timestamp" minus current_state_timestamp is greater than or equal to the broker's offsets.retention.minutes for any group that is still Empty, all offsets in that group are removed and the group transitions to Dead state.
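The group-level expiration check described above can be sketched as follows. This is a minimal model, not the broker's code: the `Group` class, `transition_to`, and `cleanup_group` are hypothetical names, timestamps are plain millisecond integers, and the sketch assumes the state timestamp is recorded on every state change.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class Group:
    """Hypothetical in-memory stand-in for a consumer group's metadata."""
    state: str = "Empty"
    current_state_timestamp_ms: int = 0
    offsets: Dict[str, int] = field(default_factory=dict)

    def transition_to(self, new_state: str, now_ms: int) -> None:
        # Record when the group last changed state (assumed for every change).
        self.state = new_state
        self.current_state_timestamp_ms = now_ms


def cleanup_group(group: Group, now_ms: int, retention_ms: int) -> bool:
    """One pass of the offset cleanup task; True if the group was expired."""
    if group.state != "Empty":
        # Offsets never expire while the group is not Empty.
        return False
    if now_ms - group.current_state_timestamp_ms < retention_ms:
        # The group has not been Empty for the full retention period yet.
        return False
    # Remove all offsets at once and mark the group Dead.
    group.offsets.clear()
    group.transition_to("Dead", now_ms)
    return True
```

In this model a group that leaves Empty and later returns to it starts a fresh retention period, because the timestamp is overwritten on each transition.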
Note that consumers may rejoin the group while the group is in Empty state. As soon as that happens, the group state changes, and that practically disables offset expiration. This is a breakdown of group states and how offset expiration works in those states:
- Stable: Group offsets will not expire in this state (group state ≠ Empty).
- PreparingRebalance: Group offsets will not expire in this state (group state ≠ Empty).
- CompletingRebalance: Group offsets will not expire in this state (group state ≠ Empty).
- Empty: The field current_state_timestamp is set to when the group last transitioned to this state. If the group stays in this state for offsets.retention.minutes, the following scheduled offset cleanup task will remove all offsets in the group (as explained above).
- Dead: Group offsets have already expired (group deletion), or the group is unloaded from the coordinator cache (coordinator change). No offset expiration action is required.
The default retention time for group offsets can be customized through the existing offsets.retention.minutes broker configuration. If, in the future, a need arises for enforcing a per group retention configuration, it can be implemented via a separate KIP.
There are also a couple of particular cases that need to be addressed with these new semantics:
...
Unfortunately, there is no notification mechanism in place for member subscription changes within a group. Therefore, a poll mechanism can be implemented to run at specific intervals and check whether the group subscription has deviated from what is stored in the cache. One place to do this is the repeating scheduled offset cleanup job, which by default runs every 10 minutes, so the group subscription check would not be executed very frequently. At every execution of this job we collect a list of all topic partitions the group is consuming from (this can be calculated from the data in each group member's metadata), and cross-reference it with the stored offsets for the group. If there are partitions the group has offsets for but no longer consumes from, and offsets.retention.minutes has passed since their last commit timestamp, the corresponding offsets will be removed from the offset cache.
Note: This feature was not implemented as part of the KIP implementation and was intentionally left out for future implementation.
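Since the subscription check was left unimplemented, the following is only a hypothetical sketch of the cross-referencing step described above. The function name, the `(topic, partition)` tuple representation, and the millisecond timestamps are all assumptions made for illustration.

```python
from typing import Dict, Set, Tuple

# Hypothetical representation of a topic partition: (topic name, partition id).
TopicPartition = Tuple[str, int]


def expire_unsubscribed_offsets(
    commit_timestamps_ms: Dict[TopicPartition, int],
    consumed: Set[TopicPartition],
    now_ms: int,
    retention_ms: int,
) -> Set[TopicPartition]:
    """Remove offsets for partitions the group no longer consumes from.

    commit_timestamps_ms maps each partition with a stored offset to its last
    commit timestamp; consumed is the set of partitions derived from the
    members' metadata. Returns the partitions whose offsets were removed.
    """
    # Partitions the group has offsets for but is not consuming from.
    non_subscribed = set(commit_timestamps_ms) - consumed
    # Of those, keep only the ones whose last commit is older than retention.
    expired = {
        tp for tp in non_subscribed
        if now_ms - commit_timestamps_ms[tp] >= retention_ms
    }
    for tp in expired:
        del commit_timestamps_ms[tp]
    return expired
```

Offsets for partitions that are still consumed are never touched here, regardless of how old their last commit is.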
Standalone (Simple) Consumer
...
The following table summarizes how the new offset expiration semantics would be implemented.
Group State | Additional Check in Offset Cleanup Job | Action if Check Holds |
---|---|---|
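= Empty | current timestamp - current_state_timestamp ≥ broker's offsets.retention.minutes | Remove all group offsets and transition the group to Dead |
≠ Empty | (Non-subscribed partitions = partitions group has offset for - partitions group is consuming from) ∀partition ∈ non-subscribed partitions: current timestamp - partition's commit_timestamp ≥ broker's offsets.retention.minutes | Remove offset of partition |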
= Empty | current timestamp - partition's commit_timestamp ≥ broker's offsets.retention.minutes | Remove offset of partition |
Note that there are different valid protocolType values, such as consumer and stream, and the above semantics apply to them all.
Another Related Change
When group names are automatically generated by the console consumer they are very likely not to be reused. Therefore, it makes sense to skip storing offsets for them by default to avoid one of the top factors for offset cache size growth. The proposal is to disable auto offset commit by default in this situation. Implementing this change would become more critical once
Jira | ||||||||
---|---|---|---|---|---|---|---|---|
|
...