THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/**
 * Deletes the logs queued in `logsToBeDeleted` whose file-delete delay has elapsed,
 * then schedules the next run of this method on `scheduler`.
 *
 * The delay before the next run is computed by the local `nextDeleteDelayMs`:
 * the time remaining for the entry returned by `logsToBeDeleted.peek()`, or
 * `currentDefaultConfig.fileDeleteDelayMs` when the queue is empty.
 *
 * A `KafkaStorageException` thrown while deleting a single log is logged and the
 * loop continues with the next entry. Any other `Throwable` is logged by the outer
 * handler and ends the loop. The `finally` block runs in every case, so a
 * reschedule is always attempted.
 */
private def deleteLogs(): Unit = {
  // Delay handed to the scheduler in `finally`; stays 0 if the first delay computation throws.
  var nextDelayMs = 0L
  try {
    // Milliseconds until the entry at the head of the queue may be deleted
    // (zero or negative means it is already due).
    def nextDeleteDelayMs: Long = {
      if (!logsToBeDeleted.isEmpty) {
        val (_, scheduleTimeMs) = logsToBeDeleted.peek()
        scheduleTimeMs + currentDefaultConfig.fileDeleteDelayMs - time.milliseconds()
      } else
        currentDefaultConfig.fileDeleteDelayMs
    }

    // The condition both refreshes nextDelayMs and tests it, so when the loop exits
    // nextDelayMs already holds the positive delay to use for the next run.
    while ({nextDelayMs = nextDeleteDelayMs; nextDelayMs <= 0}) {
      val (removedLog, _) = logsToBeDeleted.take()
      if (removedLog != null) {
        try {
          removedLog.delete()
          info(s"Deleted log for partition ${removedLog.topicPartition} in ${removedLog.dir.getAbsolutePath}.")
          // NOTE(review): the commented-out sketch below is proposal pseudocode, kept verbatim.
          // It appears to contain fused fragments of more than one revision (e.g. "withwithin",
          // "><") and is not valid Scala as written -- reconcile it before enabling.
          /*
          //
          // KIP-490: log offset not consumed by consumer group specified on retention.notify.groups
          //
          //val consumerGroupsSubscribed : consumerGroupsSubscribedSeq[String] = getConsumerGroups(removedLog.topicPartition.topic() );
          // consumerGroupsSubscribedval groupsToNotify : Seq[String] = consumerGroupsSubscribed intersect groupsTobeNotify
          groupsToNotify.forEach( _ ) {
          //store the tuple (offset, topic, partition)
          val lastCosumedOffsetGroup : lastOffsetGroup(Integer, String, String) = getLastOffsetConsumed( _, removedLog.topicPartition);
          // if(lastOffsetGrouplastCosumedOffsetGroup >< removedLog.logStartOffset___LAST_OFFSET) {
          // info(s"message withwithin $offset offset $partitionrange partition $topic topic $key key has been removed without being[ ${lastCosumedOffsetGroup._1 + 1} - $removedLog.__LAST_OFFSET ] has not been consumed by group $group") $_ for topic ${removedLog.Topic} and partition ${removed.partition}");
          // }
          // })
          */
        } catch {
          case e: KafkaStorageException =>
            error(s"Exception while deleting $removedLog in dir ${removedLog.dir.getParent}.", e)
        }
      }
    }
  } catch {
    case e: Throwable =>
      error("Exception in kafka-delete-logs thread.", e)
  } finally {
    try {
      scheduler.schedule("kafka-delete-logs",
        deleteLogs _,
        delay = nextDelayMs,
        unit = TimeUnit.MILLISECONDS)
    } catch {
      case e: Throwable =>
        if (scheduler.isStarted) {
          // No errors should occur unless scheduler has been shutdown
          error("Failed to schedule next delete in kafka-delete-logs thread", e)
        }
    }
  }
}
...