Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Currently, a Kafka client connection fails with "java.io.IOException: Can't resolve address" after the Kafka cluster metadata is updated with internal DNS names of brokers that are unreachable by the client. An example of such a configuration might be a Java client calling a Kafka cluster from outside of a Kubernetes cluster or an AWS network.

...

Public Interfaces

Introduce a new org.apache.kafka.common.network.Redirector interface that will be used by org.apache.kafka.common.network.Selector to redirect to an alternative broker address

Code Block
languagejava
titleorg.apache.kafka.common.network.Redirector
package org.apache.kafka.common.network;
import java.net.InetSocketAddress;
import org.apache.kafka.common.Configurable;

/**
 * Pluggable hook invoked by {@code org.apache.kafka.common.network.Selector}
 * immediately before a connection is opened, allowing a client to substitute
 * an alternative broker address — e.g. when cluster metadata advertises
 * internal DNS names that are not resolvable from the client's network.
 *
 * <p>Implementations are instantiated reflectively from the
 * {@code broker.redirector} client property and receive their configuration
 * via {@link Configurable#configure(java.util.Map)}.
 */
public interface Redirector extends Configurable {
	/**
	 * Maps a broker address taken from cluster metadata to the address the
	 * client should actually connect to.
	 *
	 * @param address the broker address resolved from cluster metadata
	 * @return the address to connect to instead; implementations should
	 *         return {@code address} unchanged when no redirection applies
	 */
	InetSocketAddress redirect(InetSocketAddress address);
}

 


Proposed Changes

With respect to proposed interface the following changes in existing classes are required:

Change the org.apache.kafka.common.network.Selector class to accept a Redirector instance through its constructor and execute it before opening the connection to a broker:

Code Block
languagejava
titlejava.org.apache.kafka.common.network.Selector
collapsetrue
@@ -121,6 +121,7 @@ public class Selector implements Selectable, AutoCloseable {
     private final IdleExpiryManager idleExpiryManager;
     private final MemoryPool memoryPool;
     private final long lowMemThreshold;
+    private final Redirector redirector;
     //indicates if the previous call to poll was able to make progress in reading already-buffered data.
     //this is used to prevent tight loops when memory is not available to read any more data
     private boolean madeReadProgressLastPoll = true;
@@ -147,7 +148,8 @@ public class Selector implements Selectable, AutoCloseable {
             boolean recordTimePerConnection,
             ChannelBuilder channelBuilder,
             MemoryPool memoryPool,
-            LogContext logContext) {
+            LogContext logContext,
+            Redirector redirector) {
         try {
             this.nioSelector = java.nio.channels.Selector.open();
         } catch (IOException e) {
@@ -174,6 +176,21 @@ public class Selector implements Selectable, AutoCloseable {
         this.memoryPool = memoryPool;
         this.lowMemThreshold = (long) (0.1 * this.memoryPool.size());
         this.log = logContext.logger(Selector.class);
+        this.redirector = redirector;
+    }
+
+    public Selector(int maxReceiveSize,
+            long connectionMaxIdleMs,
+            Metrics metrics,
+            Time time,
+            String metricGrpPrefix,
+            Map<String, String> metricTags,
+            boolean metricsPerConnection,
+            boolean recordTimePerConnection,
+            ChannelBuilder channelBuilder,
+            MemoryPool memoryPool,
+            LogContext logContext) {
+        this(maxReceiveSize, connectionMaxIdleMs, metrics, time, metricGrpPrefix, metricTags, metricsPerConnection, recordTimePerConnection, channelBuilder, memoryPool, logContext, null);
     }
     public Selector(int maxReceiveSize,
@@ -188,10 +205,15 @@ public class Selector implements Selectable, AutoCloseable {
         this(maxReceiveSize, connectionMaxIdleMs, metrics, time, metricGrpPrefix, metricTags, metricsPerConnection, false, channelBuilder, MemoryPool.NONE, logContext);
     }
+    public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, ChannelBuilder channelBuilder, LogContext logContext, Redirector redirector) {
+        this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, Collections.<String, String>emptyMap(), true, false, channelBuilder, MemoryPool.NONE, logContext, redirector);
+    }
+
     public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, ChannelBuilder channelBuilder, LogContext logContext) {
         this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, Collections.<String, String>emptyMap(), true, channelBuilder, logContext);
     }
