THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Public Interfaces
New public interface interfaces RecordsPolicy
, Record
Code Block | ||||
---|---|---|---|---|
| ||||
package org.apache.kafka.server.policy; import org.apache.kafka.common.Configurable; import org.apache.kafka.common.errors.PolicyViolationException; import org.apache.kafka.common.Record; /** * <p>An interface for enforcing a policy on recordsmessage in topics. * * <p>Common use case is requiring that the messages has desired format. * * <p>If <code>message.policy.class.name</code> is defined, Kafka will create an instance of the specified class * for each topic using the default constructor and will then pass the broker configs to its <code>configure()</code> method. During * broker shutdown, the <code>close()</code> method will be invoked so that resources can be released (if necessary). */ public interface RecordsPolicy extends Configurable, AutoCloseable { /** * Validate the records and throw a <code>PolicyViolationException</code> with a suitable error * message if the records for the provided topic do not satisfy this policy. * * Clients will receive the POLICY_VIOLATION error code along with the exception's message. Note that validation * failure only affects the relevant topic, other topics in the request will still be processed. * * @param records Records to validate. * @throws PolicyViolationException if the records do not satisfy this policy. */ void validate(String topic, RecordsIterable<? extends Record> records) throws PolicyViolationException; } package org.apache.kafka.common; import java.nio.ByteBuffer; import org.apache.kafka.common.header.Header; import org.apache.kafka.server.policy.RecordsPolicy; /** * A log record is a tuple consisting of a unique offset in the log, a sequence number assigned by * the producer, a timestamp, a key and a value. * * @see RecordsPolicy */ public interface Record { /** * Check whether this record has a key * @return true if there is a key, false otherwise */ boolean hasKey(); /** * Get the record's key. * @return the key or null if there is none */ ByteBuffer key(); /** * Check whether a value is present (i.e. if the value is not null) * @return true if so, false otherwise */ boolean hasValue(); /** * Get the record's value * @return the (nullable) value */ ByteBuffer value(); /** * Get the headers. For magic versions 1 and below, this always returns an empty array. * * @return the array of headers */ Header[] headers(); } |
Proposed Changes
I propose to introduce the
- Record - new public interface
RecordsPolicy
.. Expose information about Records to be added to the topic. RecordsPolicy
- new public interface. Implementation of this interface contains specific logic to check records to be added to the topic.- Configuration option -
records.policy.class.name: String -
Configuration option - sets class name of the implementation ofRecordsPolicy
for the specific topic. - Configuration option -
records.policy.enabled: Boolean
- Configuration option - enable or disable records policy for the topic
...