...

  • retention.notify.groups : comma separated list of groups that will be notified on offset expiration.
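As an illustration of how a comma-separated value for retention.notify.groups could be turned into a set of group ids, here is a minimal sketch. The object name RetentionNotifyGroups and the group names used in the usage note are assumptions made for the example; they are not part of this proposal or of the Kafka code base.

```scala
// Hypothetical helper, not a Kafka API: parses the value of
// retention.notify.groups into a set of consumer group ids.
object RetentionNotifyGroups {
  def parse(value: String): Set[String] =
    Option(value)                      // tolerate an unset (null) property
      .map(_.split(",").toSeq)         // split on the comma separator
      .getOrElse(Seq.empty)
      .map(_.trim)                     // ignore whitespace around each id
      .filter(_.nonEmpty)              // drop empty entries such as "a,,b"
      .toSet
}
```

With this sketch, a value such as "billing, audit" would yield the two group ids billing and audit, and an unset or empty value would yield an empty set, meaning no group is notified.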

Proposed Changes

The modifications introduced are in blue:

...

Currently the LogManager schedules the "kafka-delete-logs" task, which calls the deleteLogs() method. It is possible to add a check to that method that lists the consumer groups that did not consume the offsets being removed.


The pseudocode is in the comment in the middle of the method below:

...


Code Block (Scala): LogManager.scala deleteLogs()
  private def deleteLogs(): Unit = {
    var nextDelayMs = 0L
    try {
      def nextDeleteDelayMs: Long = {
        if (!logsToBeDeleted.isEmpty) {
          val (_, scheduleTimeMs) = logsToBeDeleted.peek()
          scheduleTimeMs + currentDefaultConfig.fileDeleteDelayMs - time.milliseconds()
        } else
          currentDefaultConfig.fileDeleteDelayMs
      }

      while ({nextDelayMs = nextDeleteDelayMs; nextDelayMs <= 0}) {
        val (removedLog, _) = logsToBeDeleted.take()
        if (removedLog != null) {
          try {
            removedLog.delete()
            info(s"Deleted log for partition ${removedLog.topicPartition} in ${removedLog.dir.getAbsolutePath}.")

            //
            //  KIP-490 (pseudocode): log the offsets that were removed before being consumed
            //  by the consumer groups specified in retention.notify.groups.
            //  getConsumerGroups and getLastOffsetConsumed are placeholders, not existing methods.
            //
            //  consumerGroupsSubscribed = getConsumerGroups(removedLog.topicPartition)
            //  consumerGroupsSubscribed.foreach { group =>
            //    lastOffsetGroup = getLastOffsetConsumed(group, removedLog.topicPartition)
            //    // the group is behind if it has not yet reached the end of the removed log
            //    if (lastOffsetGroup < removedLog.logEndOffset) {
            //      info(s"message with $offset offset $partition partition $topic topic $key key has been removed without being consumed by group $group")
            //    }
            //  }


          } catch {
            case e: KafkaStorageException =>
              error(s"Exception while deleting $removedLog in dir ${removedLog.dir.getParent}.", e)
          }
        }
      }
    } catch {
      case e: Throwable =>
        error(s"Exception in kafka-delete-logs thread.", e)
    } finally {
      try {
        scheduler.schedule("kafka-delete-logs",
          deleteLogs _,
          delay = nextDelayMs,
          unit = TimeUnit.MILLISECONDS)
      } catch {
        case e: Throwable =>
          if (scheduler.isStarted) {
            // No errors should occur unless scheduler has been shutdown
            error(s"Failed to schedule next delete in kafka-delete-logs thread", e)
          }
      }
    }
  }
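
The check described in the pseudocode comment could be sketched as a standalone function, shown here only to make the comparison explicit. Everything in it is an assumption for illustration: RemovedRange, UnconsumedOffsetCheck and the committedOffsets map are hypothetical and do not exist in Kafka, and the sketch assumes that a committed offset is the next offset the group will read and that endOffset is exclusive.

```scala
// Sketch only: the types and names below are hypothetical, not Kafka APIs.
final case class RemovedRange(topic: String, partition: Int, startOffset: Long, endOffset: Long)

object UnconsumedOffsetCheck {
  /**
   * Returns one warning per notified group whose committed offset is below
   * the (exclusive) end offset of the removed range. A group without a
   * committed offset is treated as having consumed nothing from the range.
   */
  def unconsumedWarnings(removed: RemovedRange,
                         notifyGroups: Set[String],
                         committedOffsets: Map[String, Long]): Seq[String] =
    notifyGroups.toSeq.sorted.flatMap { group =>
      val committed = committedOffsets.getOrElse(group, removed.startOffset)
      if (committed < removed.endOffset) {
        // first offset in the removed range that the group never read
        val firstLost = math.max(committed, removed.startOffset)
        Some(s"Offsets $firstLost to ${removed.endOffset - 1} of ${removed.topic}-${removed.partition} " +
          s"were removed without being consumed by group $group")
      } else None
    }
}
```

For example, with a removed range covering offsets 100 to 199 of a partition, a group that committed offset 150 would be reported as having missed offsets 150 to 199, while a group that committed offset 200 would not be reported.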

...

Compatibility, Deprecation, and Migration Plan

...