...
package org.apache.kafka.server;
/**
* An interface for enforcing a policy on the records that are accepted by this
* broker.
*
* A common use case is for validating that records contain the correct schema
* information.
*
* If <code>record.validation.policy.class.name</code> is defined, Kafka will
* create an instance of the specified class using the default constructor and
* will then pass the broker configs to its <code>configure()</code> method.
* During broker shutdown, the <code>close()</code> method will be invoked so
* that resources can be released (if necessary).
*/
public interface RecordValidationPolicy extends Configurable, Closeable {
/**
* TopicMetadata describes the topic-partition a record is being produced to.
*/
public interface TopicMetadata {
TopicPartition topicPartition();
/**
* @return the value of the topic config <code>"validation.policy"</code>
*/
String validationPolicy();
}
/**
* HeaderProxy allows read-only access to the data in a record header
*/
publicinterfaceHeaderProxy {
Stringkey();
/**
* @return a read-only view on the header value
*/
ByteBuffervalue();
}
/**
...
*
...
RecordProxy allows read-only access to the parts of a record that can be inspected by a validation policy.
*
* For efficiency, only the data required by the policy {@linkplain}RecordIntrospectionHints
...
*
...
are
...
guaranteed
...
to
...
be
...
available
...
to
...
the
...
policy.
...
*/
...
public
...
interface
...
RecordProxy
...
{
/**
* @return a read-only list of header data
*/
List<HeaderProxy> headers();
/**
* @return a read-only view on the record key
*/
ByteBufferkey();
/**
* @return a read-only view on the record value
*/
ByteBuffervalue();
}
/**
* @throwsInvalidRecordException when this policy rejects a record
*/
voidvalidate(TopicMetadatatopic, RecordProxyrecord) throwsInvalidRecordException;
/**
* The parts of the record that a policy needs to inspect. This is used for
* iteration when using compression, so as to only decompress the minimum data
* necessary to perform the validation
*/
publicinterfaceRecordIntrospectionHints {
/**
* @return whether the policy will need to access a record's headers
*/
booleanaccessHeaders();
/**
* @return minimum number of bytes from the beginning of the record's key byte[]
* @return 0 key is not needed for policy
* @return -1 or Long.MAX_LONG for all
*/
longaccessKeyBytes();
/**
* @return minimum number of bytes from the beginning of the record's value byte[]
* @return 0 key is not needed for policy
* @return -1 or Long.MAX_LONG for all
*/
longaccessValueBytes();
}
/**
* @return hints describing the parts of the record that this policy
* needs to inspect.
*/
RecordIntrospectionHintsgetHints();
}
New Broker config
record.validation.policy.class.name
...