Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

The challenge with supporting this additional feature is that the "min.insync.replicas" configuration may be set at a broker or topic level.

We can use the same technique `LogManager` uses to determine the "min.insync.replicas" for a given topic:

(1) Get broker/cluster-level configuration (from `AdminClient.describeConfigs`)

(2) Get topic-level configuration (from `KafkaZkClient.getLogConfigs` while passing in default "min.insync.replicas" as the value retrieved from (1))

We must pass the default value as a parameter in (2) as `KafkaZkClient.getLogConfigs` will return a default value if the specified configuration is not set in Zookeeper (Kafka default for "min.insync.replicas" is 1).

If the user has configured "min.insync.replicas" on the broker/cluster-level (and not topic-level), then `KafkaZkClient.getLogConfigs` will return the default value of 1 instead of the broker/cluster-level value, so we must make sure to pass in the value used by the broker/cluster as the "default" value.

`AdminClient.describeConfigs` on the topics as that the API call will give us the "computed" proper value (ConfigSource as "DYNAMIC_TOPIC_CONFIG", "DYNAMIC_BROKER_CONFIG", "DYNAMIC_DEFAULT_BROKER_CONFIG", "STATIC_BROKER_CONFIG", and "DEFAULT_CONFIG").

We can pre-fetch the "computed" topic configurations if "--under-minisr-partitions" option is specified to avoid making a separate AdminClient call per topic.

Code Block
# Assuming we have an AdminClient instance
val adminClient = ...


val computedTopicConfigs = if (reportUnderMinISRPartitions)
  Option(adminClient.describeConfigs(
    topics.map(topic => new ConfigResource(ConfigResource.Type.TOPIC, topic)).asJavaCollection).all().get()) else None


for (topic <- topics)
  ...
  if (describePartitions) {
    ...
    val computedTopicMinISR = if (reportUnderMinISRPartitions)
      Option(computedTopicConfigs.get.get(new ConfigResource(ConfigResource.Type.TOPIC, topic))
      .get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).value().toInt) else None


    if (... ||
      (reportUnderMinISRPartitions && inSyncReplicas.size < computedTopicMinISR.get) {
    ...


This means we need This means we must add an additional flag "--bootstrap-server" to use AdminClient to get the broker/cluster "min.insync.replicas" configuration to use as the default when it is not found in Zookeeper.

Example usage:

...

KIP-377: TopicCommand to use AdminClient is already proposing this change to using AdminClient, so we can wait on KIP-377 to be completed first.

Compatibility, Deprecation, and Migration Plan

...