Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

This page is meant as a template for writing a KIP. To create a KIP choose Tools->Copy on this page and modify with your content and replace the heading with the next KIP number and a description of your issue. Replace anything in italics with your own description.

Status

Current state: Under  Under Discussion

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The purpose of the change is to accommodate environments without proper DNS support, when KafkaProducer or KafkaConsumer tries to connect to Kafka brokers that are inside another network.

Currently, Kafka client connection fails fails with "java.io.IOException: Can't resolve address" after Kafka cluster metadata is updated with internal DNS names of brokers, unreachable by Client. The example of such configuration might be a Java Client calling Kafka Cluster from outside of Kubernetes cluster or AWS network.

The KIP improves NetworkClient to redirect the call to alternative network address and provides developers with additional Configurable interface to specify redirection rules.

Public Interfaces

Introduce new org.apache.kafka.common.network.Redirector interface :that will be used by org.apache.kafka.common.network.Selector to redirect to alternative broker address.

Code Block
language
Code Block
languagejava
titleorg.apache.kafka.common.network.Redirector
package org.apache.kafka.common.network;
import java.net.InetSocketAddress;
import org.apache.kafka.common.Configurable;

public interface Redirector extends Configurable {
	public InetSocketAddress redirect(InetSocketAddress address);
} 


Proposed Changes

With respect to proposed interface the following changes in existing classes are required:

Changjava.org.apache.kafka.common.network.Selector class to accept accept Redirector instance through constructor and execute it before opening the connection to broker:

Code Block
languagejava
titlejava.org.apache.kafka.common.network.Selector
collapsetrue
@@ -121,6 +121,7 @@ public class Selector implements Selectable, AutoCloseable {
     private final IdleExpiryManager idleExpiryManager;
     private final MemoryPool memoryPool;
     private final long lowMemThreshold;
+    private final Redirector redirector;
     //indicates if the previous call to poll was able to make progress in reading already-buffered data.
     //this is used to prevent tight loops when memory is not available to read any more data
     private boolean madeReadProgressLastPoll = true;
@@ -147,7 +148,8 @@ public class Selector implements Selectable, AutoCloseable {
             boolean recordTimePerConnection,
             ChannelBuilder channelBuilder,
             MemoryPool memoryPool,
-            LogContext logContext) {
+            LogContext logContext,
+            Redirector redirector) {
         try {
             this.nioSelector = java.nio.channels.Selector.open();
         } catch (IOException e) {
@@ -174,6 +176,21 @@ public class Selector implements Selectable, AutoCloseable {
         this.memoryPool = memoryPool;
         this.lowMemThreshold = (long) (0.1 * this.memoryPool.size());
         this.log = logContext.logger(Selector.class);
+        this.redirector = redirector;
+    }
+
+    public Selector(int maxReceiveSize,
+            long connectionMaxIdleMs,
+            Metrics metrics,
+            Time time,
+            String metricGrpPrefix,
+            Map<String, String> metricTags,
+            boolean metricsPerConnection,
+            boolean recordTimePerConnection,
+            ChannelBuilder channelBuilder,
+            MemoryPool memoryPool,
+            LogContext logContext) {
+        this(maxReceiveSize, connectionMaxIdleMs, metrics, time, metricGrpPrefix, metricTags, metricsPerConnection, recordTimePerConnection, channelBuilder, memoryPool, logContext, null);
     }
     public Selector(int maxReceiveSize,
@@ -188,10 +205,15 @@ public class Selector implements Selectable, AutoCloseable {
         this(maxReceiveSize, connectionMaxIdleMs, metrics, time, metricGrpPrefix, metricTags, metricsPerConnection, false, channelBuilder, MemoryPool.NONE, logContext);
     }
+    public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, ChannelBuilder channelBuilder, LogContext logContext, Redirector redirector) {
+        this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, Collections.<String, String>emptyMap(), true, false, channelBuilder, MemoryPool.NONE, logContext, redirector);
+    }
+
     public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, ChannelBuilder channelBuilder, LogContext logContext) {
         this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, Collections.<String, String>emptyMap(), true, channelBuilder, logContext);
     }
