Status
Current state: Under Discussion
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The purpose of this change is to support environments without proper DNS, where a KafkaProducer or KafkaConsumer has to connect to Kafka brokers that live inside another network.
Currently, the client connection fails with "java.io.IOException: Can't resolve address" once the cluster metadata has been updated with the brokers' internal DNS names, which the client cannot resolve. A typical example is a Java client calling a Kafka cluster from outside the cluster's Kubernetes or AWS network.
This KIP changes the networking layer used by NetworkClient so that a connection can be redirected to an alternative network address, and gives developers an additional Configurable interface for specifying the redirection rules.
Public Interfaces
Introduce a new org.apache.kafka.common.network.Redirector interface:
package org.apache.kafka.common.network;
import java.net.InetSocketAddress;
import org.apache.kafka.common.Configurable;
public interface Redirector extends Configurable {
public InetSocketAddress redirect(InetSocketAddress address);
}
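As an illustration of what an implementation of the proposed interface might look like, the sketch below maps internal broker host names to externally reachable addresses. StaticMapRedirector and the broker.redirector.mapping property are hypothetical names that are not part of this KIP, and the two small interfaces are local stand-ins so that the example compiles without the Kafka client jar.

```java
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;

// Local stand-ins (assumption): in Kafka these would be
// org.apache.kafka.common.Configurable and the Redirector proposed above.
interface Configurable {
    void configure(Map<String, ?> configs);
}

interface Redirector extends Configurable {
    InetSocketAddress redirect(InetSocketAddress address);
}

// Hypothetical implementation: rewrites broker host names using a static table.
class StaticMapRedirector implements Redirector {
    // Hypothetical property name, not defined by the KIP.
    // Expected format: "internalHost=externalHost,internalHost2=externalHost2"
    static final String MAPPING_CONFIG = "broker.redirector.mapping";

    private final Map<String, String> hostMapping = new HashMap<>();

    @Override
    public void configure(Map<String, ?> configs) {
        Object raw = configs.get(MAPPING_CONFIG);
        if (raw == null) {
            return; // no mapping configured: every address passes through unchanged
        }
        for (String pair : raw.toString().split(",")) {
            String[] parts = pair.trim().split("=", 2);
            if (parts.length == 2) {
                hostMapping.put(parts[0].trim(), parts[1].trim());
            }
        }
    }

    @Override
    public InetSocketAddress redirect(InetSocketAddress address) {
        // getHostString() does not trigger a reverse DNS lookup.
        String target = hostMapping.get(address.getHostString());
        if (target == null) {
            return address; // unknown host: leave the address as it is
        }
        // This constructor tries to resolve the target, so the caller gets an
        // address it can connect to; an IP literal needs no DNS lookup at all.
        return new InetSocketAddress(target, address.getPort());
    }
}
```

Hosts that are missing from the table are returned untouched, so a partially filled mapping only affects the brokers it names.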
Change the org.apache.kafka.common.network.Selector class to accept a Redirector instance through its constructor and apply it before opening the connection to a broker:
@@ -121,6 +121,7 @@ public class Selector implements Selectable, AutoCloseable {
private final IdleExpiryManager idleExpiryManager;
private final MemoryPool memoryPool;
private final long lowMemThreshold;
+ private final Redirector redirector;
//indicates if the previous call to poll was able to make progress in reading already-buffered data.
//this is used to prevent tight loops when memory is not available to read any more data
private boolean madeReadProgressLastPoll = true;
@@ -147,7 +148,8 @@ public class Selector implements Selectable, AutoCloseable {
boolean recordTimePerConnection,
ChannelBuilder channelBuilder,
MemoryPool memoryPool,
- LogContext logContext) {
+ LogContext logContext,
+ Redirector redirector) {
try {
this.nioSelector = java.nio.channels.Selector.open();
} catch (IOException e) {
@@ -174,6 +176,21 @@ public class Selector implements Selectable, AutoCloseable {
this.memoryPool = memoryPool;
this.lowMemThreshold = (long) (0.1 * this.memoryPool.size());
this.log = logContext.logger(Selector.class);
+ this.redirector = redirector;
+ }
+
+ public Selector(int maxReceiveSize,
+ long connectionMaxIdleMs,
+ Metrics metrics,
+ Time time,
+ String metricGrpPrefix,
+ Map<String, String> metricTags,
+ boolean metricsPerConnection,
+ boolean recordTimePerConnection,
+ ChannelBuilder channelBuilder,
+ MemoryPool memoryPool,
+ LogContext logContext) {
+ this(maxReceiveSize, connectionMaxIdleMs, metrics, time, metricGrpPrefix, metricTags, metricsPerConnection, recordTimePerConnection, channelBuilder, memoryPool, logContext, null);
}
public Selector(int maxReceiveSize,
@@ -188,10 +205,15 @@ public class Selector implements Selectable, AutoCloseable {
this(maxReceiveSize, connectionMaxIdleMs, metrics, time, metricGrpPrefix, metricTags, metricsPerConnection, false, channelBuilder, MemoryPool.NONE, logContext);
}
+ public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, ChannelBuilder channelBuilder, LogContext logContext, Redirector redirector) {
+ this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, Collections.<String, String>emptyMap(), true, false, channelBuilder, MemoryPool.NONE, logContext, redirector);
+ }
+
public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, ChannelBuilder channelBuilder, LogContext logContext) {
this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, Collections.<String, String>emptyMap(), true, channelBuilder, logContext);
}
+
/**
* Begin connecting to the given address and add the connection to this nioSelector associated with the given id
* number.
@@ -211,6 +233,7 @@ public class Selector implements Selectable, AutoCloseable {
SocketChannel socketChannel = SocketChannel.open();
try {
configureSocketChannel(socketChannel, sendBufferSize, receiveBufferSize);
+ address = (redirector != null) ? redirector.redirect(address) : address;
boolean connected = doConnect(socketChannel, address);
SelectionKey key = registerChannel(id, socketChannel, SelectionKey.OP_CONNECT);
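The effect of the line added to the connect path can be shown in isolation. The sketch below is not the Selector code itself: ConnectStep and FixedHostRedirector are hypothetical names, and the interfaces are local stand-ins for the Kafka types. It only demonstrates that a null redirector leaves the address untouched, which is how the existing constructors keep their current behavior.

```java
import java.net.InetSocketAddress;
import java.util.Map;

// Local stand-ins (assumption) for org.apache.kafka.common.Configurable
// and the proposed org.apache.kafka.common.network.Redirector.
interface Configurable {
    void configure(Map<String, ?> configs);
}

interface Redirector extends Configurable {
    InetSocketAddress redirect(InetSocketAddress address);
}

// Hypothetical helper that mirrors the single line added to the connect path.
class ConnectStep {
    static InetSocketAddress effectiveAddress(Redirector redirector, InetSocketAddress address) {
        return (redirector != null) ? redirector.redirect(address) : address;
    }
}

// Hypothetical redirector: sends every connection to a local proxy, keeping the port.
class FixedHostRedirector implements Redirector {
    @Override
    public void configure(Map<String, ?> configs) {
        // nothing to configure in this sketch
    }

    @Override
    public InetSocketAddress redirect(InetSocketAddress address) {
        return new InetSocketAddress("127.0.0.1", address.getPort());
    }
}
```

Because the redirect happens immediately before the connect call, an unresolvable internal name can be replaced by a reachable address without changing the cluster metadata the client holds.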
Add Redirector configuration support to KafkaProducer and KafkaConsumer through a new broker.redirector property:
@@ -211,6 +211,10 @@ public class ConsumerConfig extends AbstractConfig {
public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer";
public static final String VALUE_DESERIALIZER_CLASS_DOC = "Deserializer class for value that implements the <code>org.apache.kafka.common.serialization.Deserializer</code> interface.";
+ /** <code>broker.redirector</code> */
+ public static final String BROKER_REDIRECTOR_CLASS_CONFIG = "broker.redirector";
+ public static final String BROKER_REDIRECTOR_CLASS_DOC = "Broker redirector class that implements the <code>org.apache.kafka.common.network.Redirector</code> interface.";
+
/** <code>connections.max.idle.ms</code> */
public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG;
@@ -397,6 +401,11 @@ public class ConsumerConfig extends AbstractConfig {
Type.CLASS,
Importance.HIGH,
VALUE_DESERIALIZER_CLASS_DOC)
+ .define(BROKER_REDIRECTOR_CLASS_CONFIG,
+ Type.CLASS,
+ null,
+ Importance.LOW,
+ BROKER_REDIRECTOR_CLASS_DOC)
.define(REQUEST_TIMEOUT_MS_CONFIG,
Type.INT,
305000, // chosen to be higher than the default of max.poll.interval.ms
@@ -44,6 +44,7 @@ import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.ChannelBuilder;
+import org.apache.kafka.common.network.Redirector;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.requests.MetadataRequest;
@@ -715,8 +716,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
int heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
+ Redirector brokerRedirector = config.getConfiguredInstance(ConsumerConfig.BROKER_REDIRECTOR_CLASS_CONFIG, Redirector.class);
NetworkClient netClient = new NetworkClient(
- new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder, logContext),
+ new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder, logContext, brokerRedirector),
this.metadata,
clientId,
100, // a fixed large enough value will suffice for max in-flight requests
@@ -181,6 +181,10 @@ public class ProducerConfig extends AbstractConfig {
public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
public static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the <code>org.apache.kafka.common.serialization.Serializer</code> interface.";
+ /** <code>broker.redirector</code> */
+ public static final String BROKER_REDIRECTOR_CLASS_CONFIG = "broker.redirector";
+ public static final String BROKER_REDIRECTOR_CLASS_DOC = "Broker redirector class that implements the <code>org.apache.kafka.common.network.Redirector</code> interface.";
+
/** <code>connections.max.idle.ms</code> */
public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG;
@@ -292,6 +296,11 @@ public class ProducerConfig extends AbstractConfig {
Type.CLASS,
Importance.HIGH,
VALUE_SERIALIZER_CLASS_DOC)
+ .define(BROKER_REDIRECTOR_CLASS_CONFIG,
+ Type.CLASS,
+ null,
+ Importance.LOW,
+ BROKER_REDIRECTOR_CLASS_DOC)
/* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */
.define(CONNECTIONS_MAX_IDLE_MS_CONFIG,
Type.LONG,
@@ -412,9 +413,10 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
}
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
+ Redirector brokerRedirector = config.getConfiguredInstance(ProducerConfig.BROKER_REDIRECTOR_CLASS_CONFIG, Redirector.class);
KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient(
new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
- this.metrics, time, "producer", channelBuilder, logContext),
+ this.metrics, time, "producer", channelBuilder, logContext, brokerRedirector),
this.metadata,
clientId,
maxInflightRequests,
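From the application side, enabling a redirector would then come down to one extra client property. The following is a minimal sketch that only builds the properties; the class name com.example.StaticMapRedirector and the bootstrap address are hypothetical placeholders.

```java
import java.util.Properties;

// Hypothetical helper that assembles producer properties with a redirector enabled.
class RedirectorClientConfig {
    static Properties producerProperties() {
        Properties props = new Properties();
        // Externally reachable bootstrap address (placeholder value).
        props.put("bootstrap.servers", "203.0.113.10:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Property proposed by this KIP; the value names a hypothetical user class
        // that implements org.apache.kafka.common.network.Redirector.
        props.put("broker.redirector", "com.example.StaticMapRedirector");
        return props;
    }
}
```

Assuming the changes above are in place, these properties would be passed to the KafkaProducer constructor as usual. Since the redirector is created through getConfiguredInstance, its configure method should receive the client's configuration map, so any implementation-specific settings could be supplied through the same Properties object.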
Proposed Changes
Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.
Compatibility, Deprecation, and Migration Plan
- What impact (if any) will there be on existing users?
- If we are changing behavior how will we phase out the older behavior?
- If we need special migration tools, describe them here.
- When will we remove the existing behavior?
Rejected Alternatives
If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.