Status
Current state: Under Discussion
Discussion thread:
JIRA:
Motivation
Messages stored in Kafka expire and disappear silently according to the topic's retention time and size configuration. Consumers have no way to know that they have missed messages.
I propose logging an INFO line when messages are removed because of the topic's time or size retention settings before being consumed by one of a set of consumer groups specified in the topic configuration.
This is feasible because Kafka brokers already have the information needed for the task:
- the offsets of the messages that have been removed.
- the last offset consumed (committed) by each consumer group.
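To make the comparison concrete, here is a minimal Scala sketch of how these two pieces of information determine what a group missed. It assumes the removed messages form one contiguous offset range, and it uses Kafka's convention that a committed offset is the offset of the next message the group will read (one past the last consumed offset). The names `OffsetRange` and `MissedOffsets` are hypothetical.

```scala
// Inclusive offset range of messages a group never read (hypothetical type).
final case class OffsetRange(first: Long, last: Long)

object MissedOffsets {
  /**
   * @param removedStart    first removed offset (inclusive)
   * @param removedEnd      offset just past the last removed message (exclusive),
   *                        e.g. the new log start offset after segment deletion
   * @param committedOffset the group's committed offset, i.e. the next offset
   *                        the group would fetch
   * @return the removed offsets the group had not consumed, if any
   */
  def missedRange(removedStart: Long, removedEnd: Long, committedOffset: Long): Option[OffsetRange] = {
    // The group has read everything below its committed offset, so the
    // missed part starts at whichever is later: the removal start or the commit.
    val first = math.max(removedStart, committedOffset)
    if (first < removedEnd) Some(OffsetRange(first, removedEnd - 1)) else None
  }
}
```

For example, if offsets 0 to 99 are removed and a group has committed offset 40, `missedRange(0L, 100L, 40L)` yields `OffsetRange(40, 99)`; a group that committed offset 100 missed nothing.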
Public Interfaces
A new topic-level configuration property should be created:
- retention.notify.groups: a comma-separated list of consumer groups that will be notified when their unconsumed offsets expire.
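The value of the proposed property could be parsed as in the sketch below, which trims whitespace and ignores empty entries so that values such as "billing, audit" and "billing,audit," are treated the same. The object name and the trimming behaviour are assumptions for illustration, not part of the proposal; the property itself does not exist in current Kafka releases.

```scala
// Hypothetical helper for the proposed 'retention.notify.groups' topic property.
object RetentionNotifyGroups {
  val ConfigName = "retention.notify.groups"

  // Splits the comma-separated value into distinct, non-empty group ids.
  // A missing (null) or empty value means no group is notified.
  def parse(value: String): Set[String] =
    if (value == null) Set.empty
    else value.split(",").iterator.map(_.trim).filter(_.nonEmpty).toSet
}
```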
Proposed Changes
Currently, LogManager schedules the "kafka-delete-logs" task, which calls the deleteLogs() method. A check can be added to that method to list the consumer groups that did not consume the removed messages.
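The per-partition check can be sketched independently of the broker internals. In this minimal sketch every name is hypothetical: the caller supplies the groups subscribed to the topic, the groups configured in retention.notify.groups, and a lookup of each group's committed offset (in a broker, committed offsets are managed by the group coordinator; how the log layer would reach them is left open). Groups with no committed offset are skipped, which is an assumed policy. The check does not depend on which scheduled task calls it; note that segment deletion driven by retention.ms and retention.bytes is performed by a separate LogManager task, "kafka-log-retention" (LogManager.cleanupLogs()), which may be a more natural caller than deleteLogs().

```scala
// Sketch of the notification check for one partition (all names hypothetical).
object RetentionNotifier {
  def messagesFor(
      topicPartition: String,
      removedStart: Long,           // first removed offset (inclusive)
      removedEnd: Long,             // offset after the last removed message (exclusive)
      subscribedGroups: Set[String],
      notifyGroups: Set[String],    // parsed from 'retention.notify.groups'
      committedOffset: String => Option[Long]): Seq[String] =
    // Only groups that are both subscribed and configured are checked;
    // sorting keeps the log output deterministic.
    (subscribedGroups intersect notifyGroups).toSeq.sorted.flatMap { group =>
      committedOffset(group).flatMap { committed =>
        // Committed offset = next offset to read, so [first, removedEnd - 1] was never consumed.
        val first = math.max(removedStart, committed)
        if (first < removedEnd)
          Some(s"Messages within offset range [$first, ${removedEnd - 1}] have been removed " +
            s"without being consumed by group $group for topic partition $topicPartition")
        else None
      }
    }
}
```

Each returned string would then be passed to the broker's info() logger.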
The pseudocode is in the commented-out block marked KIP-490:
```scala
private def deleteLogs(): Unit = {
  var nextDelayMs = 0L
  try {
    def nextDeleteDelayMs: Long = {
      if (!logsToBeDeleted.isEmpty) {
        val (_, scheduleTimeMs) = logsToBeDeleted.peek()
        scheduleTimeMs + currentDefaultConfig.fileDeleteDelayMs - time.milliseconds()
      } else
        currentDefaultConfig.fileDeleteDelayMs
    }

    while ({nextDelayMs = nextDeleteDelayMs; nextDelayMs <= 0}) {
      val (removedLog, _) = logsToBeDeleted.take()
      if (removedLog != null) {
        try {
          removedLog.delete()
          info(s"Deleted log for partition ${removedLog.topicPartition} in ${removedLog.dir.getAbsolutePath}.")
          /*
           * KIP-490: log when consumer groups lose messages because they were
           * removed before being consumed.
           *
           * getConsumerGroups, getLastOffsetConsumed and groupsToBeNotified are
           * hypothetical; groupsToBeNotified is read from the topic config
           * property 'retention.notify.groups'.
           *
           * val tp = removedLog.topicPartition
           * val groupsToNotify: Seq[String] = getConsumerGroups(tp.topic) intersect groupsToBeNotified
           * groupsToNotify.foreach { group =>
           *   val lastConsumedOffset: Long = getLastOffsetConsumed(group, tp)
           *   if (lastConsumedOffset < removedLog.logEndOffset - 1)
           *     info(s"Messages within offset range [${lastConsumedOffset + 1}, ${removedLog.logEndOffset - 1}] " +
           *       s"have been removed without being consumed by group $group for topic partition $tp")
           * }
           */
        } catch {
          case e: KafkaStorageException =>
            error(s"Exception while deleting $removedLog in dir ${removedLog.dir.getParent}.", e)
        }
      }
    }
  } catch {
    case e: Throwable =>
      error(s"Exception in kafka-delete-logs thread.", e)
  } finally {
    try {
      scheduler.schedule("kafka-delete-logs",
        deleteLogs _,
        delay = nextDelayMs,
        unit = TimeUnit.MILLISECONDS)
    } catch {
      case e: Throwable =>
        if (scheduler.isStarted) {
          // No errors should occur unless scheduler has been shutdown
          error(s"Failed to schedule next delete in kafka-delete-logs thread", e)
        }
    }
  }
}
```
Compatibility, Deprecation, and Migration Plan
There is no impact on existing features: the new property is optional, and when it is not set no additional logging takes place.