Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
source.cluster.alias = A
target.cluster.alias = B

source.cluster.bootstrap.servers = A_localhost:9092
target.cluster.bootstrap.servers = B_localhost:9092

target.cluster.forwarding.admin.class = mypackage.admin.TargetForwardingAdminClient

// Common config for KafkaAdminClient in any ForwardingAdminClient
admin.security.protocol=SASL_SSL
admin.sasl.mechanism=PLAIN

// Configure Source org.apache.kafka.clients.admin.ForwardingAdminClient
source.cluster.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME1" password="PASSWORD1";


// Configure Target KafkaAdminClient in mypackage.admin.TargetForwardingAdminClient
target.cluster.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME2" password="PASSWORD2";

// Configure Target custom ResourceManagmentRESTClient in mypackage.admin.TargetForwardingAdminClient
target.cluster.admin.resource.management.url = https://kafka.resource.manager.com
target.cluster.admin.resource.management.keystore.path = /path/keystore
target.cluster.admin.resource.management.truststore.path = /path/truststore/ca.pem

...

Code Block
language: java
package mypackage.admin;

class TargetForwardingAdminClient extends ForwardingAdminClient {
    ResourceManagmentRESTClient customResourceManager;
    public TargetForwardingAdminClient(Map<String, Object> configs) {
    	super(config)

	    customResourceManager = ResourceManagmentRESTClient.create(configs) //method that know how to create client.
    }

    @Override
    public CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions, CreatePartitionsOptions options) {
        final Map<String, KafkaFutureImpl<Void>> futures = new HashMap<>(newPartitions.size());
        newPartitions.entrySet().stream().forEach(entry -> {
            String topicName = entry.getKey();
            int partitionCount = entry.getValue().totalCount();
            customResourceManager.updateTopicPartition(topicName, partitionCount);
            futures.put(topicName, new KafkaFutureImpl<>());
        });
        return new CreatePartitionsResult(new HashMap<>(futures));
    }
   // ....
}

...

Code Block
clusters = primary, backup
primary.bootstrap.servers = A_localhost:9092
backup.bootstrap.servers = B_localhost:9092

// Common config for KafkaAdminClient in any ForwardingAdminClient
admin.security.protocol=SASL_SSL
admin.sasl.mechanism=PLAIN


// Configure Primary org.apache.kafka.clients.admin.ForwardingAdminClient
primary.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME1" password="PASSWORD1";


// Configure Target KafkaAdminClient in mypackage.admin.TargetForwardingAdminClient
backup.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME2" password="PASSWORD2";

// Configure Target custom ResourceManagmentRESTClient in mypackage.admin.TargetForwardingAdminClient
backup.admin.resource.management.url = https://kafka.resource.manager.com
backup.admin.resource.management.keystore.path = /path/keystore
backup.admin.resource.management.truststore.path = /path/truststore/ca.pem

...