+
     /**
      * Begin connecting to the given address and add the connection to this nioSelector associated with the given id
      * number.
@@ -211,6 +233,7 @@ public class Selector implements Selectable, AutoCloseable {
         SocketChannel socketChannel = SocketChannel.open();
         try {
             configureSocketChannel(socketChannel, sendBufferSize, receiveBufferSize);
+            address = (redirector != null) ? redirector.redirect(address) : address;
             boolean connected = doConnect(socketChannel, address);
             SelectionKey key = registerChannel(id, socketChannel, SelectionKey.OP_CONNECT);

...

Add Redirector configuration support for KafkaConsumer and KafkaProducer and KafkaConsumer as  as broker.redirector redirector property:

 

Code Block
languagejava
titleorg.apache.kafka.clients.consumer.ConsumerConfig
collapsetrue
@@ -211,6 +211,10 @@ public class ConsumerConfig extends AbstractConfig {
     public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer";
     public static final String VALUE_DESERIALIZER_CLASS_DOC = "Deserializer class for value that implements the <code>org.apache.kafka.common.serialization.Deserializer</code> interface.";
+    /** <code>broker.redirector</code> */
+    public static final String BROKER_REDIRECTOR_CLASS_CONFIG = "broker.redirector";
+    public static final String BROKER_REDIRECTOR_CLASS_DOC = "Broker redirector class that implements the <code>org.apache.kafka.common.network.Redirector</code> interface.";
+
     /** <code>connections.max.idle.ms</code> */
     public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG;
@@ -397,6 +401,11 @@ public class ConsumerConfig extends AbstractConfig {
                                         Type.CLASS,
                                         Importance.HIGH,
                                         VALUE_DESERIALIZER_CLASS_DOC)
+                                .define(BROKER_REDIRECTOR_CLASS_CONFIG,
+                                        Type.CLASS,
+                                        null,
+                                        Importance.LOW,
+                                        BROKER_REDIRECTOR_CLASS_DOC)
                                 .define(REQUEST_TIMEOUT_MS_CONFIG,
                                         Type.INT,
                                         305000, // chosen to be higher than the default of max.poll.interval.ms
Code Block
languagejava
titleorg.apache.kafka.clients.consumer.KafkaConsumer
collapsetrue
@@ -44,6 +44,7 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.network.ChannelBuilder;
+import org.apache.kafka.common.network.Redirector;
 import org.apache.kafka.common.network.Selector;
 import org.apache.kafka.common.requests.IsolationLevel;
 import org.apache.kafka.common.requests.MetadataRequest;
@@ -715,8 +716,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             int heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
+            Redirector brokerRedirector = config.getConfiguredInstance(ConsumerConfig.BROKER_REDIRECTOR_CLASS_CONFIG, Redirector.class);
             NetworkClient netClient = new NetworkClient(
-                    new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder, logContext),
+                    new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder, logContext, brokerRedirector),
                     this.metadata,
                     clientId,
                     100, // a fixed large enough value will suffice for max in-flight requests
Code Block
languagejava
titleorg.apache.kafka.clients.producer.ProducerConfig
collapsetrue
@@ -181,6 +181,10 @@ public class ProducerConfig extends AbstractConfig {
     public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
     public static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the <code>org.apache.kafka.common.serialization.Serializer</code> interface.";
+    /** <code>broker.redirector</code> */
+    public static final String BROKER_REDIRECTOR_CLASS_CONFIG = "broker.redirector";
+    public static final String BROKER_REDIRECTOR_CLASS_DOC = "Broker redirector class that implements the <code>org.apache.kafka.common.network.Redirector</code> interface.";
+
     /** <code>connections.max.idle.ms</code> */
     public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG;
@@ -292,6 +296,11 @@ public class ProducerConfig extends AbstractConfig {
                                         Type.CLASS,
                                         Importance.HIGH,
                                         VALUE_SERIALIZER_CLASS_DOC)
+                                .define(BROKER_REDIRECTOR_CLASS_CONFIG,
+                                        Type.CLASS,
+                                        null,
+                                        Importance.LOW,
+                                        BROKER_REDIRECTOR_CLASS_DOC)
                                 /* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */
                                 .define(CONNECTIONS_MAX_IDLE_MS_CONFIG,
                                         Type.LONG,
Code Block
languagejava
titleorg.apache.kafka.clients.producer.KafkaProducer
collapsetrue
@@ -412,9 +413,10 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             }
             ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
             Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
+            Redirector brokerRedirector = config.getConfiguredInstance(ProducerConfig.BROKER_REDIRECTOR_CLASS_CONFIG, Redirector.class);
             KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient(
                     new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
-                            this.metrics, time, "producer", channelBuilder, logContext),
+                            this.metrics, time, "producer", channelBuilder, logContext, brokerRedirector),
                     this.metadata,
                     clientId,
                     maxInflightRequests,

