Status
Current state: Accepted
Discussion thread: here
Previous Discussion thread: here
Vote thread: here
JIRA: KAFKA-7236
PR: https://github.com/apache/kafka/pull/6224
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The "min.insync.replicas" configuration specifies the minimum number of in-sync replicas required for a partition to accept messages from the producer. If the in-sync replica count of a partition falls below the specified "min.insync.replicas", then the broker will reject messages from producers using acks=all. These producers will suffer unavailability, as they will see a NotEnoughReplicas or NotEnoughReplicasAfterAppend exception.
...
We can leverage the describe topics command in TopicCommand to add an option "--under-min-isr-partitions" that lists exactly which topic partitions are below "min.insync.replicas" and need fixing to maintain availability.
Public Interfaces
This change would add an additional flag "--under-min-isr-partitions" to TopicCommand, but the output will follow the same format as the "under-replicated-partitions" and "offline-partitions" options.
Code Block |
---|
val reportUnderMinIsrPartitionsOpt = parser.accepts("under-min-isr-partitions",
"if set when describing topics, only show partitions which are under the configured minimum in-sync replica count") |
Proposed Changes
The challenge with supporting this additional feature is that the "min.insync.replicas" configuration may be set at a broker or topic level.
We can use the same technique `LogManager` uses to determine the "min.insync.replicas" for a given topic:
(1) Get broker/cluster-level configuration (from `AdminClient.describeConfigs`)
(2) Get topic-level configuration (from `KafkaZkClient.getLogConfigs` while passing in default "min.insync.replicas" as the value retrieved from (1))
We must pass the default value as a parameter in (2) because `getLogConfigs` falls back to a default value when the specified configuration is not set in ZooKeeper (the Kafka default for "min.insync.replicas" is 1). If the user has configured "min.insync.replicas" at the broker/cluster level (and not at the topic level), `getLogConfigs` will return that default, so we must make sure the default it uses is the value configured on the broker/cluster.
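The fallback described above can be sketched in plain Scala, without any Kafka classes. This is only an illustration of the lookup order: the object `MinIsrFallback`, the helper `effectiveMinIsr`, and the use of plain maps for the topic-level and broker/cluster-level settings are assumptions made for this sketch, not part of TopicCommand or `KafkaZkClient`.

```scala
object MinIsrFallback {
  // Kafka's built-in default for "min.insync.replicas", as stated above.
  val KafkaDefaultMinIsr: Int = 1

  private val MinIsrKey = "min.insync.replicas"

  // Hypothetical helper: a topic-level override wins, then the
  // broker/cluster-level value, then Kafka's built-in default.
  def effectiveMinIsr(topicOverrides: Map[String, String],
                      brokerConfig: Map[String, String]): Int =
    topicOverrides.get(MinIsrKey)
      .orElse(brokerConfig.get(MinIsrKey))
      .map(_.toInt)
      .getOrElse(KafkaDefaultMinIsr)
}
```

For a topic with no override on a cluster configured with "min.insync.replicas" set to 2, the sketch yields 2 rather than the built-in default of 1, which is the case the default parameter in (2) is meant to handle.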
Instead, we can use `AdminClient.describeConfigs` on the topics, as that API call will give us the "computed" values for configurations, already resolved across the config sources (ConfigSource as "DYNAMIC_TOPIC_CONFIG", "DYNAMIC_BROKER_CONFIG", "DYNAMIC_DEFAULT_BROKER_CONFIG", "STATIC_BROKER_CONFIG", and "DEFAULT_CONFIG").
We can pre-fetch the "computed" topic configurations if the "--under-min-isr-partitions" option is specified, to avoid making a separate AdminClient call per topic.
Code Block |
---|
// Assuming we have an AdminClient instance
val adminClient = ...
// Pre-fetch the "computed" topic configs for all specified topics in a single call
val computedTopicConfigs =
  if (reportUnderMinIsrPartitions)
    Option(adminClient.describeConfigs(
      topics.map(topic => new ConfigResource(ConfigResource.Type.TOPIC, topic)).asJavaCollection).all().get())
  else None
for (topic <- topics) {
  ...
  if (describePartitions) {
    // Get the "computed" "min.insync.replicas" for this topic
    val computedTopicMinIsr =
      if (reportUnderMinIsrPartitions)
        Option(computedTopicConfigs.get.get(new ConfigResource(ConfigResource.Type.TOPIC, topic))
          .get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).value().toInt)
      else None
    for ((partitionId, assignedReplicas) <- sortedPartitions) {
      ...
      // Print the current topic partition if reportUnderMinIsrPartitions is set and ISR count < "computed" min ISR
      if (... ||
          (reportUnderMinIsrPartitions && inSyncReplicas.size < computedTopicMinIsr.get)) {
        ...
      }
    }
  }
} |
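The filtering condition in the excerpt above can also be shown in isolation. The following is a minimal sketch rather than the TopicCommand implementation: `PartitionState`, `underMinIsr`, and the `minIsrByTopic` map (standing in for the pre-fetched "computed" topic configs) are hypothetical names introduced for illustration.

```scala
object UnderMinIsrFilter {
  // Hypothetical, simplified view of one partition: its topic, id, and current ISR.
  final case class PartitionState(topic: String, partition: Int, isr: Seq[Int])

  // Keep only the partitions whose ISR count is below the computed
  // "min.insync.replicas" of their topic. Topics missing from the map
  // fall back to 1, Kafka's default (an assumption of this sketch).
  def underMinIsr(partitions: Seq[PartitionState],
                  minIsrByTopic: Map[String, Int]): Seq[PartitionState] =
    partitions.filter(p => p.isr.size < minIsrByTopic.getOrElse(p.topic, 1))
}
```

Note that a partition whose ISR count equals "min.insync.replicas" is not reported, because it can still accept writes from producers using acks=all.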
This means we must add an additional flag "--bootstrap-server" to use AdminClient. KIP-377: TopicCommand to use AdminClient is already proposing a change to use AdminClient and introduce a "--bootstrap-server" option, so we can leverage the changes in KIP-377 for this KIP.
NOTE: This option is not supported with the deprecated "--zookeeper" option.
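One way to enforce this restriction is to reject the combination when the options are parsed. The sketch below only illustrates the check. `OptionCheck` and `validate` are hypothetical names, the arguments are modelled as a plain set of flag strings instead of the option parser TopicCommand actually uses, and the error message is invented for illustration.

```scala
object OptionCheck {
  // Hypothetical validation: returns Left(message) when the new option is
  // combined with the deprecated "--zookeeper" option, Right(()) otherwise.
  def validate(flags: Set[String]): Either[String, Unit] =
    if (flags.contains("--under-min-isr-partitions") && flags.contains("--zookeeper"))
      Left("--under-min-isr-partitions is not supported with --zookeeper; use --bootstrap-server")
    else
      Right(())
}
```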
Compatibility, Deprecation, and Migration Plan
As this change adds a new option instead of modifying existing ones, there will not be any compatibility issues, and no migration is needed. Since we rely on AdminClient for the computed "min.insync.replicas" configuration, this new option CANNOT be used with the deprecated "--zookeeper" option.
Rejected Alternatives
None so far.