You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 17 Next »

Status

Current stateDrafting

Discussion thread: Not ready yet

JIRA: KAFKA-7904

Motivation

Today, a topic partition may be categorized as:

(1) Fully in sync (inSyncReplicas == allReplicasMap)

(2) UnderReplicated

(3) UnderMinIsr

(4) Offline


(3) and (4) are failure scenarios in which clients will face unavailability (producers with acks=ALL will fail to produce if ISR count is under the configured "min.insync.replicas" count).

(2) Under-replicated partitions occur whenever the inSyncReplicas set is not equal to the allReplicasMap, which can happen when we have:

  • Repartitioning
  • Broker restarts
  • Transient network issues
  • Broker failures


The current categorization of topic partitions has a gap as an UnderReplicatedPartition does not tell operators if the reduced ISR set is intentional (repartitioning/restarts) or if there may be something wrong such as a broker has completely failed. This makes it hard for operators as setting an alert for UnderReplicatedPartitions may not be effective as it may be too noisy, and increasing the # of samples needed to trigger the alert increases the time to detect failures.

This KIP aims to fill this gap by proposing a new categorization of partitions: AtMinIsr, which consists of partitions that only have the minimum number of insync replicas remaining in the ISR set (as configured by "min.insync.replicas").

If a partition is "AtMinIsr", then it suggests something severe has happened, but more importantly that one more failure can result in unavailability so some sort of action should be taken (ex. repartitioning).

Examples

Example 1:

1 partition

minIsrCount=2

ISR set = [0,1,2]


1. Broker 0 fails

  • ISR set = [1,2]
  • partition is UnderReplicatedPartition and AtMinIsr

2. Broker 1 fails

  • ISR set = [2]
  • partition is UnderReplicatedPartition and UnderMinIsr

In this example, AtMinIsr triggers when UnderReplicatedPartition triggers and tells us that 1 more failure will cause producers with ack=ALL to be unavailable.


Example 2: 

1 partition

minIsrCount=1

ISR set = [0,1,2]


1. Broker 0 fails

  • ISR set = [1,2]
  • partition is UnderReplicatedPartition

2. Broker 1 fails

  • ISR set = [2]
  • partition is UnderReplicatedPartition and AtMinIsr

3. Broker 2 fails

  • ISR set = []
  • partition is OfflinePartition

In this example, AtMinIsr triggers when there is only 1 insync replica remaining, and tells us that 1 more failure will cause the partition to go completely offline!

AtMinIsr Values + Possible Explanations

1. AtMinIsr is zero

Everything is fine, and business as usual. Nothing to see here.

2. AtMinIsr is consistently greater than zero for a prolonged period of time

Broker(s) may have failed so this could warrant an alert for an operator to take a look at the health of the cluster to see if any brokers are downed.

3. AtMinIsr spikes from zero to non-zero and back down to zero repeatedly

Broker(s) may be experiencing trouble such as high load or temporary network issues which is causing the partition to temporarily fall out of sync.

Public Interfaces

We will introduce two new metrics and a new TopicCommand option to identify AtMinIsr partitions.

# New metrics
- kafka.server:name=AtMinIsrPartitions,type=ReplicaManager
- kafka.cluster:name=AtMinIsr,type=Partition,topic={topic},partition={partition}

# New TopicCommand option
--at-min-isr-partitions

Proposed Changes

We will add the gauge to Partition.scala:

newGauge("AtMinIsr",
  new Gauge[Int] {
    def value = {
      if (isAtMinIsr) 1 else 0
    }
  },
  tags
)


...


def isAtMinIsr: Boolean = {
  leaderReplicaIfLocal match {
    case Some(leaderReplica) =>
      inSyncReplicas.size == leaderReplica.log.get.config.minInSyncReplicas
    case None =>
      false
  }
}

And the similar change to ReplicaManager.scala:

val atMinIsrPartitionCount = newGauge(
  "AtMinIsrPartitionCount",
  new Gauge[Int] {
    def value = leaderPartitionsIterator.count(_.isAtMinIsr)
  }
)

And TopicCommand.scala:

private val reportAtMinIsrPartitionsOpt = parser.accepts("at-min-isr-partitions",
  "if set when describing topics, only show partitions whose isr count is equal to the configured minimum. Not supported with the --zookeeper option.")

private def hasAtMinIsrPartitions(partitionDescription: PartitionDescription) = {
  partitionDescription.isr.size == partitionDescription.minIsrCount
}

private def shouldPrintAtMinIsrPartitions(partitionDescription: PartitionDescription) = {
  opts.reportAtMinIsrPartitions && hasAtMinIsrPartitions(partitionDescription)
}

# Some other minor changes omitted

Compatibility, Deprecation, and Migration Plan

The new TopicCommand option requires use of AdminClient so it will not be available with the --zookeeper option.

Rejected Alternatives

None so far.

  • No labels