Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Introduce new org.apache.kafka.common.network.Redirector interface that will be used by org.apache.kafka.common.network.Selector Selector to redirect to alternative broker address.

Code Block
languagejava
titleorg.apache.kafka.common.network.Redirector
package org.apache.kafka.common.network;
import java.net.InetSocketAddress;
import org.apache.kafka.common.Configurable;

public interface Redirector extends Configurable {
	public InetSocketAddress redirect(InetSocketAddress address);
} 


Proposed Changes

With respect to proposed interface the following changes in existing classes are required:

Changorg.apache.kafka.common.network.Selector class to accept accept Redirector instance through constructor and execute it before opening the connection to broker:

...

Add Redirector configuration support for KafkaConsumer and KafkaProducer as  as broker.redirector redirector property:

Code Block
languagejava
titleorg.apache.kafka.clients.consumer.ConsumerConfig
collapsetrue
@@ -211,6 +211,10 @@ public class ConsumerConfig extends AbstractConfig {
     public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer";
     public static final String VALUE_DESERIALIZER_CLASS_DOC = "Deserializer class for value that implements the <code>org.apache.kafka.common.serialization.Deserializer</code> interface.";
+    /** <code>broker.redirector</code> */
+    public static final String BROKER_REDIRECTOR_CLASS_CONFIG = "broker.redirector";
+    public static final String BROKER_REDIRECTOR_CLASS_DOC = "Broker redirector class that implements the <code>org.apache.kafka.common.network.Redirector</code> interface.";
+
     /** <code>connections.max.idle.ms</code> */
     public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG;
@@ -397,6 +401,11 @@ public class ConsumerConfig extends AbstractConfig {
                                         Type.CLASS,
                                         Importance.HIGH,
                                         VALUE_DESERIALIZER_CLASS_DOC)
+                                .define(BROKER_REDIRECTOR_CLASS_CONFIG,
+                                        Type.CLASS,
+                                        null,
+                                        Importance.LOW,
+                                        BROKER_REDIRECTOR_CLASS_DOC)
                                 .define(REQUEST_TIMEOUT_MS_CONFIG,
                                         Type.INT,
                                         305000, // chosen to be higher than the default of max.poll.interval.ms

...

Code Block
languagejava
titleorg.apache.kafka.clients.producer.KafkaProducer
collapsetrue
@@ -412,9 +413,10 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             }
             ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
             Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
+            Redirector brokerRedirector = config.getConfiguredInstance(ProducerConfig.BROKER_REDIRECTOR_CLASS_CONFIG, Redirector.class);
             KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient(
                     new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
-                            this.metrics, time, "producer", channelBuilder, logContext),
+                            this.metrics, time, "producer", channelBuilder, logContext, brokerRedirector),
                     this.metadata,
                     clientId,
                     maxInflightRequests,

A public interface is any change to the following:

  • Binary log format

  • The network protocol and api behavior

  • Any class in the public packages under clientsConfiguration, especially client configuration

    • org/apache/kafka/common/serialization

    • org/apache/kafka/common

    • org/apache/kafka/common/errors

    • org/apache/kafka/clients/producer

    • org/apache/kafka/clients/consumer (eventually, once stable)

  • Monitoring

  • Command line tools and arguments

  • Anything else that will likely break existing users in some way when they upgrade

Proposed Changes

...

 

Usa-case example: 

Kafka cluster is running in OpenShift as a distributed setup of 5 brokers. Each broker's pod service port is exposed as a separate port on localhost interface. KafkaProducer connects to cluster from outside of OpenShift environment and specifies bootstrap servers as:

Code Block
languagejava
bootstrap.servers=localhost:9092,localhost:9093,localhost:9094,localhost:9095,localhost:9096

When message is getting sent, Kafka client recieves clusters metadata with pods internal DNS names and delivery is failed:

Code Block
languagejava
DEBUG org.apache.kafka.clients.NetworkClient[kafka-producer-network-thread | producer-1] - [Producer clientId=producer-1] Initiating connection to node kafka-2.kafka.myproject.svc.cluster.local:9092 (id: 1003 rack: null)
DEBUG org.apache.kafka.clients.NetworkClient[kafka-producer-network-thread | producer-1] - [Producer clientId=producer-1] Error connecting to node kafka-2.kafka.myproject.svc.cluster.local:9092 (id: 1003 rack: null)
java.io.IOException: Can't resolve address: kafka-2.kafka.myproject.svc.cluster.local:9092
	at org.apache.kafka.common.network.Selector.doConnect(Selector.java:258)
	at org.apache.kafka.common.network.Selector.connect(Selector.java:237)
	at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:792)
	at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:230)
	at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:263)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
	at java.base/java.lang.Thread.run(Thread.java:844)
Caused by: java.nio.channels.UnresolvedAddressException
	at java.base/sun.nio.ch.Net.checkAddress(Net.java:112)
	at java.base/sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622)
	at org.apache.kafka.common.network.Selector.doConnect(Selector.java:256)
	... 7 more


Using new KafkaProducer configuration and implementing Redirector interface resolves the problem:

Code Block
languagejava
broker.redirector=custom.redirector.BrokerRedirector
Code Block
languagejava
public class BrokerRedirector implements Redirector {
	@Override
	public void configure(Map<String, ?> props) {}

	@Override
	public InetSocketAddress redirect(InetSocketAddress address) {
		String host = address.getHostString();
		if (host.contains("kafka-0"))
			return new InetSocketAddress("localhost", 9092);
		else if (host.contains("kafka-1"))
			return new InetSocketAddress("localhost", 9093);
		else if (host.contains("kafka-2"))
			return new InetSocketAddress("localhost", 9094);
		else if (host.contains("kafka-3"))
			return new InetSocketAddress("localhost", 9095);
		else if (host.contains("kafka-4"))
			return new InetSocketAddress("localhost", 9096);		
		return address;
	}
}


Compatibility, Deprecation, and Migration Plan

...