Status

Current state: Accepted

Discussion thread: here

Previous Discussion thread: here

Vote thread: here

JIRA: KAFKA-7236

PR: https://github.com/apache/kafka/pull/6224

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The "min.insync.replicas" configuration specifies the minimum number of in-sync replicas required for a partition to accept messages from the producer. If the in-sync replica count of a partition falls below the specified "min.insync.replicas", the broker rejects messages from producers using acks=all. These producers suffer unavailability, as they see a NotEnoughReplicas or NotEnoughReplicasAfterAppend exception.

...

We can leverage the describe topics command in TopicCommand and add an option "--under-min-isr-partitions" that lists exactly which topic partitions are below "min.insync.replicas" and need fixing to maintain availability.

Public Interfaces

This change would add an additional flag "--under-min-isr-partitions" to TopicCommand. The output will follow the same format as the "under-replicated-partitions" and "offline-partitions" options.

Code Block
val reportUnderMinIsrPartitionsOpt = parser.accepts("under-min-isr-partitions",
  "if set when describing topics, only show partitions which are under the configured minimum in-sync replica count")


Proposed Changes

The challenge with supporting this additional feature is that the "min.insync.replicas" configuration may be set at a broker or topic level.
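To illustrate why the value has to be "computed", here is a minimal sketch of the precedence involved, assuming a simplified model in which a topic-level override wins over the broker-level setting, which in turn wins over Kafka's built-in default of 1. The object and function names are hypothetical and are not part of TopicCommand or AdminClient; real resolution happens on the broker and involves more layers (for example dynamic broker configs).

```scala
// Hypothetical, simplified model of how the effective ("computed")
// min.insync.replicas for a topic could be resolved. Not Kafka's actual code.
object MinIsrResolution {
  // Kafka's built-in default for min.insync.replicas.
  val BuiltInDefault: Int = 1

  // A topic-level override takes precedence over the broker-level value;
  // if neither is set, fall back to the built-in default.
  def computedMinIsr(topicOverride: Option[Int], brokerDefault: Option[Int]): Int =
    topicOverride.orElse(brokerDefault).getOrElse(BuiltInDefault)
}
```

Under these assumptions, a topic with no override on a broker configured with 2 resolves to 2, while a topic that sets 3 resolves to 3 regardless of the broker value.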

...

We can pre-fetch the "computed" topic configurations when the "--under-min-isr-partitions" option is specified, which avoids making a separate AdminClient call per topic.

Code Block
// Assuming we have an AdminClient instance
val adminClient = ...

// Pre-fetch the "computed" topic configs for all specified topics in a single call
val computedTopicConfigs = if (reportUnderMinIsrPartitions)
  Option(adminClient.describeConfigs(
    topics.map(topic => new ConfigResource(ConfigResource.Type.TOPIC, topic)).asJavaCollection).all().get())
else None

for (topic <- topics)
  ...
  if (describePartitions) {
    // Get the "computed" "min.insync.replicas" for this topic
    val computedTopicMinIsr = if (reportUnderMinIsrPartitions)
      Option(computedTopicConfigs.get.get(new ConfigResource(ConfigResource.Type.TOPIC, topic))
        .get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).value().toInt)
    else None

    for ((partitionId, assignedReplicas) <- sortedPartitions) {
      ...

      // Print the current topic partition if reportUnderMinIsrPartitions is set and ISR count < "computed" min ISR
      if (... ||
        (reportUnderMinIsrPartitions && inSyncReplicas.size < computedTopicMinIsr.get)) {
      ...


This means we need an additional flag "--bootstrap-server" to use AdminClient. KIP-377: TopicCommand to use AdminClient already proposes a change to use AdminClient and introduce a "--bootstrap-server" option, so we can leverage the changes in KIP-377 for this KIP.

NOTE: This option is not supported with the deprecated "--zookeeper" option.
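
The check described in this section can be sketched in isolation as follows. This is a self-contained illustration only: `PartitionState` and `UnderMinIsr` are hypothetical stand-ins for the partition metadata and filtering logic in TopicCommand, and the map of per-topic minimums is assumed to have been pre-fetched as outlined above.

```scala
// Hypothetical, simplified stand-in for the partition metadata TopicCommand works with.
final case class PartitionState(topic: String, partition: Int, isr: Seq[Int])

object UnderMinIsr {
  // Returns the partitions whose in-sync replica count is strictly below the
  // computed min.insync.replicas of their topic. Topics missing from
  // `minIsrByTopic` are skipped rather than guessed at.
  def underMinIsr(partitions: Seq[PartitionState],
                  minIsrByTopic: Map[String, Int]): Seq[PartitionState] =
    partitions.filter { p =>
      minIsrByTopic.get(p.topic).exists(minIsr => p.isr.size < minIsr)
    }
}
```

For example, with a computed minimum of 2 for a topic, a partition whose ISR holds a single replica is reported, while a partition whose ISR holds exactly 2 replicas is not, since the comparison is strict.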

Compatibility, Deprecation, and Migration Plan

As this change adds a new option instead of modifying existing ones, there will not be any compatibility issues, and no migration is required. Since we rely on AdminClient for the computed "min.insync.replicas" configuration, this new option cannot be used with the deprecated "--zookeeper" option.

Rejected Alternatives

None so far.