THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
- The LogManager variable logsToBeDeleted contains logs and time schedule for deletion.
- cleaner: LogCleaner. Only used for 'compact' cleanup.policy.
- LogManager.startup() will run schedulers:
- kafka-log-retention → cleanupLogs() → cleaner (only used for when cleanup.policy is 'compact').
- kafka-delete-logs → deleteLogs()
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
private def deleteLogs(): Unit = {
var nextDelayMs = 0L
try {
def nextDeleteDelayMs: Long = {
if (!logsToBeDeleted.isEmpty) {
val (_, scheduleTimeMs) = logsToBeDeleted.peek()
scheduleTimeMs + currentDefaultConfig.fileDeleteDelayMs - time.milliseconds()
} else
currentDefaultConfig.fileDeleteDelayMs
}
while ({nextDelayMs = nextDeleteDelayMs; nextDelayMs <= 0}) {
val (removedLog, _) = logsToBeDeleted.take()
if (removedLog != null) {
try {
removedLog.delete()
info(s"Deleted log for partition ${removedLog.topicPartition} in ${removedLog.dir.getAbsolutePath}.")
//
// KIP-491
//
// for each group in 'retention.notify.group' topic property get the last consumed offset.
// if (lastOffset < Log.firstOffset) {
// info(s"message with $offset offset $partition partition $topic topic $key key has been removed without being consumed by group $group")
// }
} catch {
case e: KafkaStorageException =>
error(s"Exception while deleting $removedLog in dir ${removedLog.dir.getParent}.", e)
}
}
}
} catch {
case e: Throwable =>
error(s"Exception in kafka-delete-logs thread.", e)
} finally {
try {
scheduler.schedule("kafka-delete-logs",
deleteLogs _,
delay = nextDelayMs,
unit = TimeUnit.MILLISECONDS)
} catch {
case e: Throwable =>
if (scheduler.isStarted) {
// No errors should occur unless scheduler has been shutdown
error(s"Failed to schedule next delete in kafka-delete-logs thread", e)
}
}
}
} |
- The LogManager iterate on logsToBeDeleted will search for the logs to be deleted.
- The LogManager read the last offset consumed by all groups specified on "retention.notify.groups" property.
- The LogManager will remove the log.
- If the offset that has been removed is bigger than the last consumed offset for each group, log a line:
info(s"message with $offset offset $partition partition $topic topic $key key has been removed without being consumed by group $group")
...