A public interface is any change to the following:

  • Binary log format

  • The network protocol and api behavior

  • Any class in the public packages under clientsConfiguration, especially client configuration

    • org/apache/kafka/common/serialization

    • org/apache/kafka/common

    • org/apache/kafka/common/errors

    • org/apache/kafka/clients/producer

    • org/apache/kafka/clients/consumer (eventually, once stable)

  • Monitoring

  • Command line tools and arguments

  • Anything else that will likely break existing users in some way when they upgrade

Proposed Changes

Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Rejected Alternatives

 

Usa-case example: 

Kafka cluster is running in OpenShift as a distributed setup of 5 brokers. Each broker's pod service port is exposed as a separate port on localhost interface. KafkaProducer connects to cluster from outside of OpenShift environment and specifies bootstrap servers as:

Code Block
languagejava
bootstrap.servers=localhost:9092,localhost:9093,localhost:9094,localhost:9095,localhost:9096

When message is getting sent, Kafka client recieves clusters metadata with pods internal DNS names and delivery is failed:

Code Block
languagejava
DEBUG org.apache.kafka.clients.NetworkClient[kafka-producer-network-thread | producer-1] - [Producer clientId=producer-1] Initiating connection to node kafka-2.kafka.myproject.svc.cluster.local:9092 (id: 1003 rack: null)
DEBUG org.apache.kafka.clients.NetworkClient[kafka-producer-network-thread | producer-1] - [Producer clientId=producer-1] Error connecting to node kafka-2.kafka.myproject.svc.cluster.local:9092 (id: 1003 rack: null)
java.io.IOException: Can't resolve address: kafka-2.kafka.myproject.svc.cluster.local:9092
	at org.apache.kafka.common.network.Selector.doConnect(Selector.java:258)
	at org.apache.kafka.common.network.Selector.connect(Selector.java:237)
	at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:792)
	at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:230)
	at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:263)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
	at java.base/java.lang.Thread.run(Thread.java:844)
Caused by: java.nio.channels.UnresolvedAddressException
	at java.base/sun.nio.ch.Net.checkAddress(Net.java:112)
	at java.base/sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622)
	at org.apache.kafka.common.network.Selector.doConnect(Selector.java:256)
	... 7 more


Using new KafkaProducer configuration and implementing Redirector interface resolves the problem:

Code Block
languagejava
titleKafkaProducer configuration example
broker.redirector=custom.redirector.CustomBrokerRedirector
Code Block
languagejava
titleRedirector implementation example
public class CustomBrokerRedirector implements Redirector {
	@Override
	public void configure(Map<String, ?> props) {}

	@Override
	public InetSocketAddress redirect(InetSocketAddress address) {
		String host = address.getHostString();
		if (host.contains("kafka-0"))
			return new InetSocketAddress("localhost", 9092);
		else if (host.contains("kafka-1"))
			return new InetSocketAddress("localhost", 9093);
		else if (host.contains("kafka-2"))
			return new InetSocketAddress("localhost", 9094);
		else if (host.contains("kafka-3"))
			return new InetSocketAddress("localhost", 9095);
		else if (host.contains("kafka-4"))
			return new InetSocketAddress("localhost", 9096);		
		return address;
	}
}


Compatibility, Deprecation, and Migration Plan

The KIP is fully backward-compatible. If no Redirector configuration specified, Selector will work in the same way.

Rejected Alternatives

None

 If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.