You are viewing an old version of this page. View the current version.

Compare with Current View Page History

Version 1 Next »

Status

Current stateUnder Discussion

Discussion thread:


JIRA: KAFKA-???

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

During the adoption of Kafka in large enterprises, it's important to guarantee data in some topic conforms to the specific format.


When data are written and read by the different applications developed by the different teams it's hard to guarantee data format using only custom SerDe, because malicious applications can use different SerDe.

The data format can be enforced only on the broker side.

Public Interfaces

New public interface RecordsPolicy 


RecordsPolicy.java
/**
 * <p>An interface for enforcing a policy on records in topics.
 *
 * <p>Common use case is requiring that the messages has desired format.
 *
 * <p>If <code>message.policy.class.name</code> is defined, Kafka will create an instance of the specified class
 * for each topic using the default constructor and will then pass the broker configs to its <code>configure()</code> method. During
 * broker shutdown, the <code>close()</code> method will be invoked so that resources can be released (if necessary).
 */
public interface RecordsPolicy extends Configurable, AutoCloseable {
    /**
     * Validate the records and throw a <code>PolicyViolationException</code> with a suitable error
     * message if the records for the provided topic do not satisfy this policy.
     *
     * Clients will receive the POLICY_VIOLATION error code along with the exception's message. Note that validation
     * failure only affects the relevant topic, other topics in the request will still be processed.
     *
     * @param records Records to validate.
     * @throws PolicyViolationException if the records do not satisfy this policy.
     */
    void validate(String topic, Records records) throws PolicyViolationException;
}

Proposed Changes

I propose to introduce the

  • new public interface RecordsPolicy .
  • Configuration option - records.policy.class.name: String - sets class name of the implementation of RecordsPolicy for the specific topic.
  • Configuration option - records.policy.enabled: Boolean  - enable or disable records policy for the topic

Compatibility, Deprecation, and Migration Plan

There are no compatibility issues or deprecation.

No migration required.

Rejected Alternatives


  • No labels