...
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
```java
/**
 * A plugin interface that allows things to intercept events happening to a producer record,
 * such as sending a producer record or getting an acknowledgement when a record gets published.
 */
public interface ProducerInterceptor<K, V> extends Configurable {
    /**
     * This is called when the client sends a record to KafkaProducer, before the key and value get serialized.
     * @param record the record from the client
     * @return a record that is either the original record passed to this method or a new record with a modified key and value
     */
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);

    /**
     * This is called when the send has been acknowledged.
     * @param metadata The metadata for the record that was sent (i.e. the partition and offset).
     *                 If an error occurred, the metadata may be only partially filled: the topic is always set,
     *                 and if partition is not -1, it is the partition set/assigned to this record.
     * @param exception The exception thrown during processing of this record. Null if no error occurred.
     */
    public void onAcknowledgement(RecordMetadata metadata, Exception exception);

    /**
     * This is called when the interceptor is closed.
     */
    public void close();
}
```
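To make the onSend() contract concrete, here is a minimal, dependency-free sketch. It does not use the Kafka client library: `SimpleRecord` and `SimpleProducerInterceptor` are hypothetical stand-ins for ProducerRecord and ProducerInterceptor (configure, onAcknowledgement and close are omitted), and `AppNameInterceptor` is an illustrative interceptor that prefixes each value with an application name, returning a new record rather than mutating the one it received.

```java
// Hypothetical stand-in for ProducerRecord: an immutable topic/partition/key/value holder.
final class SimpleRecord {
    final String topic;
    final Integer partition; // null means "let the partitioner decide"
    final String key;
    final String value;

    SimpleRecord(String topic, Integer partition, String key, String value) {
        this.topic = topic;
        this.partition = partition;
        this.key = key;
        this.value = value;
    }
}

// Hypothetical stand-in for ProducerInterceptor, reduced to onSend().
interface SimpleProducerInterceptor {
    SimpleRecord onSend(SimpleRecord record);
}

// Illustrative interceptor: prefixes every value with an application name.
final class AppNameInterceptor implements SimpleProducerInterceptor {
    private final String appName;

    AppNameInterceptor(String appName) {
        this.appName = appName;
    }

    @Override
    public SimpleRecord onSend(SimpleRecord record) {
        // Build a new record; topic, partition and key are carried over unchanged.
        return new SimpleRecord(record.topic, record.partition, record.key,
                appName + ":" + record.value);
    }
}
```

Because the same input value always yields the same output, this is the kind of consistent transformation the proposal expects from known use cases.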
onSend() will be called in KafkaProducer.send(), before the key and value are serialized and before the partition is assigned. If the implementation modifies the key and/or value, it must return the modified key and value in a new ProducerRecord object. One implication of interceptors modifying the key in onSend() is that the partition will be assigned based on the modified key, not the key from the client. If the key/value transformation is not consistent (i.e. the same input key and value do not always map to the same modified key/value), then log compaction will not work. We will document this in the ProducerInterceptor class. However, known use cases, such as adding an app name or host name to a message, perform consistent transformations.
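The sketch below illustrates both consequences. It assumes a simplified partitioner that hashes the key with `String.hashCode()` (Kafka's default partitioner uses murmur2 on the serialized key, so real partition numbers differ); `KeyRewriteDemo`, `CONSISTENT` and `INCONSISTENT` are hypothetical names. The point is only that the partition follows the key returned by onSend(), and that a deterministic rewrite keeps equal keys equal while a non-deterministic one does not.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.UnaryOperator;

final class KeyRewriteDemo {
    // Simplified stand-in partitioner: non-negative hash of the key modulo the partition count.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    // Consistent: the same input key always maps to the same modified key.
    static final UnaryOperator<String> CONSISTENT = key -> "app1/" + key;

    // Inconsistent: appends a per-call sequence number, so equal input keys diverge.
    private static final AtomicLong SEQ = new AtomicLong();
    static final UnaryOperator<String> INCONSISTENT = key -> key + "#" + SEQ.incrementAndGet();
}
```

With the inconsistent rewrite, two updates to the same client key are stored under different keys (and possibly in different partitions), so compaction can no longer collapse them into the latest value.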
Another implication of onSend() returning a ProducerRecord is that the interceptor can potentially modify the topic/partition. It is up to the interceptor to ensure that the ProducerRecord returned from onSend() is correct (e.g. topic and partition, if given, are either preserved or intentionally modified). KafkaProducer will use the ProducerRecord returned from onSend() instead of the record passed into the KafkaProducer.send() method.
Since there may be multiple interceptors, the first interceptor will get the record from the client passed as the 'record' parameter. The next interceptor in the list will get the record returned by the previous interceptor, and so on. Since interceptors are allowed to mutate records, an interceptor may receive a record already modified by other interceptors. However, we will state in the javadoc that building a pipeline of mutating interceptors that depend on the output of previous interceptors is discouraged, because of potential side effects caused by an interceptor failing to mutate the record and throwing an exception. If one of the interceptors in the list throws an exception from onSend(), the exception is caught and logged, and the next interceptor is called with the record returned by the last successful interceptor in the list, or otherwise with the record from the client.
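The chaining and error-handling rules above can be sketched without Kafka dependencies. Here each interceptor is modeled as a `UnaryOperator<String>` over a record value, and `InterceptorChain` is a hypothetical name; a failing interceptor is reported and skipped, and the next one receives the output of the last interceptor that succeeded (or the client's original record if none has).

```java
import java.util.List;
import java.util.function.UnaryOperator;

final class InterceptorChain {
    // Applies interceptors in order. If one throws, its result is discarded and the
    // next interceptor receives the record returned by the last successful one
    // (or the client's original record if none has succeeded yet).
    static String onSend(List<UnaryOperator<String>> interceptors, String record) {
        String current = record;
        for (UnaryOperator<String> interceptor : interceptors) {
            try {
                current = interceptor.apply(current);
            } catch (RuntimeException e) {
                // Stand-in for log.warn(...): report the failure and keep going.
                System.err.println("interceptor failed: " + e.getMessage());
            }
        }
        return current;
    }
}
```

For a chain of "append a", "throw", "append c" applied to "x", the result is "xac": the failing middle interceptor is simply skipped. The sketch catches `RuntimeException` because a `UnaryOperator` cannot throw checked exceptions; the proposal's wrapper catches `Throwable`.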
onAcknowledgement() will be called when the send is acknowledged. It has the same signature as Callback.onCompletion(), and is called just before Callback.onCompletion() is called. In addition, onAcknowledgement() will be called just before KafkaProducer.send() throws an exception (even when it does not call the user callback). The one difference from Callback.onCompletion() is that if an error occurred, the metadata parameter will not be null. In this case, metadata will contain the topic and possibly the partition (if available). If the partition is not available, it will be set to -1.
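As an illustration of why non-null metadata on error is useful, the sketch below counts failed sends per topic. `AckMetadata` is a hypothetical stand-in for RecordMetadata that carries only topic and partition, and `ErrorCountingInterceptor` is a hypothetical name; the interceptor relies on the contract described above: the topic is always set, and the partition is -1 when it was never assigned.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for RecordMetadata: only the fields used here.
final class AckMetadata {
    final String topic;   // always set, even when the send failed
    final int partition;  // -1 if no partition was assigned before the failure

    AckMetadata(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

final class ErrorCountingInterceptor {
    // Concurrent structures: per the rules above, this callback normally runs on the
    // producer I/O thread but can also run on the caller's thread just before send() throws.
    final Map<String, Integer> errorsByTopic = new ConcurrentHashMap<>();
    final AtomicInteger errorsWithoutPartition = new AtomicInteger();

    // Mirrors onAcknowledgement(metadata, exception): metadata is non-null in both cases.
    void onAcknowledgement(AckMetadata metadata, Exception exception) {
        if (exception == null) {
            return; // successful send, nothing to count
        }
        errorsByTopic.merge(metadata.topic, 1, Integer::sum); // atomic on ConcurrentHashMap
        if (metadata.partition == -1) {
            errorsWithoutPartition.incrementAndGet();
        }
    }
}
```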
ProducerInterceptor APIs will be called from multiple threads: onSend() will be called on the submitting thread and onAcknowledgement() will be called on the producer I/O thread. It is up to the interceptor implementation to ensure thread safety. Since onAcknowledgement() is called on the producer I/O thread, its implementation should be reasonably fast; otherwise, sending of messages from other threads could be delayed.
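Because onSend() and onAcknowledgement() run on different threads, any state they share must be thread-safe. A minimal sketch, assuming a hypothetical `InFlightCounter` interceptor that tracks sends not yet acknowledged, with records reduced to strings. It uses `java.util.concurrent.atomic.LongAdder`, and the work done in onAcknowledgement() is a single counter update, in keeping with the advice to keep that callback fast.

```java
import java.util.concurrent.atomic.LongAdder;

final class InFlightCounter {
    private final LongAdder sent = new LongAdder();
    private final LongAdder acked = new LongAdder();

    // Called on the application threads that invoke send().
    String onSend(String record) {
        sent.increment();
        return record; // record passed through unchanged
    }

    // Called on the producer I/O thread: keep it cheap, just bump a counter.
    void onAcknowledgement(String topic, Exception exception) {
        acked.increment();
    }

    // Exact once updates have quiesced; an approximate snapshot while threads are still running.
    long inFlight() {
        return sent.sum() - acked.sum();
    }
}
```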
ConsumerInterceptor interface
```java
/**
 * A plugin interface that allows things to intercept Consumer events, such as receiving a record or a record being consumed
 * by a client.
 */
public interface ConsumerInterceptor<K, V> extends Configurable {
    /**
     * This is called when the records are about to be returned to the client.
     * @param records records to be consumed by the client. Null if record dropped/ignored/discarded (non consumable)
     * @return records that are either the original 'records' passed to this method or a modified set of records
     */
    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);

    /**
     * This is called when offsets get committed.
     * This method will be called when the commit request sent to the server has been acknowledged.
     * @param offsets A map of the offsets and associated metadata that this callback applies to
     */
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);

    /**
     * This is called when the interceptor is closed.
     */
    public void close();
}
```
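As a hedged illustration of onConsume() returning a modified set of records, the sketch below drops records whose value is null (for example, tombstones) before they reach the application. It uses a plain `List<String>` of values as a hypothetical stand-in for ConsumerRecords, and `DropNullValuesInterceptor` is a hypothetical name; a real interceptor would build a new ConsumerRecords instance instead.

```java
import java.util.ArrayList;
import java.util.List;

final class DropNullValuesInterceptor {
    // Mirrors onConsume(records): returns either the original batch or a filtered copy.
    List<String> onConsume(List<String> records) {
        boolean hasNull = false;
        for (String value : records) {
            if (value == null) {
                hasNull = true;
                break;
            }
        }
        if (!hasNull) {
            return records; // nothing to drop: hand back the original batch unchanged
        }
        List<String> filtered = new ArrayList<>(records.size());
        for (String value : records) {
            if (value != null) {
                filtered.add(value);
            }
        }
        return filtered;
    }
}
```

Returning the original instance when nothing changes avoids an allocation on the common path, which matters because this callback runs for every batch the consumer returns.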
onCommit() will be called when offsets get committed: just before OffsetCommitCallback.onCompletion() is called and in ConsumerCoordinator.commitOffsetsSync() on successful commit.
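One possible use of onCommit() is auditing what the application has committed. The sketch below keeps the latest committed offset per partition; `CommitAuditInterceptor` is a hypothetical name, partition names such as "orders-0" stand in for TopicPartition, and a plain `Long` stands in for OffsetAndMetadata. Since the consumer is single-threaded, a plain HashMap suffices here.

```java
import java.util.HashMap;
import java.util.Map;

final class CommitAuditInterceptor {
    private final Map<String, Long> lastCommitted = new HashMap<>();

    // Mirrors onCommit(offsets): invoked once the commit has been acknowledged.
    void onCommit(Map<String, Long> offsets) {
        lastCommitted.putAll(offsets); // a newer commit overwrites the older offset for the same partition
    }

    // Returns the last committed offset for the partition, or null if none was seen.
    Long lastCommitted(String partition) {
        return lastCommitted.get(partition);
    }
}
```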
Since the new consumer is single-threaded, the ConsumerInterceptor API will be called from a single thread. Because interceptor callbacks are invoked for every record, interceptor implementations should take care not to add significant performance overhead to the consumer.
Add more record metadata to RecordMetadata and ConsumerRecord
Currently, RecordMetadata contains topic/partition, offset, and timestamp (KIP-32). We propose adding the remaining record metadata to RecordMetadata: checksum and record size. Both are useful for monitoring and auditing. The checksum provides an easy way to get a summary of the message and is also useful for validating a message end-to-end. For symmetry, we also propose exposing the same metadata on the consumer side and making it available to interceptors.
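To illustrate how a checksum supports end-to-end validation, the sketch below uses `java.util.zip.CRC32` over a record's value bytes. This is an assumption for illustration only: Kafka's actual record checksum is computed over the message format's own fields, not over the value alone, and `ChecksumDemo` is a hypothetical name. The idea carries over: the producer side reports the checksum, and an auditor comparing it with what the consumer sees can detect a changed payload.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

final class ChecksumDemo {
    // CRC32 of the given bytes, as an unsigned 32-bit value held in a long.
    static long checksum(byte[] bytes) {
        CRC32 crc = new CRC32();
        crc.update(bytes, 0, bytes.length);
        return crc.getValue();
    }

    static long checksum(String value) {
        return checksum(value.getBytes(StandardCharsets.UTF_8));
    }
}
```

`CRC32.getValue()` returns a `long`, which fits the proposed `long checksum` field; for the standard check input "123456789" it yields 0xCBF43926.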
We will add checksum and record size fields to RecordMetadata and ConsumerRecord.
...
...
```java
public final class RecordMetadata {
    // .......
    private final long offset;
    private final TopicPartition topicPartition;
    private final long checksum;   // NEW: checksum of the record
    private final int size;        // NEW: record size in bytes (before compression)
    // .......
}
```
```java
public final class ConsumerRecord<K, V> {
    // .......
    private final long checksum;   // NEW: checksum of the record
    private final int size;        // NEW: record size in bytes (after decompression)
}
```
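The producer reports the size before compression and the consumer reports it after decompression, so for an unmodified record both sides should see the same number of bytes even though the bytes on the wire differ. A small sketch using GZIP from `java.util.zip` (gzip is one of the codecs Kafka supports; the framing of Kafka's compressed message sets is not modeled here, and `SizeDemo` is a hypothetical name):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

final class SizeDemo {
    // What travels on the wire: the gzip-compressed bytes.
    static byte[] compress(byte[] raw) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(bytes)) {
            gzip.write(raw);
        }
        return bytes.toByteArray();
    }

    // What the consumer measures: the bytes after decompression.
    static byte[] decompress(byte[] compressed) throws IOException {
        try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
            return out.toByteArray();
        }
    }
}
```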
We will make it clear in the documentation (of ConsumerRecord and onAcknowledgement/onConsume) that the checksum the consumer sees may not always be the one initially set on the producer. The CRC may be overwritten by the broker during an upgrade after a message format change, or when the topic config has timestamp type == LogAppendTime, which requires overwriting the message timestamp on the broker and, as a result, overwriting the CRC.
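Why a broker-side timestamp rewrite changes the checksum: if the CRC covers the timestamp together with the payload, replacing the producer's CreateTime with the broker's LogAppendTime necessarily produces a different CRC. The layout below (an 8-byte timestamp followed by the value bytes) is a simplified assumption, not Kafka's actual message format, and `TimestampCrcDemo` is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

final class TimestampCrcDemo {
    // Hypothetical layout: 8-byte big-endian timestamp followed by the UTF-8 value bytes.
    static long crcOf(long timestamp, String value) {
        byte[] payload = value.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES + payload.length);
        buffer.putLong(timestamp).put(payload);
        CRC32 crc = new CRC32();
        crc.update(buffer.array(), 0, buffer.capacity());
        return crc.getValue();
    }
}
```

Comparing `crcOf(createTime, value)` with `crcOf(logAppendTime, value)` shows why a consumer-side audit must not assume the producer's checksum survives the broker.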
...
We will create a new class that will encapsulate a list of ProducerInterceptor instances: ProducerInterceptors
```java
import java.io.Closeable;
import java.util.List;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This class wraps custom interceptors configured for this producer.
 */
public class ProducerInterceptors<K, V> implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(ProducerInterceptors.class);
    private final List<ProducerInterceptor<K, V>> interceptors;

    public ProducerInterceptors(List<ProducerInterceptor<K, V>> interceptors) {
        this.interceptors = interceptors;
    }

    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
        ProducerRecord<K, V> interceptRecord = record;
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptRecord = interceptor.onSend(interceptRecord);
            } catch (Throwable t) {
                // do not propagate interceptor exception, log and continue calling other interceptors
                log.warn("Error executing interceptor onSend callback for topic: " + record.topic()
                        + ", partition: " + record.partition(), t);
            }
        }
        return interceptRecord;
    }

    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptor.onAcknowledgement(metadata, exception);
            } catch (Throwable t) {
                // do not propagate interceptor exceptions, just log
                log.warn("Error executing interceptor onAcknowledgement callback", t);
            }
        }
    }

    @Override
    public void close() {
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptor.close();
            } catch (Throwable t) {
                log.error("Failed to close producer interceptor ", t);
            }
        }
    }
}
```
- KafkaProducer will have a new member:
ProducerInterceptors<K, V> interceptors;
- The KafkaProducer constructor will load instances of the interceptor classes specified in interceptor.classes. If the interceptor.classes config does not list any interceptor classes, the interceptors list will be empty. It will call configure() on each interceptor instance, passing in ProducerConfig.originals(). The KafkaProducer constructor will then instantiate 'interceptors' with the list of interceptor instances.
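The loading step above can be sketched with plain reflection. `SketchConfigurable`, `SketchInterceptor`, `InterceptorLoader` and `PrefixInterceptor` are hypothetical names standing in for Kafka's Configurable, ProducerInterceptor, the constructor logic and a user interceptor; the sketch assumes `interceptor.classes` holds a comma-separated list of fully qualified class names with no-argument constructors, and passes the original config map to configure(). The "prefix" key read by `PrefixInterceptor` is likewise hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

interface SketchConfigurable {
    void configure(Map<String, ?> configs);
}

interface SketchInterceptor extends SketchConfigurable {
    String onSend(String record);
}

final class InterceptorLoader {
    static final String INTERCEPTOR_CLASSES = "interceptor.classes";

    // Instantiates every class listed under interceptor.classes and configures it with
    // the original config map. An absent or blank entry yields an empty list.
    static List<SketchInterceptor> loadInterceptors(Map<String, ?> originals)
            throws ReflectiveOperationException {
        Object value = originals.get(INTERCEPTOR_CLASSES);
        if (value == null || value.toString().trim().isEmpty()) {
            return Collections.emptyList();
        }
        List<SketchInterceptor> interceptors = new ArrayList<>();
        for (String name : value.toString().split(",")) {
            Class<?> cls = Class.forName(name.trim());
            SketchInterceptor interceptor =
                    (SketchInterceptor) cls.getDeclaredConstructor().newInstance();
            interceptor.configure(originals); // each instance sees the full original config
            interceptors.add(interceptor);
        }
        return interceptors;
    }
}

// Example interceptor that reads its prefix from the same config map.
final class PrefixInterceptor implements SketchInterceptor {
    private String prefix = "";

    public PrefixInterceptor() { }

    @Override
    public void configure(Map<String, ?> configs) {
        Object p = configs.get("prefix"); // hypothetical custom key
        prefix = p == null ? "" : p.toString();
    }

    @Override
    public String onSend(String record) {
        return prefix + record;
    }
}
```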
To be able to call the interceptors from the producer callback, we wrap the client callback passed to the KafkaProducer.send() method inside ProducerCallback, a new class that implements Callback and holds a reference to the client callback and to 'interceptors'. The ProducerCallback.onCompletion() implementation will call interceptors.onAcknowledgement() and then the client callback's onCompletion() (if the client callback is not null).
```java
/**
 * This class is a callback invoked when each producer request completes.
 */
public class ProducerCallback<K, V> implements Callback {
    private final Callback clientCallback;
    private final ProducerInterceptors<K, V> interceptors;

    public ProducerCallback(Callback clientCallback, ProducerInterceptors<K, V> interceptors) {
        this.clientCallback = clientCallback;
        this.interceptors = interceptors;
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception e) {
        interceptors.onAcknowledgement(metadata, e);
        if (clientCallback != null)
            clientCallback.onCompletion(metadata, e);
    }
}
```
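The ordering guarantee (interceptors first, then the client's callback, with a null client callback tolerated) can be checked with a dependency-free sketch. `Completion` is a hypothetical stand-in for Callback, `WrappingCallback` mirrors ProducerCallback, the interceptor chain is reduced to a single `Completion`, and a `String` replaces RecordMetadata.

```java
// Hypothetical stand-in for org.apache.kafka.clients.producer.Callback.
interface Completion {
    void onCompletion(String metadata, Exception e);
}

final class WrappingCallback implements Completion {
    private final Completion clientCallback; // may be null
    private final Completion interceptors;   // stand-in for ProducerInterceptors.onAcknowledgement

    WrappingCallback(Completion clientCallback, Completion interceptors) {
        this.clientCallback = clientCallback;
        this.interceptors = interceptors;
    }

    @Override
    public void onCompletion(String metadata, Exception e) {
        interceptors.onCompletion(metadata, e);       // interceptors always see the ack first
        if (clientCallback != null) {
            clientCallback.onCompletion(metadata, e); // then the user's callback, if any
        }
    }
}
```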
- KafkaProducer.send() will create a ProducerCallback and call the interceptors' onSend() method:
Callback producerCallback = new ProducerCallback<>(callback, this.interceptors);
ProducerRecord<K, V> sentRecord = interceptors.onSend(record);
- The rest of the KafkaProducer.send() code will use sentRecord in place of 'record'.
- KafkaProducer.close() will close interceptors:
ClientUtils.closeQuietly(interceptors, "producer interceptors", firstException);
...
We will create a new class that will encapsulate a list of ConsumerInterceptor instances: ConsumerInterceptors
```java
import java.io.Closeable;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This class wraps custom interceptors configured for this consumer. On each callback, all consumer interceptors
 * configured for the consumer are called.
 */
public class ConsumerInterceptors<K, V> implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(ConsumerInterceptors.class);
    private final List<ConsumerInterceptor<K, V>> interceptors;

    public ConsumerInterceptors(List<ConsumerInterceptor<K, V>> interceptors) {
        this.interceptors = interceptors;
    }

    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
        ConsumerRecords<K, V> interceptRecords = records;
        for (ConsumerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptRecords = interceptor.onConsume(interceptRecords);
            } catch (Throwable t) {
                // do not propagate interceptor exception, log and continue calling other interceptors
                log.warn("Error executing interceptor onConsume callback", t);
            }
        }
        return interceptRecords;
    }

    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        for (ConsumerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptor.onCommit(offsets);
            } catch (Throwable t) {
                // do not propagate interceptor exception, just log
                log.warn("Error executing interceptor onCommit callback", t);
            }
        }
    }

    @Override
    public void close() {
        for (ConsumerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptor.close();
            } catch (Throwable t) {
                log.error("Failed to close consumer interceptor ", t);
            }
        }
    }
}
```
- KafkaConsumer will have a new member:
ConsumerInterceptors<K, V> interceptors;
- The KafkaConsumer constructor will load instances of the interceptor classes specified in interceptor.classes. If the interceptor.classes config does not list any interceptor classes, the interceptors list will be empty. It will call configure() on each interceptor instance, passing in ConsumerConfig.originals() and clientId. The KafkaConsumer constructor will then instantiate 'interceptors' with the list of interceptor instances.
- KafkaConsumer.close() will close 'interceptors':
ClientUtils.closeQuietly(interceptors, "consumer interceptors", firstException);
- KafkaConsumer.poll() will call
this.interceptors.onConsume(consumerRecords);
and return the ConsumerRecords<K, V> returned from onConsume().
- ConsumerCoordinator.commitOffsetsAsync and commitOffsetsSync will call onCommit().
Compatibility, Deprecation, and Migration Plan
...