...
Introduce new org.apache.kafka.common.network.Redirector interface that will be used by org.apache.kafka.common.network.Selector Selector to redirect to alternative broker address.
Code Block | ||||
---|---|---|---|---|
| ||||
package org.apache.kafka.common.network;
import java.net.InetSocketAddress;
import org.apache.kafka.common.Configurable;
public interface Redirector extends Configurable {
public InetSocketAddress redirect(InetSocketAddress address);
} |
Proposed Changes
With respect to proposed interface the following changes in existing classes are required:
Change org.apache.kafka.common.network.Selector class to accept accept Redirector instance through constructor and execute it before opening the connection to broker:
...
Add Redirector configuration support for KafkaConsumer and KafkaProducer as as broker.redirector redirector property:
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
@@ -211,6 +211,10 @@ public class ConsumerConfig extends AbstractConfig { public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer"; public static final String VALUE_DESERIALIZER_CLASS_DOC = "Deserializer class for value that implements the <code>org.apache.kafka.common.serialization.Deserializer</code> interface."; + /** <code>broker.redirector</code> */ + public static final String BROKER_REDIRECTOR_CLASS_CONFIG = "broker.redirector"; + public static final String BROKER_REDIRECTOR_CLASS_DOC = "Broker redirector class that implements the <code>org.apache.kafka.common.network.Redirector</code> interface."; + /** <code>connections.max.idle.ms</code> */ public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG; @@ -397,6 +401,11 @@ public class ConsumerConfig extends AbstractConfig { Type.CLASS, Importance.HIGH, VALUE_DESERIALIZER_CLASS_DOC) + .define(BROKER_REDIRECTOR_CLASS_CONFIG, + Type.CLASS, + null, + Importance.LOW, + BROKER_REDIRECTOR_CLASS_DOC) .define(REQUEST_TIMEOUT_MS_CONFIG, Type.INT, 305000, // chosen to be higher than the default of max.poll.interval.ms |
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
@@ -412,9 +413,10 @@ public class KafkaProducer<K, V> implements Producer<K, V> { } ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config); Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics); + Redirector brokerRedirector = config.getConfiguredInstance(ProducerConfig.BROKER_REDIRECTOR_CLASS_CONFIG, Redirector.class); KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient( new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), - this.metrics, time, "producer", channelBuilder, logContext), + this.metrics, time, "producer", channelBuilder, logContext, brokerRedirector), this.metadata, clientId, maxInflightRequests, |
A public interface is any change to the following:
Binary log format
The network protocol and api behavior
Any class in the public packages under clientsConfiguration, especially client configuration
org/apache/kafka/common/serialization
org/apache/kafka/common
org/apache/kafka/common/errors
org/apache/kafka/clients/producer
org/apache/kafka/clients/consumer (eventually, once stable)
Monitoring
Command line tools and arguments
- Anything else that will likely break existing users in some way when they upgrade
Proposed Changes
...
Usa-case example:
Kafka cluster is running in OpenShift as a distributed setup of 5 brokers. Each broker's pod service port is exposed as a separate port on localhost interface. KafkaProducer connects to cluster from outside of OpenShift environment and specifies bootstrap servers as:
Code Block | ||
---|---|---|
| ||
bootstrap.servers=localhost:9092,localhost:9093,localhost:9094,localhost:9095,localhost:9096 |
When message is getting sent, Kafka client recieves clusters metadata with pods internal DNS names and delivery is failed:
Code Block | ||
---|---|---|
| ||
DEBUG org.apache.kafka.clients.NetworkClient[kafka-producer-network-thread | producer-1] - [Producer clientId=producer-1] Initiating connection to node kafka-2.kafka.myproject.svc.cluster.local:9092 (id: 1003 rack: null)
DEBUG org.apache.kafka.clients.NetworkClient[kafka-producer-network-thread | producer-1] - [Producer clientId=producer-1] Error connecting to node kafka-2.kafka.myproject.svc.cluster.local:9092 (id: 1003 rack: null)
java.io.IOException: Can't resolve address: kafka-2.kafka.myproject.svc.cluster.local:9092
at org.apache.kafka.common.network.Selector.doConnect(Selector.java:258)
at org.apache.kafka.common.network.Selector.connect(Selector.java:237)
at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:792)
at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:230)
at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:263)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
at java.base/java.lang.Thread.run(Thread.java:844)
Caused by: java.nio.channels.UnresolvedAddressException
at java.base/sun.nio.ch.Net.checkAddress(Net.java:112)
at java.base/sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622)
at org.apache.kafka.common.network.Selector.doConnect(Selector.java:256)
... 7 more
|
Using new KafkaProducer configuration and implementing Redirector interface resolves the problem:
Code Block | ||
---|---|---|
| ||
broker.redirector=custom.redirector.BrokerRedirector |
Code Block | ||
---|---|---|
| ||
public class BrokerRedirector implements Redirector {
@Override
public void configure(Map<String, ?> props) {}
@Override
public InetSocketAddress redirect(InetSocketAddress address) {
String host = address.getHostString();
if (host.contains("kafka-0"))
return new InetSocketAddress("localhost", 9092);
else if (host.contains("kafka-1"))
return new InetSocketAddress("localhost", 9093);
else if (host.contains("kafka-2"))
return new InetSocketAddress("localhost", 9094);
else if (host.contains("kafka-3"))
return new InetSocketAddress("localhost", 9095);
else if (host.contains("kafka-4"))
return new InetSocketAddress("localhost", 9096);
return address;
}
} |
Compatibility, Deprecation, and Migration Plan
...