+
     /**
      * Begin connecting to the given address and add the connection to this nioSelector associated with the given id
      * number.
@@ -211,6 +233,7 @@ public class Selector implements Selectable, AutoCloseable {
         SocketChannel socketChannel = SocketChannel.open();
         try {
             configureSocketChannel(socketChannel, sendBufferSize, receiveBufferSize);
+            address = (redirector != null) ? redirector.redirect(address) : address;
             boolean connected = doConnect(socketChannel, address);
             SelectionKey key = registerChannel(id, socketChannel, SelectionKey.OP_CONNECT);

...

Add Redirector configuration support for KafkaConsumer and KafkaProducer as the broker.redirector property:

Code Block
languagejava
titleorg.apache.kafka.clients.consumer.ConsumerConfig
collapsetrue
@@ -211,6 +211,10 @@ public class ConsumerConfig extends AbstractConfig {
     public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer";
     public static final String VALUE_DESERIALIZER_CLASS_DOC = "Deserializer class for value that implements the <code>org.apache.kafka.common.serialization.Deserializer</code> interface.";
+    /** <code>broker.redirector</code> */
+    public static final String BROKER_REDIRECTOR_CLASS_CONFIG = "broker.redirector";
+    public static final String BROKER_REDIRECTOR_CLASS_DOC = "Broker redirector class that implements the <code>org.apache.kafka.common.network.Redirector</code> interface.";
+
     /** <code>connections.max.idle.ms</code> */
     public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG;
@@ -397,6 +401,11 @@ public class ConsumerConfig extends AbstractConfig {
                                         Type.CLASS,
                                         Importance.HIGH,
                                         VALUE_DESERIALIZER_CLASS_DOC)
+                                .define(BROKER_REDIRECTOR_CLASS_CONFIG,
+                                        Type.CLASS,
+                                        null,
+                                        Importance.LOW,
+                                        BROKER_REDIRECTOR_CLASS_DOC)
                                 .define(REQUEST_TIMEOUT_MS_CONFIG,
                                         Type.INT,
                                         305000, // chosen to be higher than the default of max.poll.interval.ms
Code Block
languagejava
titleorg.apache.kafka.clients.consumer.KafkaConsumer
collapsetrue
@@ -44,6 +44,7 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.network.ChannelBuilder;
+import org.apache.kafka.common.network.Redirector;
 import org.apache.kafka.common.network.Selector;
 import org.apache.kafka.common.requests.IsolationLevel;
 import org.apache.kafka.common.requests.MetadataRequest;
@@ -715,8 +716,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             int heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
+            Redirector brokerRedirector = config.getConfiguredInstance(ConsumerConfig.BROKER_REDIRECTOR_CLASS_CONFIG, Redirector.class);
             NetworkClient netClient = new NetworkClient(
-                    new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder, logContext),
+                    new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder, logContext, brokerRedirector),
                     this.metadata,
                     clientId,
                     100, // a fixed large enough value will suffice for max in-flight requests
Code Block
languagejava
titleorg.apache.kafka.clients.producer.ProducerConfig
collapsetrue
@@ -181,6 +181,10 @@ public class ProducerConfig extends AbstractConfig {
     public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
     public static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the <code>org.apache.kafka.common.serialization.Serializer</code> interface.";
+    /** <code>broker.redirector</code> */
+    public static final String BROKER_REDIRECTOR_CLASS_CONFIG = "broker.redirector";
+    public static final String BROKER_REDIRECTOR_CLASS_DOC = "Broker redirector class that implements the <code>org.apache.kafka.common.network.Redirector</code> interface.";
+
     /** <code>connections.max.idle.ms</code> */
     public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG;
@@ -292,6 +296,11 @@ public class ProducerConfig extends AbstractConfig {
                                         Type.CLASS,
                                         Importance.HIGH,
                                         VALUE_SERIALIZER_CLASS_DOC)
+                                .define(BROKER_REDIRECTOR_CLASS_CONFIG,
+                                        Type.CLASS,
+                                        null,
+                                        Importance.LOW,
+                                        BROKER_REDIRECTOR_CLASS_DOC)
                                 /* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */
                                 .define(CONNECTIONS_MAX_IDLE_MS_CONFIG,
                                         Type.LONG,

...

  • Binary log format

  • The network protocol and api behavior

  • Any class in the public packages under clients

    • org/apache/kafka/common/serialization

    • org/apache/kafka/common

    • org/apache/kafka/common/errors

    • org/apache/kafka/clients/producer

    • org/apache/kafka/clients/consumer (eventually, once stable)

  • Configuration, especially client configuration

  • Monitoring

  • Command line tools and arguments

  • Anything else that will likely break existing users in some way when they upgrade

Proposed Changes

Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.

...