Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...


package org.apache.kafka.server;

/**
* An interface for enforcing a policy on the records that are accepted by this
* broker.
*
* A common use case is for validating that records contain the correct schema
* information.
*
* If <code>record.validation.policy.class.name</code> is defined, Kafka will
* create an instance of the specified class using the default constructor and
* will then pass the broker configs to its <code>configure()</code> method.
* During broker shutdown, the <code>close()</code> method will be invoked so
* that resources can be released (if necessary).
*/
public interface RecordValidationPolicy extends Configurable, Closeable {

/**
* TopicMetadata describes the topic-partition a record is being produced to.
*/
public interface TopicMetadata {
TopicPartition topicPartition();

/**
* @return the value of the topic config <code>"validation.policy"</code>
*/
String validationPolicy();
}

/**
* HeaderProxy allows read-only access to the data in a record header
*/
publicinterfaceHeaderProxy {
  Stringkey();


  /**
  * @return a read-only view on the header value
  */
  ByteBuffervalue();
}

/**

...

*

...

RecordProxy allows read-only access to the parts of a record that can be inspected by a validation policy.
*
* For efficiency, only the data required by the policy {@linkplain}RecordIntrospectionHints

...

*

...

are

...

guaranteed

...

to

...

be

...

available

...

to

...

the

...

policy.

...

*/

...

public

...

interface

...

RecordProxy

...

{
  /**
  * @return a read-only list of header data
  */
  List<HeaderProxy> headers();


  /**
  * @return a read-only view on the record key
  */
  ByteBufferkey();


  /**
  * @return a read-only view on the record value
  */
  ByteBuffervalue();
}

/**
* @throwsInvalidRecordException when this policy rejects a record
*/
voidvalidate(TopicMetadatatopic, RecordProxyrecord) throwsInvalidRecordException;

/**
* The parts of the record that a policy needs to inspect. This is used for
* iteration when using compression, so as to only decompress the minimum data
* necessary to perform the validation
*/
publicinterfaceRecordIntrospectionHints {
/**
  * @return whether the policy will need to access a record's headers
  */
booleanaccessHeaders();

/**
* @return minimum number of bytes from the beginning of the record's key byte[]
* @return 0 key is not needed for policy
* @return -1 or Long.MAX_LONG for all
*/
longaccessKeyBytes();

/**
* @return minimum number of bytes from the beginning of the record's value byte[]
* @return 0 key is not needed for policy
* @return -1 or Long.MAX_LONG for all
*/
longaccessValueBytes();
}

/**
* @return hints describing the parts of the record that this policy
* needs to inspect.
*/
RecordIntrospectionHintsgetHints();
}

New Broker config

record.validation.policy.class.name

...