Table of Contents

Status

Current state:   [One of "Under Discussion", "Accepted", "Rejected"]

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]

JIRA: KAFKA-14700

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Another popular pattern for enforcing platform standards and distributing features to all clients is the deployment of what I refer to as (special-purpose) platform apps. Think of a stream processing application that performs some sort of stateless transformation on every topic message and writes the result back to Kafka. In many instances, this architecture is warranted (e.g. if the computations are expensive or the logic is complex). But in a lot of cases, these systems emerged as a way to work around the library extension pattern's limitations, since they allow platform owners to deploy features at scale without managing the lifecycle of library upgrades. Unfortunately, using platform apps to enforce standards is an expensive strategy that scales poorly. For instance, if a company decides to solve privacy enforcement through a platform app that redacts every Kafka message in-flight, the system will double the disk usage and the number of partitions, and will require a large amount of additional compute.

...

This KIP wants to provide a third way for platform owners: the ability to extend Kafka’s server-side behavior through broker interceptors. At their core, broker interceptors can be thought of as very lightweight, stateless stream processors that can intercept, mutate and filter messages at either produce or consume time.

...

To date, Kafka hasn't released a comparable feature to OSS users (though Confluent Server does support server-side schema validation). This KIP proposes to change that through the addition of broker interceptors to the stack.

...

Of course, this design isn’t without its challenges and pitfalls (primarily concerning performance); these will be discussed in more detail in the next section. When defining interceptors, operators would need to exercise good judgement to avoid causing cluster-wide performance degradation.

Prior art

...

Code Block
language: java
title: ProduceRequestInterceptor interface
/**
 * ProduceRequestInterceptors can be defined to perform custom, light-weight processing on every record received by a
 * broker.
 *
 * Broker-side interceptors should be used with caution:
 *  1. Messages that were sent in compressed format by the producer will need to be decompressed and then
 *     re-compressed in order to perform broker-side processing
 *  2. Performing unduly long or complex computations can negatively impact overall cluster health and performance
 *
 * Potential use cases:
 *   - Schema validation
 *   - Privacy enforcement
 *   - Decoupling server-side and client-side serialization
 */
public abstract class ProduceRequestInterceptor {
    // Custom function for mutating the original record. If the method throws a ProduceRequestInterceptorSkipRecordException,
    // the record will be removed from the batch and won't be persisted in the target log. All other exceptions are
    // considered "fatal" and will result in a request error
    public abstract ProduceRequestInterceptorResult processRecord(byte[] key, byte[] value, String topic, int partition, Header[] headers) throws Exception;

    // Define the topic name pattern that will determine whether this interceptor runs on a given batch of records
    public abstract Pattern interceptorTopicPattern();

    // Method that gets called during the interceptor's initialization to configure itself
    public abstract void configure();
}

The main new interface required for produce-time interceptors is the abstract ProduceRequestInterceptor class. User-defined implementations of this class would serve as containers for the lightweight server-side logic that would get executed for every record received by a broker. The class's processRecord method enables three basic types of operations (a sketch of a possible implementation is shown after the list below):

...
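To make the interface above more concrete, here is a minimal sketch of what a user-defined implementation could look like for a hypothetical privacy-enforcement use case. The RedactingProduceRequestInterceptor class name, the pii.* topic pattern, the redact helper and the ProduceRequestInterceptorResult constructor shown below are illustrative assumptions, not part of the API proposed by this KIP.

Code Block
language: java
title: Example ProduceRequestInterceptor implementation (illustrative)
import java.util.regex.Pattern;

import org.apache.kafka.common.header.Header;

/**
 * Illustrative interceptor that redacts the value of every record produced to topics matching "pii\..*".
 */
public class RedactingProduceRequestInterceptor extends ProduceRequestInterceptor {

    private static final Pattern TOPIC_PATTERN = Pattern.compile("pii\\..*");

    @Override
    public ProduceRequestInterceptorResult processRecord(byte[] key, byte[] value, String topic, int partition, Header[] headers) throws Exception {
        // Mutate the record: replace the payload with a scrubbed copy before it is persisted.
        byte[] redactedValue = redact(value);
        // Assumed result shape: the (possibly mutated) key/value pair to be written to the target log.
        return new ProduceRequestInterceptorResult(key, redactedValue);
    }

    @Override
    public Pattern interceptorTopicPattern() {
        // The broker only hands records to this interceptor for topics whose names match this pattern.
        return TOPIC_PATTERN;
    }

    @Override
    public void configure() {
        // No-op here; a real implementation could load redaction rules or other settings at this point.
    }

    // Placeholder for application-specific redaction logic.
    private byte[] redact(byte[] value) {
        return value == null ? null : new byte[0];
    }
}

Because interceptorTopicPattern scopes the interceptor to matching topics, records produced to non-matching topics never enter this code path, which helps contain the performance impact discussed above.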

Once a user-defined implementation of ProduceRequestInterceptor is compiled and added to the classpath of the Kafka runtime, it could be registered as an interceptor with the help of three new broker-side config options:

...
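As a purely illustrative aside, registration could look something like the sketch below, written as the kind of Properties object one might build for an embedded test broker. The produce.request.interceptor.classes key is a placeholder invented for this example; the actual three broker-side option names are the ones defined by this KIP.

Code Block
language: java
title: Registering an interceptor via broker configs (illustrative)
import java.util.Properties;

public class InterceptorRegistrationSketch {
    public static void main(String[] args) {
        Properties brokerProps = new Properties();
        // Placeholder key: see this KIP's configuration section for the real broker-side option names.
        // The value is the fully-qualified name of a class on the broker's classpath that extends
        // ProduceRequestInterceptor.
        brokerProps.put("produce.request.interceptor.classes", "com.example.RedactingProduceRequestInterceptor");
        System.out.println(brokerProps);
    }
}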