THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Using new KafkaProducer configuration and implementing Redirector interface resolves the problem:
Code Block | ||||
---|---|---|---|---|
| ||||
broker.redirector=custom.redirector.BrokerRedirectorCustomBrokerRedirector |
Code Block | ||||
---|---|---|---|---|
| ||||
public class BrokerRedirectorCustomBrokerRedirector implements Redirector { @Override public void configure(Map<String, ?> props) {} @Override public InetSocketAddress redirect(InetSocketAddress address) { String host = address.getHostString(); if (host.contains("kafka-0")) return new InetSocketAddress("localhost", 9092); else if (host.contains("kafka-1")) return new InetSocketAddress("localhost", 9093); else if (host.contains("kafka-2")) return new InetSocketAddress("localhost", 9094); else if (host.contains("kafka-3")) return new InetSocketAddress("localhost", 9095); else if (host.contains("kafka-4")) return new InetSocketAddress("localhost", 9096); return address; } } |
...