...
kafka-topics.sh --bootstrap-server localhost:9092 --describe --non-preferred-leader
Topic: test-04 2 Partition: 0 1 Leader: 1002 Replicas: 1001,1005,1002 Isr: 1002,1005,1001
Topic: test-04 2 Partition: 1 3 Leader: 1006 Replicas: 1005,1006 ,1004 Isr: 1006,1005
Topic: test-2 Partition: 5 Leader: 1003 Replicas: 1004,1003 Isr: 1003,1004
Topic: zzt test-2 Partition: 0 7 Leader: 1006 Replicas: 1001,1006 ,1002 Isr: 1006,1002,1001
Topic: zzt test-2 Partition: 1 9 Leader: 1003 Replicas: 1002,10011005,1003 Isr: 1003,1001,1002
1005
Topic: zzt test-2 Partition: 3 11 Leader: 1003 1002 Replicas: 1004,1003,1005 1002 Isr: 1003,10051002,1004
Topic: zzt test-1 Partition: 4 0 Leader: 1006 Replicas: 10051001,1004,1006 Isr: 1006,1005,10041001
Topic: zzt test-1 Partition: 6 2 Leader: 1003 Replicas: 1001,10021005,1003 Isr: 1003,1001,10021005
Topic: zzt test-1 Partition: 7 4 Leader: 1003 1002 Replicas: 1004,1002 Isr: 1002,1004
Topic: test-1 Partition: 7 Leader: 1003 Replicas: 1006,1004 1003 Isr: 1003,1002,1004
...
2. --non-preferred-leader-jsonfor the JSON output, one example of use:
kafka-topics.sh --bootstrap-server localhost:9092 --describe --non-preferred-leader-json
1006
Topic: test-1 Partition: 8 Leader: 1004 Replicas: 1005,1004 Isr: 1004,1005
Topic: test-3 Partition: 0 Leader: 1006 Replicas: 1001,1006 Isr: 1006,1001
Topic: test-3 Partition: 2 Leader: 1003 Replicas: 1005,1003 Isr: 1003,1005
Topic: test-3 Partition: 4 Leader: 1002 Replicas: 1004,1002 Isr: 1002,1004
Topic: test-3 Partition: 7 Leader: 1003 Replicas: 1006,1003 Isr: 1003,1006
Topic: test-3 Partition: 8 Leader: 1004 Replicas: 1005,1004 Isr: 1004,1005
...
2. --non-preferred-leader-json for the JSON output, one example of use:
kafka-topics.sh --bootstrap-server localhost:9092 --describe --non-preferred-leader-json
{"partitions":[{"topic":"test-3","partition":2{"partitions":[{"topic":"__consumer_offsets","partition":42},{"topic":"zzt","partition":16},{"topic":"__consumer_offsets","partition":1},{"topic":"__consumer_offsets","partition":36},{"topic":"__consumer_offsets","partition":48},{"topic":"__consumer_offsets","partition":12},{"topic":"__consumer_offsets","partition":30},{"topic":"__consumer_offsets","partition":18},{"topic":"__consumer_offsetstest-1","partition":57},{"topic":"zzttest-3","partition":87},{"topic":"__consumer_offsetstest-1","partition":108},{"topic":"__consumer_offsetstest-2","partition":249},{"topic":"test-043","partition":04},{"topic":"zzttest-2","partition":47},{"topic":"__consumer_offsetstest-3","partition":340},{"topic":"__consumer_offsetstest-1","partition":142},{"topic":"__consumer_offsetstest-2","partition":373},{"topic":"__consumer_offsetstest-2","partition":311},{"topic":"zzttest-1","partition":10},{"topic":"__consumer_offsetstest-3","partition":418},{"topic":"zzttest-2","partition":1911},{"topic":"__consumer_offsetstest-1","partition":24},{"topic":"__consumer_offsetstest-2","partition":9},{"topic":"__consumer_offsets","partition":46},{"topic":"__consumer_offsets","partition":33},{"topic":"__consumer_offsets","partition":22},{"topic":"__consumer_offsets","partition":4},{"topic":"zzt","partition":9},{"topic":"__consumer_offsets","partition":21},{"topic":"__consumer_offsets","partition":49},{"topic":"__consumer_offsets","partition":47},{"topic":"test-04","partition":1},{"topic":"__consumer_offsets","partition":25},{"topic":"zzt","partition":5},{"topic":"__consumer_offsets","partition":0},{"topic":"__consumer_offsets","partition":15},{"topic":"zzt","partition":0},{"topic":"__consumer_offsets","partition":28},{"topic":"zzt","partition":2},{"topic":"__consumer_offsets","partition":32},{"topic":"__consumer_offsets","partition":44},{"topic":"zzt","partition":17},{"topic":"__consumer_offsets","partition":20},{"topic":"__consumer_offsets","partition":7},{"topic":"__consumer_offsets","partition":3},{"topic":"__consumer_offsets","partition":16},{"topic":"zzt","partition":14},{"topic":"zzt","partition":3},{"topic":"__consumer_offsets","partition":35},{"topic":"__consumer_offsets","partition":29},{"topic":"zzt","partition":18},{"topic":"zzt","partition":7},{"topic":"zzt","partition":13},{"topic":"__consumer_offsets","partition":11},{"topic":"__consumer_offsets","partition":17},{"topic":"__consumer_offsets","partition":39},{"topic":"__consumer_offsets","partition":43},{"topic":"__consumer_offsets","partition":23},{"topic":"__consumer_offsets","partition":38},{"topic":"__consumer_offsets","partition":26},{"topic":"zzt","partition":15},{"topic":"zzt","partition":10},{"topic":"__consumer_offsets","partition":19},{"topic":"__consumer_offsets","partition":45},{"topic":"__consumer_offsets","partition":40},{"topic":"zzt","partition":12},{"topic":"__consumer_offsets","partition":8},{"topic":"__consumer_offsets","partition":6},{"topic":"zzt","partition":11},{"topic":"zzt","partition":6},{"topic":"__consumer_offsets","partition":27},{"topic":"__consumer_offsets","partition":13}]}
3. We can direct the output of example two to a json file named preferred.json, Then use the kafka-leader-election.sh tool to trigger the preferred replica election.
kafka-leader-election.sh --bootstrap-server localhost:9092 --election-type PREFERRED --path-to-json-file preferred.json
Successfully completed leader election (PREFERRED) for partitions __consumer_offsets-46, zzt-19, __consumer_offsets-44, zzt-15, test-04-1, zzt-13, __consumer_offsets-19, zzt-9, zzt-7, __consumer_offsets-32, zzt-3, __consumer_offsets-28, zzt-1, test-04-0, __consumer_offsets-26, __consumer_offsets-7, __consumer_offsets-40, __consumer_offsets-5, __consumer_offsets-38, __consumer_offsets-36, __consumer_offsets-1, __consumer_offsets-16, zzt-18, __consumer_offsets-14, zzt-16, zzt-14, __consumer_offsets-10, zzt-12, __consumer_offsets-24, zzt-10, __consumer_offsets-22, __consumer_offsets-20, __consumer_offsets-49, zzt-6, __consumer_offsets-31, zzt-4, zzt-0, __consumer_offsets-8, __consumer_offsets-37, __consumer_offsets-6, __consumer_offsets-35, __consumer_offsets-2
Proposed Changes
Make some changes in TopicCommand.scala
5}]}
3. We can direct the output of example two to a json file named preferred.json, Then use the kafka-leader-election.sh tool to trigger the preferred replica election.
kafka-leader-election.sh --bootstrap-server localhost:9092 --election-type PREFERRED --path-to-json-file preferred.json
Successfully completed leader election (PREFERRED) for partitions test-3-2, test-2-1, test-1-0, test-1-2, test-3-4, test-2-3, test-2-5, test-1-4, test-3-7, test-3-8, test-2-7, test-1-7, test-3-0, test-2-9, test-1-8, test-2-11
Proposed Changes
Make some changes in TopicCommand.scala
Code Block | ||||
---|---|---|---|---|
| ||||
override def describeTopic(opts: TopicCommandOptions): Unit = {
val topics = getTopics(opts.topic, opts.excludeInternalTopics)
ensureTopicExists(topics, opts.topic, !opts.ifExists)
if (topics.nonEmpty) {
val allConfigs = adminClient.describeConfigs(topics.map(new ConfigResource(Type.TOPIC, _)).asJavaCollection).values()
val liveBrokers = adminClient.describeCluster().nodes().get().asScala.map(_.id())
val topicDescriptions = adminClient.describeTopics(topics.asJavaCollection).all().get().values().asScala
val describeOptions = new DescribeOptions(opts, liveBrokers.toSet)
val topicPartitions = topicDescriptions
.flatMap(td => td.partitions.iterator().asScala.map(p => new TopicPartition(td.name(), p.partition())))
.toSet.asJava
val reassignments = listAllReassignments(topicPartitions)
if (opts.reportNonPreferredLeaderJson) {
var nonPreferredLeader = Set.empty[TopicPartition]
for (td <- topicDescriptions) {
val topicName = td.name
val config = allConfigs.get(new ConfigResource(Type.TOPIC, topicName)).get()
val sortedPartitions = td.partitions.asScala.sortBy(_.partition)
nonPreferredLeader = nonPreferredLeader.++(sortedPartitions.toSet.filter { tpInfo =>
val reassignment = reassignments.get(new TopicPartition(topicName, tpInfo.partition))
val partitionDesc = PartitionDescription(topicName, tpInfo, Some(config), markedForDeletion = false, reassignment)
describeOptions.shouldPrintTopicPartition(partitionDesc)
}.map(tp => new TopicPartition(topicName, tp.partition())))
}
println(formatAsJson(nonPreferredLeader))
} else {
for (td <- topicDescriptions) {
val topicName = td.name
val topicId = td.topicId()
val config = allConfigs.get(new ConfigResource(Type.TOPIC, topicName)).get()
val sortedPartitions = td.partitions.asScala.sortBy(_.partition)
if (describeOptions.describeConfigs) {
val hasNonDefault = config.entries().asScala.exists(!_.isDefault)
if (!opts.reportOverriddenConfigs || hasNonDefault) {
val numPartitions = td.partitions().size
val firstPartition = td.partitions.iterator.next()
val reassignment = reassignments.get(new TopicPartition(td.name, firstPartition.partition))
val topicDesc = TopicDescription(topicName, topicId, numPartitions, getReplicationFactor(firstPartition, reassignment), config, markedForDeletion = false)
topicDesc.printDescription()
}
}
if (describeOptions.describePartitions) {
for (partition <- sortedPartitions) {
val reassignment = reassignments.get(new TopicPartition(td.name, partition.partition))
val partitionDesc = PartitionDescription(topicName, partition, Some(config), markedForDeletion = false, reassignment)
describeOptions.maybePrintPartitionDescription(partitionDesc)
}
}
}
}
}
}
def shouldPrintTopicPartition(partitionDesc: PartitionDescription): Boolean = {
describeConfigs ||
shouldPrintUnderReplicatedPartitions(partitionDesc) ||
shouldPrintUnavailablePartitions(partitionDesc) ||
shouldPrintUnderMinIsrPartitions(partitionDesc) ||
shouldPrintAtMinIsrPartitions(partitionDesc) ||
shouldPrintNonPreferredLeader(partitionDesc)
}
| ||||
Code Block | ||||
| ||||
def isNonPreferredLeader: Boolean = { hasLeader && !info.leader.equals(info.replicas.asScala.head) } private def shouldPrintNonPreferredLeader(partitionDescription: PartitionDescription): Boolean = { (opts.reportNonPreferredLeader || opts.reportNonPreferredLeaderJson) && partitionDescription.isNonPreferredLeader } private val reportNonPreferredLeaderOpt = parser.accepts("non-preferred-leader", "if set when describing topics, only show partitions whose leader is not equal to the first replica in the replica list. Not supported with the --zookeeper option.") private val reportNonPreferredLeaderJsonOpt = parser.accepts("non-preferred-leader-json", "if set when describing topics, only show partitions whose leader is not equal to the first replica in the replica list and output as json format. Not supported with the --zookeeper option.") private val allReplicationReportOpts = Set(reportUnderReplicatedPartitionsOpt, reportUnderMinIsrPartitionsOpt, reportAtMinIsrPartitionsOpt, reportUnavailablePartitionsOpt, reportNonPreferredLeaderOpt, reportNonPreferredLeaderJsonOpt) def reportNonPreferredLeader: Boolean = has(reportNonPreferredLeaderOpt) def reportNonPreferredLeaderJson: Boolean = has(reportNonPreferredLeaderJsonOpt) private def formatAsJson(nonPreferredLeader: Set[TopicPartition]): String = { Json.encodeAsString(Map( "partitions" -> nonPreferredLeader.map { tp => Map( "topic" -> tp.topic(), "partition" -> tp.partition() ).asJava }.asJava ).asJava) } # Some other minor changes omitted |
...
The new option has no effect on existing usage.
Rejected Alternatives
1. Do not use --under-preferred-replica-partitions option, aA better name for the option would be --non-preferred-leader rather than --under-preferred-replica-partitions.
...