THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||
---|---|---|
| ||
package custom.mypackage.admin; class TargetForwardingAdminClient extends ForwardingAdminClient { ResourceManagmentRESTClient customResourceManager; public TargetForwardingAdminClient(Map<String, Object> configs) { super(config) customResourceManager = ResourceManagmentRESTClient.create(configs) //method that know how to create client. } @Override public CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions, CreatePartitionsOptions options) { final// Map<String,use KafkaFutureImpl<Void>>customResourceManager futures = new HashMap<>(newPartitions.size()); newPartitions.entrySet().stream().forEach(entry -> { String topicName = entry.getKey(); int partitionCount = entry.getValue().totalCount(); customResourceManager.updateTopicPartition(topicName, partitionCount); futures.put(topicName, new KafkaFutureImpl<>()); }); return new CreatePartitionsResult(new HashMap<>(futures)); to updateTopicPartition } // .... } |
MirrorMaker Configuration Properties
...