Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Using new KafkaProducer configuration and implementing Redirector interface resolves the problem:

Code Block
languagejava
titleKafkaProducer configuration example
broker.redirector=custom.redirector.BrokerRedirectorCustomBrokerRedirector
Code Block
languagejava
titleRedirector implementation example
public class BrokerRedirectorCustomBrokerRedirector implements Redirector {
	@Override
	public void configure(Map<String, ?> props) {}

	@Override
	public InetSocketAddress redirect(InetSocketAddress address) {
		String host = address.getHostString();
		if (host.contains("kafka-0"))
			return new InetSocketAddress("localhost", 9092);
		else if (host.contains("kafka-1"))
			return new InetSocketAddress("localhost", 9093);
		else if (host.contains("kafka-2"))
			return new InetSocketAddress("localhost", 9094);
		else if (host.contains("kafka-3"))
			return new InetSocketAddress("localhost", 9095);
		else if (host.contains("kafka-4"))
			return new InetSocketAddress("localhost", 9096);		
		return address;
	}
}

...