Status
Current state: "Draft"
Discussion thread: here
...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The offset of a topic partition within a consumer group typically expires once offsets.retention.minutes has elapsed since the last offset commit to that partition. KAFKA-4682 reports an issue related to this offset expiration: committed offsets are removed even when the (Stable) group still has active, but rarely committing, consumers.
...
There are workarounds to this issue and some of them are described in KAFKA-4682, but they come with their own limitations and drawbacks.
Public Interfaces
This is the current OffsetCommit protocol:
...
    OffsetCommit Request (Version: 4) => group_id group_generation_id member_id [topics]
      group_id => STRING
      group_generation_id => INT32
      member_id => STRING
      topics => topic [partitions]
        topic => STRING
        partitions => partition offset metadata
          partition => INT32
          offset => INT64
          metadata => NULLABLE_STRING

    OffsetCommit Response (Version: 4) => throttle_time_ms [responses]
      throttle_time_ms => INT32
      responses => topic [partition_responses]
        topic => STRING
        partition_responses => partition error_code
          partition => INT32
          error_code => INT16
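To make the Version 4 request layout above concrete, here is a minimal sketch that serializes a request body field by field. It assumes the standard Kafka encodings for these primitive types (big-endian integers, STRING as an INT16 length followed by UTF-8 bytes, NULLABLE_STRING using a length of -1 for null, arrays as an INT32 count followed by the elements). The function names are hypothetical, and the request header is not included.

```python
import struct


def encode_string(value: str) -> bytes:
    """STRING: INT16 big-endian length, then the UTF-8 bytes."""
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def encode_nullable_string(value) -> bytes:
    """NULLABLE_STRING: a length of -1 encodes null."""
    if value is None:
        return struct.pack(">h", -1)
    return encode_string(value)


def encode_offset_commit_v4(group_id, generation_id, member_id, topics) -> bytes:
    """Serialize the request body (hypothetical helper, header omitted).

    topics maps a topic name to a list of (partition, offset, metadata).
    """
    out = encode_string(group_id)                 # group_id => STRING
    out += struct.pack(">i", generation_id)       # group_generation_id => INT32
    out += encode_string(member_id)               # member_id => STRING
    out += struct.pack(">i", len(topics))         # [topics] array length
    for topic, partitions in topics.items():
        out += encode_string(topic)               # topic => STRING
        out += struct.pack(">i", len(partitions)) # [partitions] array length
        for partition, offset, metadata in partitions:
            out += struct.pack(">iq", partition, offset)  # INT32, INT64
            out += encode_nullable_string(metadata)       # NULLABLE_STRING
    return out


body = encode_offset_commit_v4("g", 1, "m", {"t": [(0, 42, None)]})
```

Note that the body carries no per-request or per-partition retention field, so the broker-side retention setting is the only input to expiration.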
Proposed Changes
A more viable solution for KAFKA-4682 can be achieved by changing how group offset expiration works: preserve committed offsets as long as the group is active (has consumers). The expiration clock should start ticking the moment all group members are gone and the group transitions into the Empty state. These expiration semantics imply that there is no longer a need to enforce individual offset retention times or to keep an individual expiration timestamp for each topic partition in the group, because all committed offsets in the group expire at the same time. As a result, the expireTimestamp field will be removed from the offset metadata message.
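The rule described above can be sketched as a small in-memory model. This is an illustration only, not the broker implementation: the class and method names are hypothetical, time is passed in explicitly as milliseconds, and the sketch does not model groups that commit offsets without ever having members.

```python
class GroupOffsets:
    """Toy model of group-level offset expiration (hypothetical names)."""

    def __init__(self, retention_ms):
        self.retention_ms = retention_ms
        self.members = set()
        self.offsets = {}            # (topic, partition) -> committed offset
        self.empty_since_ms = None   # set when the group becomes Empty

    def join(self, member_id):
        self.members.add(member_id)
        self.empty_since_ms = None   # group is active again: stop the clock

    def leave(self, member_id, now_ms):
        self.members.discard(member_id)
        if not self.members and self.empty_since_ms is None:
            self.empty_since_ms = now_ms  # last member gone: start the clock

    def commit(self, topic, partition, offset):
        # No per-partition expiration timestamp is stored.
        self.offsets[(topic, partition)] = offset

    def expire(self, now_ms):
        """Drop all offsets at once if the group has been Empty long enough."""
        if self.empty_since_ms is None:
            return
        if now_ms - self.empty_since_ms >= self.retention_ms:
            self.offsets.clear()


group = GroupOffsets(retention_ms=1_000)
group.join("consumer-1")
group.commit("topic-a", 0, 42)
group.expire(now_ms=10_000_000)     # still active: nothing expires
```

In this model a rarely committing consumer never loses its offsets while it remains in the group, which is the behavior KAFKA-4682 asks for.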
...
The default retention time for group offsets can be customized through the existing offsets.retention.minutes broker configuration. If a need arises in the future to enforce a per-group retention configuration, it can be implemented via a separate KIP.
Compatibility, Deprecation, and Migration Plan
As mentioned above, under the new semantics all offsets in a group expire at the same time. The broker config offsets.retention.minutes determines when they expire (after the group becomes Empty). For older clients that want to keep honoring their individual retention overrides (sent through OffsetCommitRequest), the broker's offsets.retention.minutes needs to be set to the maximum of those overrides and its current value. This guarantees that offsets will not expire any earlier under the new semantics, because the expiration clock for a partition now starts no earlier, and perhaps later, than before. It also simplifies handling of clients that use different versions of the API.
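As a worked example of the rule above, the following sketch computes a broker setting that covers every client override. It assumes the overrides are expressed in milliseconds (as an assumption about how older request versions carry them) and rounds each one up to whole minutes; the function name is hypothetical.

```python
def broker_retention_minutes(current_minutes, client_override_ms):
    """Return a value for offsets.retention.minutes that is at least as
    long as the current setting and every client override (hypothetical helper)."""
    # Ceiling division: round each millisecond override up to whole minutes.
    override_minutes = [-(-ms // 60_000) for ms in client_override_ms]
    return max([current_minutes, *override_minutes])


# Current broker setting of 1 day; clients overriding with 1 day and 3 days.
needed = broker_retention_minutes(1_440, [86_400_000, 259_200_000])
```

Here `needed` is 4320 minutes (3 days), the longest of the three values.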
For old clients that send an OffsetCommitRequest of version 3 or earlier, any custom partition-level retention is ignored; the broker-level offsets.retention.minutes instead determines the expiry of all offsets within the group. As discussed above, offsets.retention.minutes must be set high enough to guarantee that the previous retention periods are not shortened. Group offsets expire together.
Rejected Alternatives
If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.