THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block |
---|
source.cluster.alias = A target.cluster.alias = B source.cluster.bootstrap.servers = A_localhost:9092 target.cluster.bootstrap.servers = B_localhost:9092 target.cluster.forwarding.admin.class = custom.mypackage.admin.TargetForwardingAdminClient // Common config for KafkaAdminClient in any ForwardingAdminClient admin.security.protocol = SASL_SSL admin.sasl.mechanism=PLAIN // Configure Source org.apache.kafka.clients.admin.ForwardingAdminClient source.cluster.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME1" password="PASSWORD1"; // Configure Target KafkaAdminClient in custom.mypackage.admin.TargetForwardingAdminClient target.cluster.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME2" password="PASSWORD2"; // Configure Target custom ResourceManagmentRESTClient in custom.mypackage.admin.TargetForwardingAdminClient target.cluster.admin.resource.management.url = https://kafka.resource.manager.com target.cluster.admin.resource.management.keystore.path = /path/keystore target.cluster.admin.resource.management.truststore.path = /path/truststore/ca.pem |
...
Code Block | ||
---|---|---|
| ||
package custom.packagemypackage.admin; class TargetForwardingAdminClient extends ForwardingAdminClient { ResourceManagmentRESTClient customResourceManager; public TargetForwardingAdminClient(Map<String, Object> configs) { super(config) customResourceManager = ResourceManagmentRESTClient.create(configs) //method that know how to create client. } @Override public CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions, CreatePartitionsOptions options) { final Map<String, KafkaFutureImpl<Void>> futures = new HashMap<>(newPartitions.size()); newPartitions.entrySet().stream().forEach(entry -> { String topicName = entry.getKey(); int partitionCount = entry.getValue().totalCount(); customResourceManager.updateTopicPartition(topicName, partitionCount); futures.put(topicName, new KafkaFutureImpl<>()); }); return new CreatePartitionsResult(new HashMap<>(futures)); } // .... } |
...
Code Block |
---|
clusters = primary, backup primary.bootstrap.servers = A_localhost:9092 backup.bootstrap.servers = B_localhost:9092 // Common config for KafkaAdminClient in any ForwardingAdminClient admin.security.protocol = SASL_SSL admin.sasl.mechanism=PLAIN // Configure Primary org.apache.kafka.clients.admin.ForwardingAdminClient primary.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME1" password="PASSWORD1"; // Configure Target KafkaAdminClient in custom.mypackage.admin.TargetForwardingAdminClient backup.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME2" password="PASSWORD2"; // Configure Target custom ResourceManagmentRESTClient in custom.mypackage.admin.TargetForwardingAdminClient backup.admin.resource.management.url = https://kafka.resource.manager.com backup.admin.resource.management.keystore.path = /path/keystore backup.admin.resource.management.truststore.path = /path/truststore/ca.pem |
...