
Status

Current state: Under Discussion

Discussion thread:

JIRA:


Motivation

Among many other reasons, people use Kafka because they need to be sure that their messages are correctly processed by their applications. A typical configuration uses a replication factor of 3 and commits the offset of a message only after the message has been processed successfully. Developers use this configuration because it is important not to lose any messages. But even more important than not losing messages is being notified when a message is lost without having been consumed.

There are situations where messages can be lost silently:

  • A message expires before being consumed because of the topic's time-based retention (retention.ms).
  • A message expires before being consumed because of the topic's size-based retention (retention.bytes).

I propose logging a warning whenever messages are removed by the topic's time- or size-based retention before they have been consumed by one of the consumer groups listed in the topic configuration.

I believe this is feasible because Kafka brokers already have the information the task needs:

  • the offsets of the messages that have been removed;
  • the last offset consumed by each consumer group (its committed offset).
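To make the comparison concrete, here is a minimal Scala sketch, assuming that retention advances the partition's log start offset from oldLogStartOffset to newLogStartOffset and that the group's committed offset is available. The object, method, and parameter names are hypothetical and chosen only for illustration.

```scala
// Hypothetical helper (not part of Kafka): decides which deleted offsets a
// consumer group never read.
object UnconsumedRange {
  // A Kafka committed offset is the offset of the *next* message the group will
  // read, so every offset strictly below it has already been consumed.
  // Retention removed the offsets [oldLogStartOffset, newLogStartOffset - 1];
  // return the inclusive sub-range of those that the group had not yet read.
  def unconsumed(oldLogStartOffset: Long,
                 newLogStartOffset: Long,
                 committedOffset: Long): Option[(Long, Long)] = {
    // Offsets below oldLogStartOffset were deleted in an earlier round.
    val firstUnread = math.max(committedOffset, oldLogStartOffset)
    if (firstUnread < newLogStartOffset) Some((firstUnread, newLogStartOffset - 1))
    else None
  }

  def main(args: Array[String]): Unit = {
    // Retention moved the log start from 100 to 150; the group had committed 120.
    println(unconsumed(100L, 150L, 120L)) // Some((120,149))
    // The group had already read past the deleted data.
    println(unconsumed(100L, 150L, 200L)) // None
  }
}
```

Clamping with math.max keeps each deleted offset from being reported more than once when retention runs repeatedly while a group stays behind.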

Public Interfaces

A new topic-level configuration property is proposed:

  • retention.notify.groups: a comma-separated list of consumer groups for which the broker logs a warning when messages expire before the group has consumed them.
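The broker would need to turn the property value (for example retention.notify.groups=billing,audit) into a set of group IDs. A minimal sketch follows, assuming that whitespace around entries is ignored and empty entries are dropped; the KIP does not specify either behavior, and the object name is hypothetical.

```scala
// Sketch of how a broker might read the proposed `retention.notify.groups`
// value. The property is proposed by this KIP and does not exist in Kafka.
object NotifyGroupsConfig {
  val RetentionNotifyGroupsProp = "retention.notify.groups"

  // Split the comma-separated list, trim whitespace and drop empty entries,
  // so "a, b,,c " yields Set("a", "b", "c"). A missing value yields no groups.
  def parse(value: String): Set[String] =
    if (value == null) Set.empty
    else value.split(",").iterator.map(_.trim).filter(_.nonEmpty).toSet

  def main(args: Array[String]): Unit = {
    // Hypothetical topic configuration with the proposed property set.
    val topicConfig = Map(RetentionNotifyGroupsProp -> "billing, audit")
    val groups = parse(topicConfig.getOrElse(RetentionNotifyGroupsProp, ""))
    println(groups.contains("billing")) // true
  }
}
```

Returning a Set makes the later intersection with the groups that have committed offsets for the partition a single operation.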

Proposed Changes

Currently, LogManager schedules the "kafka-delete-logs" task, which calls the deleteLogs() method. A check can be added to that method to list the consumer groups that did not consume the messages being removed.

The proposed check is shown as pseudocode in the commented block in the middle of the method below:

LogManager.scala deleteLogs()
  private def deleteLogs(): Unit = {
    var nextDelayMs = 0L
    try {
      def nextDeleteDelayMs: Long = {
        if (!logsToBeDeleted.isEmpty) {
          val (_, scheduleTimeMs) = logsToBeDeleted.peek()
          scheduleTimeMs + currentDefaultConfig.fileDeleteDelayMs - time.milliseconds()
        } else
          currentDefaultConfig.fileDeleteDelayMs
      }

      while ({nextDelayMs = nextDeleteDelayMs; nextDelayMs <= 0}) {
        val (removedLog, _) = logsToBeDeleted.take()
        if (removedLog != null) {
          try {
            removedLog.delete()
            info(s"Deleted log for partition ${removedLog.topicPartition} in ${removedLog.dir.getAbsolutePath}.")

            /*
             * KIP-490: warn about offsets not consumed by the consumer groups listed
             * in the topic's 'retention.notify.groups' property.
             *
             * getConsumerGroups and getCommittedOffset are hypothetical helpers; the
             * committed offsets are held by the group coordinator, not by LogManager.
             *
             * val notifyGroups: Set[String] = ...  // parsed from 'retention.notify.groups'
             * val subscribedGroups: Set[String] = getConsumerGroups(removedLog.topicPartition.topic)
             * val groupsToNotify: Set[String] = subscribedGroups intersect notifyGroups
             * val logEndOffset: Long = removedLog.logEndOffset  // first offset past the deleted data
             * groupsToNotify.foreach { group =>
             *   // committed offset = offset of the next message the group will read
             *   getCommittedOffset(group, removedLog.topicPartition).foreach { committed =>
             *     val firstUnread = math.max(committed, removedLog.logStartOffset)
             *     if (firstUnread < logEndOffset)
             *       warn(s"Messages in offset range [$firstUnread, ${logEndOffset - 1}] of " +
             *         s"${removedLog.topicPartition} were deleted before being consumed by group $group")
             *   }
             * }
             */

          } catch {
            case e: KafkaStorageException =>
              error(s"Exception while deleting $removedLog in dir ${removedLog.dir.getParent}.", e)
          }
        }
      }
    } catch {
      case e: Throwable =>
        error(s"Exception in kafka-delete-logs thread.", e)
    } finally {
      try {
        scheduler.schedule("kafka-delete-logs",
          deleteLogs _,
          delay = nextDelayMs,
          unit = TimeUnit.MILLISECONDS)
      } catch {
        case e: Throwable =>
          if (scheduler.isStarted) {
            // No errors should occur unless scheduler has been shutdown
            error(s"Failed to schedule next delete in kafka-delete-logs thread", e)
          }
      }
    }
  }
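The commented check above can be sketched as a standalone function so its logic can be exercised outside the broker. This is a minimal sketch under stated assumptions: the broker-side lookups (consumer groups and their committed offsets) are modelled as a plain Map, the TopicPartition case class is a local stand-in rather than Kafka's class, and the function takes the log start offset before and after deletion, so the same logic applies whichever scheduled task performs the deletion.

```scala
// Hypothetical sketch: broker lookups are modelled as plain maps so the logic
// runs on its own. In a real broker the committed offsets live in the group
// coordinator, which may be a different broker from the partition leader.
object RetentionNotifier {
  // Local stand-in for a topic partition (not Kafka's TopicPartition class).
  final case class TopicPartition(topic: String, partition: Int)

  // Returns one warning per configured group that had not consumed every
  // deleted message. `committedOffsets` maps group id -> committed offset for
  // this partition (the offset of the next message the group will read).
  // Configured groups with no committed offset are skipped.
  def warnings(tp: TopicPartition,
               oldLogStartOffset: Long,
               newLogStartOffset: Long,
               notifyGroups: Set[String],
               committedOffsets: Map[String, Long]): Seq[String] =
    notifyGroups.toSeq.sorted.flatMap { group =>
      committedOffsets.get(group).flatMap { committed =>
        val firstUnread = math.max(committed, oldLogStartOffset)
        if (firstUnread < newLogStartOffset)
          Some(s"Messages in offset range [$firstUnread, ${newLogStartOffset - 1}] of " +
            s"${tp.topic}-${tp.partition} were deleted by retention before being consumed by group $group")
        else None
      }
    }

  def main(args: Array[String]): Unit = {
    val tp = TopicPartition("orders", 0)
    val committed = Map("billing" -> 120L, "audit" -> 300L)
    // Retention moved the log start of orders-0 from 100 to 200.
    warnings(tp, 100L, 200L, Set("billing", "audit", "reports"), committed).foreach(println)
    // Messages in offset range [120, 199] of orders-0 were deleted by retention before being consumed by group billing
  }
}
```

Here "audit" is not reported because it had already read past offset 199, and "reports" is skipped because it has no committed offset for the partition.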

Compatibility, Deprecation, and Migration Plan

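The new property is optional: topics that do not set retention.notify.groups behave exactly as before, so there is no impact on existing features.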

Rejected Alternatives

