...

kafka-topics.sh --bootstrap-server localhost:9092 --describe --non-preferred-leader

Topic: test-2 Partition: 1 Leader: 1002 Replicas: 1001,1002 Isr: 1002,1001
Topic: test-2 Partition: 3 Leader: 1006 Replicas: 1005,1006 Isr: 1006,1005
Topic: test-2 Partition: 5 Leader: 1003 Replicas: 1004,1003 Isr: 1003,1004
Topic: test-2 Partition: 7 Leader: 1006 Replicas: 1001,1006 Isr: 1006,1001
Topic: test-2 Partition: 9 Leader: 1003 Replicas: 1005,1003 Isr: 1003,1005
Topic: test-2 Partition: 11 Leader: 1002 Replicas: 1004,1002 Isr: 1002,1004
Topic: test-1 Partition: 0 Leader: 1006 Replicas: 1001,1006 Isr: 1006,1001
Topic: test-1 Partition: 2 Leader: 1003 Replicas: 1005,1003 Isr: 1003,1005
Topic: test-1 Partition: 4 Leader: 1002 Replicas: 1004,1002 Isr: 1002,1004
Topic: test-1 Partition: 7 Leader: 1003 Replicas: 1006,1003 Isr: 1003,1006
Topic: test-1 Partition: 8 Leader: 1004 Replicas: 1005,1004 Isr: 1004,1005
Topic: test-3 Partition: 0 Leader: 1006 Replicas: 1001,1006 Isr: 1006,1001
Topic: test-3 Partition: 2 Leader: 1003 Replicas: 1005,1003 Isr: 1003,1005
Topic: test-3 Partition: 4 Leader: 1002 Replicas: 1004,1002 Isr: 1002,1004
Topic: test-3 Partition: 7 Leader: 1003 Replicas: 1006,1003 Isr: 1003,1006
Topic: test-3 Partition: 8 Leader: 1004 Replicas: 1005,1004 Isr: 1004,1005
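
Every row in this output has a leader that differs from the first broker in its replica list, which is the condition the option filters on. The following is a minimal sketch that checks this on rows copied from the output. The `Row` case class and the `parse` helper are hypothetical names introduced for illustration; they are not part of Kafka.

```scala
// Hypothetical model of one row of the describe output; not a Kafka class.
final case class Row(topic: String, partition: Int, leader: Int, replicas: List[Int])

object NonPreferredLeaderCheck {
  // Matches rows such as:
  // "Topic: test-3 Partition: 0 Leader: 1006 Replicas: 1001,1006 Isr: 1006,1001"
  // Rows without a numeric leader do not match and are skipped.
  private val RowPattern =
    """Topic:\s+(\S+)\s+Partition:\s+(\d+)\s+Leader:\s+(\d+)\s+Replicas:\s+([\d,]+)\s+Isr:.*""".r

  def parse(line: String): Option[Row] = line.trim match {
    case RowPattern(topic, partition, leader, replicas) =>
      Some(Row(topic, partition.toInt, leader.toInt, replicas.split(",").toList.map(_.toInt)))
    case _ => None
  }

  // The preferred leader is the first replica in the assignment.
  def isNonPreferredLeader(row: Row): Boolean =
    row.replicas.headOption.exists(_ != row.leader)

  def main(args: Array[String]): Unit = {
    val lines = List(
      "Topic: test-3 Partition: 0 Leader: 1006 Replicas: 1001,1006 Isr: 1006,1001",
      "Topic: test-3 Partition: 2 Leader: 1003 Replicas: 1005,1003 Isr: 1003,1005"
    )
    lines.flatMap(parse).foreach { r =>
      println(s"${r.topic}-${r.partition}: nonPreferred=${isNonPreferredLeader(r)}")
    }
  }
}
```

A partition led by its first replica, for example leader 1001 with replicas 1001,1006, would return false and would not be listed.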

...

2. --non-preferred-leader-json for the JSON output. One example of use:

kafka-topics.sh --bootstrap-server localhost:9092 --describe --non-preferred-leader-json

{"partitions":[{"topic":"test-3","partition":2},{"topic":"test-1","partition":7},{"topic":"test-3","partition":7},{"topic":"test-1","partition":8},{"topic":"test-2","partition":9},{"topic":"test-3","partition":4},{"topic":"test-2","partition":7},{"topic":"test-3","partition":0},{"topic":"test-1","partition":2},{"topic":"test-2","partition":3},{"topic":"test-2","partition":1},{"topic":"test-1","partition":0},{"topic":"test-3","partition":8},{"topic":"test-2","partition":11},{"topic":"test-1","partition":4},{"topic":"test-2","partition":5}]}


3. We can redirect the output of example 2 to a JSON file named preferred.json, then use the kafka-leader-election.sh tool to trigger the preferred replica election.

kafka-leader-election.sh --bootstrap-server localhost:9092 --election-type PREFERRED --path-to-json-file preferred.json

Successfully completed leader election (PREFERRED) for partitions test-3-2, test-2-1, test-1-0, test-1-2, test-3-4, test-2-3, test-2-5, test-1-4, test-3-7, test-3-8, test-2-7, test-1-7, test-3-0, test-2-9, test-1-8, test-2-11
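
The success message names each partition as topic-partition, so it can be compared with the entries in preferred.json to confirm that every listed partition was elected. The following is a rough sketch of such a cross-check. `ElectionCrossCheck` and both helpers are hypothetical, and the regex only handles the flat JSON shape shown in example 2; anything more general should use a real JSON parser.

```scala
object ElectionCrossCheck {
  // Hypothetical helper: pulls (topic, partition) pairs out of the flat JSON shape
  // {"partitions":[{"topic":"...","partition":N}, ...]} and renders them as topic-partition.
  private val Entry = """\{"topic":"([^"]+)","partition":(\d+)\}""".r

  def partitionNames(json: String): Set[String] =
    Entry.findAllMatchIn(json).map(m => s"${m.group(1)}-${m.group(2)}").toSet

  // Hypothetical helper: reads the comma-separated list printed after "for partitions".
  def electedNames(message: String): Set[String] = {
    val marker = "for partitions "
    val idx = message.indexOf(marker)
    if (idx < 0) Set.empty[String]
    else message.substring(idx + marker.length).split(",").map(_.trim).filter(_.nonEmpty).toSet
  }

  def main(args: Array[String]): Unit = {
    // Trimmed sample input, not the full output of the commands.
    val json = """{"partitions":[{"topic":"test-3","partition":2},{"topic":"test-2","partition":1}]}"""
    val message = "Successfully completed leader election (PREFERRED) for partitions test-3-2, test-2-1"
    val missing = partitionNames(json) -- electedNames(message)
    println(if (missing.isEmpty) "all requested partitions were elected" else s"not elected: $missing")
  }
}
```

Both sides are compared as sets because neither the JSON entries nor the success message follow a fixed order.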

Proposed Changes

Make some changes in TopicCommand.scala

TopicCommand.scala:
    override def describeTopic(opts: TopicCommandOptions): Unit = {
      val topics = getTopics(opts.topic, opts.excludeInternalTopics)
      ensureTopicExists(topics, opts.topic, !opts.ifExists)

      if (topics.nonEmpty) {
        val allConfigs = adminClient.describeConfigs(topics.map(new ConfigResource(Type.TOPIC, _)).asJavaCollection).values()
        val liveBrokers = adminClient.describeCluster().nodes().get().asScala.map(_.id())
        val topicDescriptions = adminClient.describeTopics(topics.asJavaCollection).all().get().values().asScala
        val describeOptions = new DescribeOptions(opts, liveBrokers.toSet)
        val topicPartitions = topicDescriptions
          .flatMap(td => td.partitions.iterator().asScala.map(p => new TopicPartition(td.name(), p.partition())))
          .toSet.asJava
        val reassignments = listAllReassignments(topicPartitions)

        if (opts.reportNonPreferredLeaderJson) {
          // Collect the matching partitions across all topics and print them as a single
          // JSON document that kafka-leader-election.sh accepts via --path-to-json-file.
          var nonPreferredLeader = Set.empty[TopicPartition]
          for (td <- topicDescriptions) {
            val topicName = td.name
            val config = allConfigs.get(new ConfigResource(Type.TOPIC, topicName)).get()
            val sortedPartitions = td.partitions.asScala.sortBy(_.partition)

            nonPreferredLeader ++= sortedPartitions.toSet.filter { tpInfo =>
              val reassignment = reassignments.get(new TopicPartition(topicName, tpInfo.partition))
              val partitionDesc = PartitionDescription(topicName, tpInfo, Some(config), markedForDeletion = false, reassignment)
              describeOptions.shouldPrintTopicPartition(partitionDesc)
            }.map(tp => new TopicPartition(topicName, tp.partition()))
          }
          println(formatAsJson(nonPreferredLeader))
        } else {
          for (td <- topicDescriptions) {
            val topicName = td.name
            val topicId = td.topicId()
            val config = allConfigs.get(new ConfigResource(Type.TOPIC, topicName)).get()
            val sortedPartitions = td.partitions.asScala.sortBy(_.partition)

            if (describeOptions.describeConfigs) {
              val hasNonDefault = config.entries().asScala.exists(!_.isDefault)
              if (!opts.reportOverriddenConfigs || hasNonDefault) {
                val numPartitions = td.partitions().size
                val firstPartition = td.partitions.iterator.next()
                val reassignment = reassignments.get(new TopicPartition(td.name, firstPartition.partition))
                val topicDesc = TopicDescription(topicName, topicId, numPartitions, getReplicationFactor(firstPartition, reassignment), config, markedForDeletion = false)
                topicDesc.printDescription()
              }
            }

            if (describeOptions.describePartitions) {
              for (partition <- sortedPartitions) {
                val reassignment = reassignments.get(new TopicPartition(td.name, partition.partition))
                val partitionDesc = PartitionDescription(topicName, partition, Some(config), markedForDeletion = false, reassignment)
                describeOptions.maybePrintPartitionDescription(partitionDesc)
              }
            }
          }
        }
      }
    }

    def shouldPrintTopicPartition(partitionDesc: PartitionDescription): Boolean = {
      describeConfigs ||
        shouldPrintUnderReplicatedPartitions(partitionDesc) ||
        shouldPrintUnavailablePartitions(partitionDesc) ||
        shouldPrintUnderMinIsrPartitions(partitionDesc) ||
        shouldPrintAtMinIsrPartitions(partitionDesc) ||
        shouldPrintNonPreferredLeader(partitionDesc)
    }

TopicCommand.scala:
    def isNonPreferredLeader: Boolean = {
      hasLeader && !info.leader.equals(info.replicas.asScala.head)
    }

    private def shouldPrintNonPreferredLeader(partitionDescription: PartitionDescription): Boolean = {
      (opts.reportNonPreferredLeader || opts.reportNonPreferredLeaderJson) && partitionDescription.isNonPreferredLeader
    }

    private val reportNonPreferredLeaderOpt = parser.accepts("non-preferred-leader",
      "if set when describing topics, only show partitions whose leader is not equal to the first replica in the replica list. Not supported with the --zookeeper option.")
    private val reportNonPreferredLeaderJsonOpt = parser.accepts("non-preferred-leader-json",
      "if set when describing topics, only show partitions whose leader is not equal to the first replica in the replica list and output as json format. Not supported with the --zookeeper option.")

    private val allReplicationReportOpts = Set(reportUnderReplicatedPartitionsOpt, reportUnderMinIsrPartitionsOpt, reportAtMinIsrPartitionsOpt, reportUnavailablePartitionsOpt, reportNonPreferredLeaderOpt, reportNonPreferredLeaderJsonOpt)

    def reportNonPreferredLeader: Boolean = has(reportNonPreferredLeaderOpt)
    def reportNonPreferredLeaderJson: Boolean = has(reportNonPreferredLeaderJsonOpt)

    private def formatAsJson(nonPreferredLeader: Set[TopicPartition]): String = {
      Json.encodeAsString(Map(
        "partitions" -> nonPreferredLeader.map { tp =>
          Map(
            "topic" -> tp.topic(),
            "partition" -> tp.partition()
          ).asJava
        }.asJava
      ).asJava)
    }

    // Some other minor changes omitted
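
To make the output shape of formatAsJson concrete without a Kafka dependency, here is a minimal stand-alone sketch. It is an illustration only: `Tp` is a hypothetical stand-in for TopicPartition, the string building replaces Kafka's Json.encodeAsString, and a Seq is used instead of a Set so the order is deterministic.

```scala
object JsonShapeSketch {
  // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
  final case class Tp(topic: String, partition: Int)

  // Escapes backslashes and double quotes so the topic name stays a valid JSON string.
  private def quote(s: String): String =
    "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\""

  // Produces {"partitions":[{"topic":"...","partition":N},...]}
  def formatAsJson(tps: Seq[Tp]): String =
    tps
      .map(tp => s"""{"topic":${quote(tp.topic)},"partition":${tp.partition}}""")
      .mkString("""{"partitions":[""", ",", "]}")

  def main(args: Array[String]): Unit =
    println(formatAsJson(Seq(Tp("test-3", 2), Tp("test-1", 7))))
}
```

An empty input yields {"partitions":[]}, which is what the describe command would print when every partition is already led by its preferred replica, assuming the real encoder behaves the same way for an empty set.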

...

The new options have no effect on existing usage.

Rejected Alternatives

1.  Do not name the option --under-preferred-replica-partitions; --non-preferred-leader is a better name for it.

...