Status
Current state: Under Discussion
Discussion thread:
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Today, Kafka metrics are collected only for individual clients or brokers. This makes it difficult for users to trace the path of individual messages across the cluster and obtain a complete end-to-end picture of system performance and behavior. Technically, it is possible to measure end-to-end performance today by modifying users' applications to collect and track additional information, but that isn't always practical for critical infrastructure applications. The ability to quickly deploy tools to observe, measure, and monitor Kafka client behavior, down to the message level, is valuable in production environments. At the same time, metrics may need contextual metadata that varies across applications. The ability to measure and monitor clients without writing new code or recompiling applications is essential. (In some cases, it might help to attach to running applications.)
To enable this functionality, we would like to add producer and consumer interceptors that can intercept messages at different points on the producer and consumer. The mechanism we are proposing is inspired by the interceptor interface in Apache Flume. While there are potentially many ways to use an interceptor interface (for example, detecting anomalies, encrypting data, or filtering fields), each of them would require a careful evaluation of whether it should be done with an interceptor or with another mechanism. It is better to add the related APIs when there is a clear motivation for those use cases. Thus, we are proposing minimal producer and consumer interceptor interfaces that are designed to support only measurement and monitoring.
While it is possible to add more metrics or improve monitoring in Kafka, we believe that creating a flexible, customizable interface is beneficial for the following reasons:
Common monitoring tools. In a large company, different teams collaborate on building systems, and often develop and deploy different components over time. In addition, organizations want to standardize on common metrics, formats, and data collection systems. We think it is valuable for an organization to develop common Kafka client monitoring tools and deploy them across all applications that use Kafka.
Monitoring can be expensive. Adding metrics to Kafka might compromise performance. (See this JIRA ticket for an example of a performance regression caused by merely checking timestamps.) Unfortunately, there is sometimes a tradeoff between system performance and data collection. As an example, consider the problem of measuring message sizes. The cheapest, simplest, and most straightforward approach is to measure average values. Calculating percentiles on a distributed system is more expensive and complicated than calculating simple averages, but would be useful in many applications. We would like to give users the ability to adopt different algorithms for metric collection, or to choose not to collect metrics at all.
Different applications require different metrics. For example, a user might find it important to monitor the cardinality of different keys in Kafka messages. It would be impractical for Kafka to provide all possible metrics internally; a pluggable intercept system provides a simple way to develop customized metrics.
Kafka is often part of a bigger infrastructure in an organization, and it would be very useful to enable end-to-end tracing in that infrastructure. Consider LinkedIn's use of Samza to trace frontend user calls across all services by tagging each call with a unique value, called a TreeId, and propagating that value across all subsequent service calls. Interceptors would allow Kafka clients to participate in the same tracing infrastructure by propagating the same TreeId stored in a message.
In this KIP, we propose adding two new interfaces: ProducerInterceptor on the producer and ConsumerInterceptor on the consumer. Users will be able to implement and configure a chain of custom interceptors and listen to events that happen to a record at different points on the producer and consumer. Each of the APIs will provide read-only access to a message. (Manipulating records is out of scope for this KIP. The interceptor interfaces can later be extended, via new KIPs, to allow record manipulation and additional intercept points. See the Rejected Alternatives section.)
Public Interfaces
We add two new interfaces: the ProducerInterceptor interface, which allows plugging in classes that will be notified of events happening to a record during its lifetime on the producer; and the ConsumerInterceptor interface, which allows plugging in classes that will be notified of record events on the consumer.
Both ProducerInterceptor and ConsumerInterceptor inherit from Configurable. The properties passed to the configure() method will be the producer/consumer config properties (including clientId, which is passed even if it was not specified in the config and was assigned by KafkaProducer/KafkaConsumer). We will document in the ProducerInterceptor/ConsumerInterceptor class descriptions that interceptors share the producer/consumer config namespace, possibly with many other interceptors and serializers, so it could be useful to use a prefix to prevent conflicts.
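For example, an interceptor could read its own settings from the shared config namespace using a prefix. Below is a minimal sketch against the proposed interface; the "mymonitor." prefix, the endpoint setting, and the class name are hypothetical (SerializedKeyValue is the holder type proposed by this KIP):

```java
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;

public class MyMonitoringInterceptor<K, V> implements ProducerInterceptor<K, V> {
    private String endpoint;

    @Override
    public void configure(Map<String, ?> configs) {
        // The full producer config map is passed in; a prefix keeps this
        // interceptor's keys from colliding with other plugins' keys.
        Object value = configs.get("mymonitor.endpoint");
        this.endpoint = (value != null) ? value.toString() : "localhost:9999";
    }

    @Override
    public void onSend(ProducerRecord<K, V> record) { /* no-op */ }

    @Override
    public void onEnqueued(TopicPartition tp, ProducerRecord<K, V> record,
                           SerializedKeyValue serializedKeyValue) { /* no-op */ }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) { /* no-op */ }

    @Override
    public void close() { /* no-op */ }
}
```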
Add a new configuration setting, interceptor.classes, to the KafkaProducer API, which sets a list of classes to use as producer interceptors. Each specified class must implement the ProducerInterceptor interface. The default configuration will be an empty list.
Add a new configuration setting, interceptor.classes, to the KafkaConsumer API, which sets a list of classes to use as consumer interceptors. Each specified class must implement the ConsumerInterceptor interface. The default configuration will be an empty list.
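For example, a producer could be configured with a chain of interceptors as follows (a sketch; the interceptor class names are hypothetical, and interceptors are invoked in the order they are listed):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

public class InterceptorConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Comma-separated list; interceptors are called in this order.
        props.put("interceptor.classes",
                "com.example.MyMonitoringInterceptor,com.example.MyTracingInterceptor");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.close();
    }
}
```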
Here is a more detailed description of the new interfaces:
ProducerInterceptor interface
```java
/**
 * A plugin interface to allow things to intercept events happening to a producer record,
 * such as sending a producer record or getting an acknowledgement when a record gets published.
 */
public interface ProducerInterceptor<K, V> extends Configurable {
    /**
     * This is called when the client sends a record to KafkaProducer, before the key and value get serialized.
     * @param record the record from the client
     */
    public void onSend(ProducerRecord<K, V> record);

    /**
     * This is called just after KafkaProducer assigns a partition (if needed) and serializes the key and value.
     * @param tp topic/partition to send the record to
     * @param record the record from the client
     * @param serializedKeyValue serialized key and value
     */
    public void onEnqueued(TopicPartition tp, ProducerRecord<K, V> record, SerializedKeyValue serializedKeyValue);

    /**
     * This is called when the send has been acknowledged.
     * @param metadata the metadata for the record that was sent (i.e. the partition and offset). Null if an error occurred.
     * @param exception the exception thrown during processing of this record. Null if no error occurred.
     */
    public void onAcknowledgement(RecordMetadata metadata, Exception exception);

    /**
     * This is called when the interceptor is closed.
     */
    public void close();
}
```
onSend() will be called in KafkaProducer.send(), before the key and value get serialized and before the partition gets assigned.
onEnqueued() will be called in KafkaProducer.send(), after the partition is assigned and after the key and value get serialized.
onAcknowledgement() will be called when the send is acknowledged. It has the same API as Callback.onCompletion(), and is called just before Callback.onCompletion() is called.
The ProducerInterceptor APIs will be called from multiple threads: onSend() and onEnqueued() will be called on the submitting thread, and onAcknowledgement() will be called on the producer I/O thread. It is up to the interceptor implementation to ensure thread safety. Since onAcknowledgement() is called on the producer I/O thread, its implementation should be reasonably fast, or else sending of messages from other threads could be delayed.
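To make the threading requirements concrete, here is a minimal sketch of an interceptor that counts sends and acknowledgements using thread-safe counters (the class name is hypothetical):

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;

public class CountingProducerInterceptor<K, V> implements ProducerInterceptor<K, V> {
    // Atomic counters because onSend()/onEnqueued() run on the submitting
    // thread while onAcknowledgement() runs on the producer I/O thread.
    private final AtomicLong sent = new AtomicLong();
    private final AtomicLong acked = new AtomicLong();
    private final AtomicLong failed = new AtomicLong();

    @Override
    public void configure(Map<String, ?> configs) { /* no configuration needed */ }

    @Override
    public void onSend(ProducerRecord<K, V> record) {
        sent.incrementAndGet();
    }

    @Override
    public void onEnqueued(TopicPartition tp, ProducerRecord<K, V> record,
                           SerializedKeyValue serializedKeyValue) { /* no-op */ }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Runs on the producer I/O thread: keep this cheap.
        if (exception == null)
            acked.incrementAndGet();
        else
            failed.incrementAndGet();
    }

    @Override
    public void close() {
        System.out.printf("sent=%d acked=%d failed=%d%n", sent.get(), acked.get(), failed.get());
    }
}
```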
ConsumerInterceptor interface
```java
/**
 * A plugin interface to allow things to intercept consumer events, such as receiving a record or a record
 * being consumed by a client.
 */
public interface ConsumerInterceptor<K, V> extends Configurable {
    /**
     * This is called on fetch of a record, before the key and value get deserialized.
     * @param tp topic/partition of the record
     * @param serializedKeyValue serialized key and value fetched from the broker
     */
    public void onReceive(TopicPartition tp, SerializedKeyValue serializedKeyValue);

    /**
     * This is called when the records are about to be returned to the client.
     * @param records records to be consumed by the client. Null if records were dropped/ignored/discarded (non-consumable)
     */
    public void onConsume(ConsumerRecords<K, V> records);

    /**
     * This is called when offsets get committed.
     * This method will be called when the commit request sent to the server has been acknowledged.
     * @param offsets a map of the offsets and associated metadata that this callback applies to
     */
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);

    /**
     * This is called when the interceptor is closed.
     */
    public void close();
}
```
onReceive() will be called on fetch of a record, in Fetcher.parseRecord(), just before the key and value get deserialized.
onConsume() will be called in KafkaConsumer.poll(), just before poll() returns ConsumerRecords.
onCommit() will be called when offsets get committed: just before OffsetCommitCallback.onCompletion() is called, and in ConsumerCoordinator.commitOffsetsSync() on a successful commit.
Since the new consumer is single-threaded, the ConsumerInterceptor API will be called from a single thread. Since the interceptor callbacks are called for every record, the interceptor implementation should be careful about adding performance overhead to the consumer.
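For illustration, here is a minimal sketch of a consumer interceptor that counts received and consumed records (the class name is hypothetical; plain longs suffice because the new consumer is single-threaded):

```java
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CountingConsumerInterceptor<K, V> implements ConsumerInterceptor<K, V> {
    // Plain longs are safe here: the new consumer calls all interceptor
    // methods from a single thread.
    private long received = 0;
    private long consumed = 0;

    @Override
    public void configure(Map<String, ?> configs) { /* no configuration needed */ }

    @Override
    public void onReceive(TopicPartition tp, SerializedKeyValue serializedKeyValue) {
        received++;  // called once per fetched record: keep this cheap
    }

    @Override
    public void onConsume(ConsumerRecords<K, V> records) {
        if (records != null)
            consumed += records.count();
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) { /* no-op */ }

    @Override
    public void close() {
        System.out.printf("received=%d consumed=%d%n", received, consumed);
    }
}
```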
Proposed Changes
We propose to add the two new interfaces listed and described in the Public Interfaces section: ProducerInterceptor and ConsumerInterceptor. We will allow a chain of interceptors. It is up to the user to correctly specify the order of interceptors in the producer's and the consumer's interceptor.classes config.
Kafka Producer changes:
We will create a new class that will encapsulate a list of ProducerInterceptor instances: ProducerInterceptors
```java
/**
 * This class wraps custom interceptors configured for this producer.
 */
public class ProducerInterceptors<K, V> implements Closeable {
    private final List<ProducerInterceptor<K, V>> interceptors;

    public ProducerInterceptors(List<ProducerInterceptor<K, V>> interceptors) {
        this.interceptors = interceptors;
    }

    public void onSend(ProducerRecord<K, V> record) {
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            interceptor.onSend(record);
        }
    }

    public void onEnqueued(TopicPartition tp, ProducerRecord<K, V> record,
                           byte[] serializedKey, byte[] serializedValue) {
        // Wrap the raw bytes for the interceptor API (assumes SerializedKeyValue
        // is a simple holder with a (byte[], byte[]) constructor).
        SerializedKeyValue serializedKeyValue = new SerializedKeyValue(serializedKey, serializedValue);
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            interceptor.onEnqueued(tp, record, serializedKeyValue);
        }
    }

    public void onAcknowledgement(RecordMetadata metadata, Exception e) {
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            interceptor.onAcknowledgement(metadata, e);
        }
    }

    @Override
    public void close() {
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            interceptor.close();
        }
    }
}
```
- KafkaProducer will have a new member:

```java
ProducerInterceptors<K, V> interceptors;
```
- The KafkaProducer constructor will load instances of the interceptor classes specified in interceptor.classes. If the interceptor.classes config does not list any interceptor classes, the interceptors list will be empty. The constructor will call configure() on each interceptor, passing in ProducerConfig.originalsWithPrefix(ProducerInterceptor.prefix()) and clientId, and will then instantiate 'interceptors' with the list of interceptor instances.
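One plausible shape for this wiring is sketched below; this is illustrative only (the exact config key handling, reflection, and error handling are assumptions, not the final implementation):

```java
// Inside the KafkaProducer constructor (sketch).
List<ProducerInterceptor<K, V>> interceptorList = new ArrayList<>();
Map<String, Object> interceptorConfigs =
        new HashMap<>(config.originalsWithPrefix(ProducerInterceptor.prefix()));
interceptorConfigs.put("client.id", clientId);  // clientId is always passed in
try {
    for (String className : config.getList("interceptor.classes")) {
        @SuppressWarnings("unchecked")
        ProducerInterceptor<K, V> interceptor =
                (ProducerInterceptor<K, V>) Class.forName(className).newInstance();
        interceptor.configure(interceptorConfigs);
        interceptorList.add(interceptor);
    }
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
    throw new KafkaException("Failed to construct producer interceptor", e);
}
this.interceptors = new ProducerInterceptors<>(interceptorList);
```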
To be able to call interceptors on the producer callback, we wrap the client callback passed to the KafkaProducer.send() method inside ProducerCallback – a new class that implements Callback and holds references to the client callback and 'interceptors'. The ProducerCallback.onCompletion() implementation will call onAcknowledgement() on 'interceptors' and then onCompletion() on the client's callback (if the client's callback is not null).
```java
/**
 * This class is a callback called on every producer request completion.
 */
public class ProducerCallback<K, V> implements Callback {
    private final Callback clientCallback;
    private final ProducerInterceptors<K, V> interceptors;

    public ProducerCallback(Callback clientCallback, ProducerInterceptors<K, V> interceptors) {
        this.clientCallback = clientCallback;
        this.interceptors = interceptors;
    }

    public void onCompletion(RecordMetadata metadata, Exception e) {
        interceptors.onAcknowledgement(metadata, e);
        if (clientCallback != null)
            clientCallback.onCompletion(metadata, e);
    }
}
```
- KafkaProducer.send() will create a ProducerCallback and call the onSend() method:

```java
producerCallback = new ProducerCallback(callback, this.interceptors);
interceptors.onSend(record);
```

- In KafkaProducer.send(), after the partition is assigned and the key and value are serialized, and just before the accumulator.append() call:

```java
interceptors.onEnqueued(tp, record, serializedKey, serializedValue);
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, producerCallback, remainingWaitMs);
```

- KafkaProducer.close() will close the interceptors:

```java
ClientUtils.closeQuietly(interceptors, "producer interceptors", firstException);
```
Kafka Consumer changes:
We will create a new class that will encapsulate a list of ConsumerInterceptor instances: ConsumerInterceptors
```java
/**
 * This class wraps custom interceptors configured for this consumer. On each callback, all consumer
 * interceptors configured for the consumer are called.
 */
public class ConsumerInterceptors<K, V> implements Closeable {
    private final List<ConsumerInterceptor<K, V>> interceptors;

    public ConsumerInterceptors(List<ConsumerInterceptor<K, V>> interceptors) {
        this.interceptors = interceptors;
    }

    public void onReceive(TopicPartition tp, byte[] serializedKey, byte[] serializedValue) {
        // Wrap the raw bytes for the interceptor API (assumes SerializedKeyValue
        // is a simple holder with a (byte[], byte[]) constructor).
        SerializedKeyValue serializedKeyValue = new SerializedKeyValue(serializedKey, serializedValue);
        for (ConsumerInterceptor<K, V> interceptor : this.interceptors) {
            interceptor.onReceive(tp, serializedKeyValue);
        }
    }

    public void onConsume(ConsumerRecords<K, V> records) {
        for (ConsumerInterceptor<K, V> interceptor : this.interceptors) {
            interceptor.onConsume(records);
        }
    }

    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        for (ConsumerInterceptor<K, V> interceptor : this.interceptors) {
            interceptor.onCommit(offsets);
        }
    }

    @Override
    public void close() {
        for (ConsumerInterceptor<K, V> interceptor : this.interceptors) {
            interceptor.close();
        }
    }
}
```
- KafkaConsumer will have a new member:

```java
ConsumerInterceptors<K, V> interceptors;
```
- The KafkaConsumer constructor will load instances of the interceptor classes specified in interceptor.classes. If the interceptor.classes config does not list any interceptor classes, the interceptors list will be empty. The constructor will call configure() on each interceptor, passing in ConsumerConfig.originalsWithPrefix(ConsumerInterceptor.prefix()) and clientId, and will then instantiate 'interceptors' with the list of interceptor instances.
- 'interceptors' will also be passed to the Fetcher constructor, and Fetcher will keep a reference to 'interceptors'.
- KafkaConsumer.close() will close 'interceptors':

```java
ClientUtils.closeQuietly(interceptors, "consumer interceptors", firstException);
```

- KafkaConsumer.poll() will call:

```java
this.interceptors.onConsume(consumerRecords);
```
- Fetcher.parseRecord() will be modified as follows:

```java
byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes);
byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
interceptors.onReceive(partition, keyByteArray, valueByteArray);
K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), keyByteArray);
V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), valueByteArray);
return new ConsumerRecord<>(partition.topic(), partition.partition(), offset, key, value);
...
```
Compatibility, Deprecation, and Migration Plan
This change will not impact any existing clients. When clients upgrade to the new version, they do not need to add the interceptor.classes config.
Rejected Alternatives
Alternative 1 – Interceptor interfaces that allow mutating the data
Interceptors could potentially be used for use cases that require mutating the data. Examples include data encryption and getting all applications in an organization to include the same header and/or required fields. However, more evaluation needs to be done for these use cases before proposing an API for them. It is better to add the related APIs when there is a clear motivation for those use cases. So, it is possible that the interceptor interfaces will evolve to support mutating the data, but this is out of scope for this KIP.
Alternative 2 – Wrapper around KafkaProducer and KafkaConsumer.
Some monitoring (such as using a unique ID for end-to-end tracing) can be done with wrappers around KafkaProducer and KafkaConsumer. The wrappers could catch events at points similar to onSend() and onAcknowledgement() on the producer and onConsume() and onCommit() on the consumer (see the sketch after this list). However, this approach:
- Requires changes in client applications to use the wrappers around KafkaConsumer and KafkaProducer.
- Will not be able to catch events at intermediate stages of a request's lifetime inside KafkaConsumer and KafkaProducer.
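For illustration, here is a minimal sketch of the rejected wrapper approach (the class and its helper methods are hypothetical); note that it can only observe the send boundary and the completion callback, not intermediate stages such as serialization or partition assignment:

```java
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TracingProducer<K, V> {
    private final KafkaProducer<K, V> producer;

    public TracingProducer(KafkaProducer<K, V> producer) {
        this.producer = producer;
    }

    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        traceSend(record);  // roughly equivalent to onSend()
        return producer.send(record, (metadata, exception) -> {
            traceAck(metadata, exception);  // roughly equivalent to onAcknowledgement()
            if (callback != null)
                callback.onCompletion(metadata, exception);
        });
    }

    private void traceSend(ProducerRecord<K, V> record) { /* emit trace event */ }

    private void traceAck(RecordMetadata metadata, Exception e) { /* emit trace event */ }
}
```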