Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Currently, Kafka client connection fails with "java.io.IOException: Can't resolve address" after Kafka cluster metadata is updated with internal DNS names of brokers, unreachable by Client. The example of such configuration might be a Java Client calling Kafka Cluster from outside of Kubernetes cluster or AWS network.

The KIP improves NetworkClient to redirect the call to alternative network address and provides developers with additional Configurable interface to specify redirection routesrules.

Public Interfaces

...

Introduce new org.apache.kafka.common.network.Redirector interface:

Code Block
languagejava
titleorg.apache.kafka.common.network.Redirector
package org.apache.kafka.common.network;
import java.net.InetSocketAddress;
import org.apache.kafka.common.Configurable;

public interface Redirector extends Configurable {
	public InetSocketAddress redirect(InetSocketAddress address);
}


Changjava.org.apache.kafka.common.network.Selector class to accept Redirector instance through constructor and execute it before opening the connection to broker:

Code Block
languagejava
titlejava.org.apache.kafka.common.network.Selector
collapsetrue
@@ -121,6 +121,7 @@ public class Selector implements Selectable, AutoCloseable {
     private final IdleExpiryManager idleExpiryManager;
     private final MemoryPool memoryPool;
     private final long lowMemThreshold;
+    private final Redirector redirector;
     //indicates if the previous call to poll was able to make progress in reading already-buffered data.
     //this is used to prevent tight loops when memory is not available to read any more data
     private boolean madeReadProgressLastPoll = true;
@@ -147,7 +148,8 @@ public class Selector implements Selectable, AutoCloseable {
             boolean recordTimePerConnection,
             ChannelBuilder channelBuilder,
             MemoryPool memoryPool,
-            LogContext logContext) {
+            LogContext logContext,
+            Redirector redirector) {
         try {
             this.nioSelector = java.nio.channels.Selector.open();
         } catch (IOException e) {
@@ -174,6 +176,21 @@ public class Selector implements Selectable, AutoCloseable {
         this.memoryPool = memoryPool;
         this.lowMemThreshold = (long) (0.1 * this.memoryPool.size());
         this.log = logContext.logger(Selector.class);
+        this.redirector = redirector;
+    }
+
+    public Selector(int maxReceiveSize,
+            long connectionMaxIdleMs,
+            Metrics metrics,
+            Time time,
+            String metricGrpPrefix,
+            Map<String, String> metricTags,
+            boolean metricsPerConnection,
+            boolean recordTimePerConnection,
+            ChannelBuilder channelBuilder,
+            MemoryPool memoryPool,
+            LogContext logContext) {
+        this(maxReceiveSize, connectionMaxIdleMs, metrics, time, metricGrpPrefix, metricTags, metricsPerConnection, recordTimePerConnection, channelBuilder, memoryPool, logContext, null);
     }
     public Selector(int maxReceiveSize,
@@ -188,10 +205,15 @@ public class Selector implements Selectable, AutoCloseable {
         this(maxReceiveSize, connectionMaxIdleMs, metrics, time, metricGrpPrefix, metricTags, metricsPerConnection, false, channelBuilder, MemoryPool.NONE, logContext);
     }
+    public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, ChannelBuilder channelBuilder, LogContext logContext, Redirector redirector) {
+        this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, Collections.<String, String>emptyMap(), true, false, channelBuilder, MemoryPool.NONE, logContext, redirector);
+    }
+
     public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, ChannelBuilder channelBuilder, LogContext logContext) {
         this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, Collections.<String, String>emptyMap(), true, channelBuilder, logContext);
     }
