Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
# Assuming we have an AdminClient instance
val adminClient = ...

// Pre-fetch and get "computed" topic configs for all specified topics
val computedTopicConfigs = if (reportUnderMinISRPartitions)
  Option(adminClient.describeConfigs(
    topics.map(topic => new ConfigResource(ConfigResource.Type.TOPIC, topic)).asJavaCollection).all().get()) else None

for (topic <- topics)
  ...
  if (describePartitions) {
    // Get "computed" topic "min.insync.replicas" for this topic
    val computedTopicMinISR = if (reportUnderMinISRPartitions)
      Option(computedTopicConfigs.get.get(new ConfigResource(ConfigResource.Type.TOPIC, topic))
      .get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).value().toInt) else None


    for ((partitionId, assignedReplicas) <- sortedPartitions) {
      ...

      // Print current topic partition if reportUnderMinISRPartitions and ISR count < "computed" min ISR
      if (... ||
        (reportUnderMinISRPartitions && inSyncReplicas.size < computedTopicMinISR.get) {
      ...

...