Status
Current state: DISCUSSION
Discussion thread: here
JIRA: KAFKA-7904
Motivation
Today, a topic partition may be categorized as:
(1) Fully in sync (inSyncReplicas == allReplicasMap)
(2) UnderReplicated
(3) UnderMinIsr
(4) Offline
(3) and (4) are failure scenarios in which clients will face unavailability (producers with acks=ALL will fail to produce if ISR count is under the configured "min.insync.replicas" count).
(2) Under-replicated partitions occur whenever the inSyncReplicas set is not equal to the allReplicasMap, which can happen when we have:
- Repartitioning
- Broker restarts
- Transient network issues
- Broker failures
The current categorization of topic partitions has a gap as an UnderReplicatedPartition does not tell operators if the reduced ISR set is intentional (repartitioning/restarts) or if there may be something wrong such as a broker has completely failed. This makes it hard for operators as setting an alert for UnderReplicatedPartitions may not be effective as it may be too noisy, and increasing the # of samples needed to trigger the alert increases the time to detect failures.
This KIP aims to fill this gap by proposing a new categorization of partitions: AtMinIsr, which consists of partitions that only have the minimum number of insync replicas remaining in the ISR set (as configured by "min.insync.replicas").
If a partition is "AtMinIsr", then it suggests something severe has happened, but more importantly that one more failure can result in unavailability so some sort of action should be taken (ex. repartitioning).
Examples
Example 1:
1 partition
minIsrCount=2
ISR set = [0,1,2]
1. Broker 0 fails
- ISR set = [1,2]
- partition is UnderReplicatedPartition and AtMinIsr
2. Broker 1 fails
- ISR set = [2]
- partition is UnderReplicatedPartition and UnderMinIsr
In this example, AtMinIsr triggers when UnderReplicatedPartition triggers and tells us that 1 more failure will cause producers with ack=ALL to be unavailable.
Example 2:
1 partition
minIsrCount=1
ISR set = [0,1,2]
1. Broker 0 fails
- ISR set = [1,2]
- partition is UnderReplicatedPartition
2. Broker 1 fails
- ISR set = [2]
- partition is UnderReplicatedPartition and AtMinIsr
3. Broker 2 fails
- ISR set = []
- partition is OfflinePartition
In this example, AtMinIsr triggers when there is only 1 insync replica remaining, and tells us that 1 more failure will cause the partition to go completely offline!
AtMinIsr Values + Possible Explanations
1. AtMinIsr is zero
Everything is fine, and business as usual. Nothing to see here.
2. AtMinIsr is consistently greater than zero for a prolonged period of time
Broker(s) may have failed so this could warrant an alert for an operator to take a look at the health of the cluster to see if any brokers are downed.
3. AtMinIsr spikes from zero to non-zero and back down to zero repeatedly
Broker(s) may be experiencing trouble such as high load or temporary network issues which is causing the partition to temporarily fall out of sync.
Public Interfaces
We will introduce two new metrics and a new TopicCommand option to identify AtMinIsr partitions.
# New metrics # Broker metric - kafka.server:name=AtMinIsrPartitionCount,type=ReplicaManager # Partition metric - kafka.cluster:name=AtMinIsr,type=Partition,topic={topic},partition={partition} # New TopicCommand option --at-min-isr-partitions
Proposed Changes
We will add the gauge to Partition.scala:
newGauge("AtMinIsr", new Gauge[Int] { def value = { if (isAtMinIsr) 1 else 0 } }, tags ) ... def isAtMinIsr: Boolean = { leaderReplicaIfLocal match { case Some(leaderReplica) => inSyncReplicas.size == leaderReplica.log.get.config.minInSyncReplicas case None => false } }
And the similar change to ReplicaManager.scala:
val atMinIsrPartitionCount = newGauge( "AtMinIsrPartitionCount", new Gauge[Int] { def value = leaderPartitionsIterator.count(_.isAtMinIsr) } )
And TopicCommand.scala:
private val reportAtMinIsrPartitionsOpt = parser.accepts("at-min-isr-partitions", "if set when describing topics, only show partitions whose isr count is equal to the configured minimum. Not supported with the --zookeeper option.") private def hasAtMinIsrPartitions(partitionDescription: PartitionDescription) = { partitionDescription.isr.size == partitionDescription.minIsrCount } private def shouldPrintAtMinIsrPartitions(partitionDescription: PartitionDescription) = { opts.reportAtMinIsrPartitions && hasAtMinIsrPartitions(partitionDescription) } # Some other minor changes omitted
Compatibility, Deprecation, and Migration Plan
The new TopicCommand option requires use of AdminClient so it will not be available with the --zookeeper option.
Rejected Alternatives
None so far.