THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
- retention.notify.groups : comma separated list of groups that will be notified on offset expiration.
Proposed Changes
The modifications introduced are in blue:
...
Currently the LogManager schedule the "kafka-delete-logs", that will call the deleteLogs() method. Is possible to add into that method a a check to list the consumed groups that didnt consume the offset.
The pseudo code is in the comment in the middle below:
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
private def deleteLogs(): Unit = {
var nextDelayMs = 0L
try {
def nextDeleteDelayMs: Long = {
if (!logsToBeDeleted.isEmpty) {
val (_, scheduleTimeMs) = logsToBeDeleted.peek()
scheduleTimeMs + currentDefaultConfig.fileDeleteDelayMs - time.milliseconds()
} else
currentDefaultConfig.fileDeleteDelayMs
}
while ({nextDelayMs = nextDeleteDelayMs; nextDelayMs <= 0}) {
val (removedLog, _) = logsToBeDeleted.take()
if (removedLog != null) {
try {
removedLog.delete()
info(s"Deleted log for partition ${removedLog.topicPartition} in ${removedLog.dir.getAbsolutePath}.")
//
// KIP-490: log offset not consumed by consumer group specified on retention.notify.groups
//
// consumerGroupsSubscribed = getConsumerGroups(removedLog.topicPartition)
// consumerGroupsSubscribed.forEach( _ ) {
// lastOffsetGroup = getLastOffsetConsumed( _, removedLog.topicPartition)
// if(lastOffsetGroup > removedLog.logStartOffset) {
// info(s"message with $offset offset $partition partition $topic topic $key key has been removed without being consumed by group $group")
// }
// }
} catch {
case e: KafkaStorageException =>
error(s"Exception while deleting $removedLog in dir ${removedLog.dir.getParent}.", e)
}
}
}
} catch {
case e: Throwable =>
error(s"Exception in kafka-delete-logs thread.", e)
} finally {
try {
scheduler.schedule("kafka-delete-logs",
deleteLogs _,
delay = nextDelayMs,
unit = TimeUnit.MILLISECONDS)
} catch {
case e: Throwable =>
if (scheduler.isStarted) {
// No errors should occur unless scheduler has been shutdown
error(s"Failed to schedule next delete in kafka-delete-logs thread", e)
}
}
}
} |
...
Compatibility, Deprecation, and Migration Plan
...