Status
Current state: Under Discussion
Discussion thread:
JIRA:
Motivation
People use Kafka, among many other reasons, because they need to be sure that their messages are correctly processed by their applications. A common configuration is to use a replication factor of 3 and to commit a message's offset only after the message has been processed successfully. Developers use this configuration because it is important not to lose any messages. Even more important than not losing messages is being notified when a message is lost before it has been consumed.
There are some situations where messages are lost silently:
- A message expires before being consumed because of the topic's time-based retention (retention.ms).
- A message expires before being consumed because of the topic's size-based retention (retention.bytes).
I propose that the broker log a warning when a message is removed by the topic's time- or size-based retention settings before it has been consumed by one of the consumer groups listed in the topic configuration.
Kafka brokers already have the information needed to achieve this:
- The offset of each message that is removed.
- The committed offset of each consumer group, that is, the offset of the next message the group will consume.
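As a minimal sketch of how these two values combine (hypothetical names, in Scala to match the log line in this proposal), the check below assumes the group offset follows Kafka's committed-offset convention, where the committed offset is the next offset the group will read. Because Kafka retention removes data a whole log segment at a time, the sketch also counts the unconsumed messages in a removed offset range [start, end); that range form is an assumption of this sketch, not part of the proposal.

```scala
// Hypothetical helper: decides whether removed messages were ever consumed by a group.
// Assumes `committedOffset` follows Kafka's convention: the offset of the NEXT message
// the group will read, i.e. the last processed offset + 1.
object ConsumptionCheck {

  // A single message at `removedOffset` was never consumed by the group
  // if the group has not yet advanced past it.
  def wasUnconsumed(removedOffset: Long, committedOffset: Long): Boolean =
    removedOffset >= committedOffset

  // Retention deletes whole log segments, so the removed messages form a
  // half-open offset range [start, end). Count how many of them the group never reached.
  def unconsumedCount(start: Long, end: Long, committedOffset: Long): Long =
    math.max(0L, end - math.max(start, committedOffset))
}
```

For example, if a segment covering offsets 100 to 199 expires while a group's committed offset is 150, the group never read 50 of the removed messages.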
Public Interfaces
A new topic-level configuration property would be added:
- retention.notify.groups: a comma-separated list of consumer group IDs. When retention removes a message that one of these groups has not yet consumed, the broker logs a warning.
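A minimal, hypothetical sketch of how the broker could turn the property value into a set of group IDs. It assumes the raw value is available as a string in a map of topic settings; NotifyGroupsConfig and parseNotifyGroups are illustrative names, not existing Kafka code.

```scala
// Hypothetical parser for the proposed `retention.notify.groups` topic property.
// The raw value is a comma-separated string: it is split on commas, each entry is
// trimmed, empty entries are dropped, and duplicates collapse into a Set.
// An unset or blank value yields no groups, so no warnings are logged.
object NotifyGroupsConfig {
  val PropertyName = "retention.notify.groups"

  def parseNotifyGroups(topicConfig: Map[String, String]): Set[String] =
    topicConfig.get(PropertyName) match {
      case Some(value) =>
        value.split(",").iterator.map(_.trim).filter(_.nonEmpty).toSet
      case None => Set.empty
    }
}
```

Trimming and dropping empty entries makes values such as "billing, audit," behave as expected. Inside the broker, declaring the property with Kafka's ConfigDef LIST type would likely be the more idiomatic choice, since that type already parses comma-separated values.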
Proposed Changes
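The proposed changes extend the existing log deletion flow in LogManager. Reading the group offsets and logging the warning are the new steps: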
- The LogManager field logsToBeDeleted holds the logs scheduled for deletion, together with the time at which each one is scheduled to be deleted.
- The LogManager iterates over logsToBeDeleted to find the logs that are due for deletion.
- The LogManager reads the committed offset of every group listed in the "retention.notify.groups" property.
- The LogManager removes the log.
- For each of those groups, if the offset of a removed message is greater than or equal to the group's committed offset (that is, the group never consumed the message), the LogManager logs a warning:
warn(s"Message with offset $offset and key $key in partition $partition of topic $topic was removed by retention before being consumed by group $group")
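The steps above can be sketched as a standalone function. This is a hypothetical illustration, not Kafka's LogManager code: RetentionLossCheck, RemovedMessage, and warningsFor are made-up names, the committed offsets are passed in as a plain map, and a group with no committed offset is treated as having consumed nothing, which is an assumption the proposal does not address. The function returns the warning lines instead of calling warn so that it can run outside the broker.

```scala
// Hypothetical sketch of the proposed check, run just before a log is removed.
// It does not use Kafka's internal classes; all names are illustrative.
object RetentionLossCheck {

  // A message that is about to be removed by retention.
  final case class RemovedMessage(offset: Long, key: String)

  // Returns one warning line per (group, message) pair the group never consumed.
  // `committedOffsets` maps group ID -> committed offset for this partition
  // (the next offset the group would read). A group with no committed offset is
  // treated as having consumed nothing (an assumption, not part of the proposal).
  def warningsFor(
      topic: String,
      partition: Int,
      removed: Seq[RemovedMessage],
      notifyGroups: Set[String],
      committedOffsets: Map[String, Long]
  ): Seq[String] =
    for {
      group <- notifyGroups.toSeq.sorted // sorted for deterministic output
      committed = committedOffsets.getOrElse(group, 0L)
      msg <- removed
      if msg.offset >= committed // the group never advanced past this message
    } yield s"Message with offset ${msg.offset} and key ${msg.key} in partition $partition " +
      s"of topic $topic was removed by retention before being consumed by group $group"
}
```

Logging one line per message can be very noisy when a whole segment expires at once; emitting a single line per group with the first and last unconsumed offsets may be a more practical variant of the same check.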
Compatibility, Deprecation, and Migration Plan
There is no impact on existing features. The property is new, so topics that do not set it behave exactly as before.