Status
Current state: Under Discussion
Discussion thread: here
JIRA: KAFKA-7236
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The "min.insync.replicas" configuration specifies the minimum number of in-sync replicas required for a partition to accept messages from a producer. If the in-sync replica count of a partition falls below the specified "min.insync.replicas", the broker rejects messages from producers using acks=all. These producers suffer unavailability, as they see a NotEnoughReplicas or NotEnoughReplicasAfterAppend exception.
We currently have an UnderMinIsrPartitionCount metric, which is useful for identifying when partitions fall below "min.insync.replicas". However, it is still difficult to identify which topic partitions are affected and need fixing.
We can leverage the describe topics command in TopicCommand to add an option "--under-minisr-partitions" to list out exactly which topic partitions are below "min.insync.replicas".
Public Interfaces
This change would add an additional flag "--under-minisr-partitions" to TopicCommand. Its output will follow the same format as the "under-replicated-partitions" and "offline-partitions" options.
Proposed Changes
The challenge with supporting this additional feature is that the "min.insync.replicas" configuration may be set at a broker or topic level.
We can use `AdminClient.describeConfigs` on the topics, since that API call returns the "computed" value of each configuration, whatever its ConfigSource ("DYNAMIC_TOPIC_CONFIG", "DYNAMIC_BROKER_CONFIG", "DYNAMIC_DEFAULT_BROKER_CONFIG", "STATIC_BROKER_CONFIG", or "DEFAULT_CONFIG").
If the "--under-minisr-partitions" option is specified, we can pre-fetch the "computed" topic configurations for all topics in a single call, which avoids making a separate AdminClient call per topic.
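To illustrate what a "computed" value means, here is a minimal sketch of source precedence. It assumes the ConfigSource names above are listed from highest to lowest precedence. `ConfigPrecedence` and `resolve` are hypothetical names used only for illustration; they are not part of Kafka, and the broker's actual resolution logic is not shown here.

```scala
// Hypothetical model of config-source precedence; not Kafka's actual classes.
object ConfigPrecedence {
  // Assumed order: highest precedence first, mirroring the ConfigSource names above.
  val precedence: Seq[String] = Seq(
    "DYNAMIC_TOPIC_CONFIG",
    "DYNAMIC_BROKER_CONFIG",
    "DYNAMIC_DEFAULT_BROKER_CONFIG",
    "STATIC_BROKER_CONFIG",
    "DEFAULT_CONFIG"
  )

  // Return the (source, value) pair from the highest-precedence source that defines a value.
  def resolve(valuesBySource: Map[String, String]): Option[(String, String)] =
    precedence.collectFirst {
      case source if valuesBySource.contains(source) => source -> valuesBySource(source)
    }
}
```

Under this model, a topic-level override of "min.insync.replicas" wins over a static broker setting, which is why the tool should compare ISR counts against the computed value rather than reading either level directly.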
// Assuming we have an AdminClient instance
val adminClient = ...

// Pre-fetch and get "computed" topic configs for all specified topics
val computedTopicConfigs =
  if (reportUnderMinISRPartitions)
    Option(adminClient.describeConfigs(
      topics.map(topic => new ConfigResource(ConfigResource.Type.TOPIC, topic)).asJavaCollection).all().get())
  else
    None

for (topic <- topics)
  ...
  if (describePartitions) {
    // Get "computed" topic "min.insync.replicas" for this topic
    val computedTopicMinISR =
      if (reportUnderMinISRPartitions)
        Option(computedTopicConfigs.get.get(new ConfigResource(ConfigResource.Type.TOPIC, topic))
          .get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).value().toInt)
      else
        None

    for ((partitionId, assignedReplicas) <- sortedPartitions) {
      ...
      // Print current topic partition if reportUnderMinISRPartitions and ISR count < "computed" min ISR
      if (... || (reportUnderMinISRPartitions && inSyncReplicas.size < computedTopicMinISR.get)) {
        ...
This means TopicCommand needs an additional "--bootstrap-server" flag in order to use AdminClient. KIP-377: TopicCommand to use AdminClient already proposes moving TopicCommand to AdminClient, so we can wait for KIP-377 to be completed first.
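The core check can also be sketched without any AdminClient dependency. The following is a simplified illustration, not the proposed TopicCommand code: `PartitionState`, `UnderMinIsr`, and `underMinIsr` are hypothetical names, and the map of computed "min.insync.replicas" values is assumed to have been pre-fetched as described above.

```scala
// Hypothetical, simplified stand-in for the partition metadata TopicCommand works with.
final case class PartitionState(topic: String, partition: Int, inSyncReplicas: Seq[Int])

object UnderMinIsr {
  // Keep partitions whose ISR count is strictly below their topic's computed min ISR.
  // Topics with no entry in minIsrByTopic are skipped rather than guessed at.
  def underMinIsr(partitions: Seq[PartitionState],
                  minIsrByTopic: Map[String, Int]): Seq[PartitionState] =
    partitions.filter { p =>
      minIsrByTopic.get(p.topic).exists(minIsr => p.inSyncReplicas.size < minIsr)
    }
}
```

For example, with a computed minimum of 2 for a topic, a partition whose ISR is a single broker would be reported, while a partition with two in-sync replicas would not, since the comparison is strict.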
Compatibility, Deprecation, and Migration Plan
As this change adds a new option instead of modifying existing ones, there are no compatibility issues and no migration is required.
Rejected Alternatives
None so far.