+
     /**
      * Begin connecting to the given address and add the connection to this nioSelector associated with the given id
      * number.
@@ -211,6 +233,7 @@ public class Selector implements Selectable, AutoCloseable {
         SocketChannel socketChannel = SocketChannel.open();
         try {
             configureSocketChannel(socketChannel, sendBufferSize, receiveBufferSize);
+            address = (redirector != null) ? redirector.redirect(address) : address;
             boolean connected = doConnect(socketChannel, address);
             SelectionKey key = registerChannel(id, socketChannel, SelectionKey.OP_CONNECT);


Add Redirector configuration support for KafkaProducer and KafkaConsumer as broker.redirector property:

 

Code Block
languagejava
titleorg.apache.kafka.clients.consumer.ConsumerConfig
collapsetrue
@@ -211,6 +211,10 @@ public class ConsumerConfig extends AbstractConfig {
     public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer";
     public static final String VALUE_DESERIALIZER_CLASS_DOC = "Deserializer class for value that implements the <code>org.apache.kafka.common.serialization.Deserializer</code> interface.";
+    /** <code>broker.redirector</code> */
+    public static final String BROKER_REDIRECTOR_CLASS_CONFIG = "broker.redirector";
+    public static final String BROKER_REDIRECTOR_CLASS_DOC = "Broker redirector class that implements the <code>org.apache.kafka.common.network.Redirector</code> interface.";
+
     /** <code>connections.max.idle.ms</code> */
     public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG;
@@ -397,6 +401,11 @@ public class ConsumerConfig extends AbstractConfig {
                                         Type.CLASS,
                                         Importance.HIGH,
                                         VALUE_DESERIALIZER_CLASS_DOC)
+                                .define(BROKER_REDIRECTOR_CLASS_CONFIG,
+                                        Type.CLASS,
+                                        null,
+                                        Importance.LOW,
+                                        BROKER_REDIRECTOR_CLASS_DOC)
                                 .define(REQUEST_TIMEOUT_MS_CONFIG,
                                         Type.INT,
                                         305000, // chosen to be higher than the default of max.poll.interval.ms
Code Block
languagejava
titleorg.apache.kafka.clients.consumer.KafkaConsumer
collapsetrue
@@ -44,6 +44,7 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.network.ChannelBuilder;
+import org.apache.kafka.common.network.Redirector;
 import org.apache.kafka.common.network.Selector;
 import org.apache.kafka.common.requests.IsolationLevel;
 import org.apache.kafka.common.requests.MetadataRequest;
@@ -715,8 +716,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             int heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
+            Redirector brokerRedirector = config.getConfiguredInstance(ConsumerConfig.BROKER_REDIRECTOR_CLASS_CONFIG, Redirector.class);
             NetworkClient netClient = new NetworkClient(
-                    new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder, logContext),
+                    new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder, logContext, brokerRedirector),
                     this.metadata,
                     clientId,
                     100, // a fixed large enough value will suffice for max in-flight requests
Code Block
titleorg.apache.kafka.clients.producer.ProducerConfig
collapsetrue
@@ -181,6 +181,10 @@ public class ProducerConfig extends AbstractConfig {
     public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
     public static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the <code>org.apache.kafka.common.serialization.Serializer</code> interface.";
+    /** <code>broker.redirector</code> */
+    public static final String BROKER_REDIRECTOR_CLASS_CONFIG = "broker.redirector";
+    public static final String BROKER_REDIRECTOR_CLASS_DOC = "Broker redirector class that implements the <code>org.apache.kafka.common.network.Redirector</code> interface.";
+
     /** <code>connections.max.idle.ms</code> */
     public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG;
@@ -292,6 +296,11 @@ public class ProducerConfig extends AbstractConfig {
                                         Type.CLASS,
                                         Importance.HIGH,
                                         VALUE_SERIALIZER_CLASS_DOC)
+                                .define(BROKER_REDIRECTOR_CLASS_CONFIG,
+                                        Type.CLASS,
+                                        null,
+                                        Importance.LOW,
+                                        BROKER_REDIRECTOR_CLASS_DOC)
                                 /* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */
                                 .define(CONNECTIONS_MAX_IDLE_MS_CONFIG,
                                         Type.LONG,
Code Block
languagejava
titleorg.apache.kafka.clients.producer.KafkaProducer
collapsetrue
@@ -412,9 +413,10 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             }
             ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
             Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
+            Redirector brokerRedirector = config.getConfiguredInstance(ProducerConfig.BROKER_REDIRECTOR_CLASS_CONFIG, Redirector.class);
             KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient(
                     new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
-                            this.metrics, time, "producer", channelBuilder, logContext),
+                            this.metrics, time, "producer", channelBuilder, logContext, brokerRedirector),
                     this.metadata,
                     clientId,
                     maxInflightRequests,



A public interface is any change to the following:

...