Table of Contents |
---|
Status
Current state: [One of "Under Discussion", "Accepted", "Rejected"]
Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]
JIRA: KAFKA-14700
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
Code Block | ||||
---|---|---|---|---|
| ||||
/** * ProduceRequestInterceptors can be defined to perform custom, light-weight processing on every record received by a * broker. * * Broker-side interceptors should be used with caution: * 1. Processing messages that were sent in compressed format by the producer will need to be decompressed and then * re-compressed to perform broker-side processing * 2. Performing unduly long or complex computations can negatively impact overall cluster health and performance * * Potential use cases: * - Schema validation * - Privacy enforcement * - Decoupling server-side and client-side serialization */ public abstract class ProduceRequestInterceptor { // Custom function for mutating the original message. If the method returns a ProduceRequestInterceptorSkipRecordException, // the record will be removed from the batch and won't be persisted in the target log. All other exceptions are // considered "fatal" and will result in a request error public abstract ProduceRequestInterceptorResult processRecord(byte[] key, byte[] value, String topic, int partition, Header[] headers) throws Exception; // Define the topic name pattern that will determine whether this interceptor runs on a given batch of records public abstract Pattern interceptorTopicPattern(); // Method that gets called during the interceptor's initialization to configure itself public abstract void configure(); } |
The main new interface required for produce-time interceptors is the abstract ProduceRequestInterceptor
class. User-defined implementation implementations of this class would serve as the a container for the lightweight server-side logic that would get executed for every record received by a brokerrecord pre-processing. The class' processRecord
method enables three basic types of operations:
...
Once a user-defined implementation of ProduceRequestInterceptor
is compiled and added to the classpath of the Kafka hostruntime, it could be registered as an interceptor with the help of three new broker-side config options:
...