You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 23 Next »

Status

Current state: Under Discussion

Discussion thread: here

JIRA:


Motivation

Messages stored in Kafka will expire and disappear silently based on retention time and size configuration. Consumers have no way to know they have missed messages.

I propose to log a info line after a message has been removed due to topic time/size retention settings, for a set of consumer groups specified on the topic configuration.

This could be implemented because the kafka brokers know the information needed for the task:

  • offset of the message that has been removed.
  • last offset consumed from a consumer group.

Public Interfaces

A new property at topic level should be created:

  • retention.notify.groups : comma separated list of groups that will be notified on offset expiration.

Proposed Changes

Currently the LogManager schedule the "kafka-delete-logs" thread, that will call the deleteLogs() method. Is possible to add into that method a check to list the consumed groups that didn't consume the messages removed.

The pseudo code is in the comment starting on line 19:

LogManager.scala deleteLogs()
  private def deleteLogs(): Unit = {
    var nextDelayMs = 0L
    try {
      def nextDeleteDelayMs: Long = {
        if (!logsToBeDeleted.isEmpty) {
          val (_, scheduleTimeMs) = logsToBeDeleted.peek()
          scheduleTimeMs + currentDefaultConfig.fileDeleteDelayMs - time.milliseconds()
        } else
          currentDefaultConfig.fileDeleteDelayMs
      }

      while ({nextDelayMs = nextDeleteDelayMs; nextDelayMs <= 0}) {
        val (removedLog, _) = logsToBeDeleted.take()
        if (removedLog != null) {
          try {
            removedLog.delete()
            info(s"Deleted log for partition ${removedLog.topicPartition} in ${removedLog.dir.getAbsolutePath}.")

			/*

			//
			//  KIP-490: log when consumer groups lose a message because offset has been deleted
			//

			val consumerGroupsSubscribed : Seq[String] = getConsumerGroups(removedLog.topicPartition.topic() );
			val groupsToNotify : Seq[String] = consumerGroupsSubscribed intersect groupsTobeNotify // value get from topic config property 'retention.notify.groups'
			groupsToNotify.forEach( {
			  val lastCosumedOffsetGroup : Integer = getLastOffsetConsumed( _, removedLog.topicPartition);
              if(lastCosumedOffsetGroup < removedLog.nextOffsetMetadata) {
                info(s"message within offset range [ ${removedLog.nextOffsetMetadata} - ${lastCosumedOffsetGroup + 1} ] have been removed without being consumed by group $_ for topic partition ${removedLog.topicPartition}");
			  }
			})

			*/

          } catch {
            case e: KafkaStorageException =>
              error(s"Exception while deleting $removedLog in dir ${removedLog.dir.getParent}.", e)
          }
        }
      }
    } catch {
      case e: Throwable =>
        error(s"Exception in kafka-delete-logs thread.", e)
    } finally {
      try {
        scheduler.schedule("kafka-delete-logs",
          deleteLogs _,
          delay = nextDelayMs,
          unit = TimeUnit.MILLISECONDS)
      } catch {
        case e: Throwable =>
          if (scheduler.isStarted) {
            // No errors should occur unless scheduler has been shutdown
            error(s"Failed to schedule next delete in kafka-delete-logs thread", e)
          }
      }
    }
  }

Compatibility, Deprecation, and Migration Plan

There is no impact on existing features.

Rejected Alternatives



  • No labels