
...

There are already a number of KIPs describing similar functionality. In our opinion, this suggests there is general interest in providing these capabilities. We have incorporated elements (and feedback) from these other KIPs:

KIP-686: API to ensure Record policy on the broker (Discussion stopped in 2020) - has similar intent, but proposed using PolicyViolationException for rejecting records, while we suggest reusing the InvalidRecordException from KIP-467 which maps to an error handled by the Producer. In addition, KIP-686 does not describe how it would handle compression.

KIP-729: Custom validation of records on the broker prior to append (2021) - has similar intent, though it states that it will not work with compression, which is addressed in our KIP.

KIP-905: Broker interceptors (currently active) - is much broader in scope as it covers use-cases that include both mutating messages, and also intercepting client fetch requests. Our KIP can be viewed as a more narrowly focused subset of the function described in KIP-905, targeted at improving the experience of using schemas with Kafka.

KIP-467: Augment ProducerResponse error messaging for specific culprit records is also of note, as it provides the foundation of the broker validation logic, and the client-facing behavior, that this KIP builds upon. KIP-467 is already implemented, with the exception of the client retries functionality.

...


package org.apache.kafka.server;

import java.io.Closeable;
import java.nio.ByteBuffer;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;

/**
 * An interface for enforcing a policy on the records that are accepted by this
 * broker.
 *
 * A common use case is validating that records contain the correct schema
 * information.
 *
 * If <code>record.validation.policy.class.name</code> is defined, Kafka will
 * create an instance of the specified class using the default constructor and
 * will then pass the broker configs to its <code>configure()</code> method.
 * During broker shutdown, the <code>close()</code> method will be invoked so
 * that resources can be released (if necessary).
 */
public interface RecordValidationPolicy extends Configurable, Closeable {

    /**
     * TopicMetadata describes the topic-partition a record is being produced to.
     */
    interface TopicMetadata {
        TopicPartition getTopicPartition();

        /**
         * @return the value of the topic config <code>"validation.policy"</code>
         */
        String validationPolicy();
    }

    /**
     * RecordProxy contains the parts of a record that can be inspected by a
     * validation policy.
     *
     * For efficiency, only the data required by the policy's
     * {@linkplain RecordIntrospectionHints} are guaranteed to be available
     * to the policy.
     */
    interface RecordProxy {
        Header[] getHeaders();

        ByteBuffer getKey();

        ByteBuffer getValue();
    }

    /**
     * @throws InvalidRecordException when this policy rejects a record
     */
    void validate(TopicMetadata topic, RecordProxy record) throws InvalidRecordException;

    /**
     * The parts of the record that a policy needs to inspect. This is used when
     * iterating over compressed records, so that only the minimum data necessary
     * to perform the validation is decompressed.
     */
    interface RecordIntrospectionHints {
        /**
         * @return whether the policy needs access to a record's headers
         */
        boolean needHeaders();

        /**
         * @return the minimum number of bytes from the beginning of the record's
         *         key <code>byte[]</code> that the policy needs: 0 if the key is
         *         not needed, or -1 (alternatively <code>Long.MAX_VALUE</code>)
         *         for all bytes
         */
        long needKeyBytes();

        /**
         * @return the minimum number of bytes from the beginning of the record's
         *         value <code>byte[]</code> that the policy needs: 0 if the value
         *         is not needed, or -1 (alternatively <code>Long.MAX_VALUE</code>)
         *         for all bytes
         */
        long needValueBytes();
    }

    /**
     * @return hints describing the parts of the record that this policy
     *         needs to inspect
     */
    RecordIntrospectionHints getHints();
}
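As an illustration of how the interface could be used, the following self-contained sketch shows the core of a policy that verifies each value carries a schema id. The class name and wire format are hypothetical, not part of this KIP; the format assumed here is the common schema-registry convention of a 0x00 magic byte followed by a 4-byte big-endian schema id at the start of each serialized value. A validate() implementation would call hasValidSchemaHeader() on record.getValue() and throw InvalidRecordException when it returns false; its getHints() would report needHeaders() == false and needValueBytes() == 5, so only the first five bytes of each value need decompressing.

```java
import java.nio.ByteBuffer;

/**
 * Hypothetical helper showing the kind of check a RecordValidationPolicy's
 * validate() method might perform. Assumes a schema-registry-style wire
 * format: a 0x00 magic byte followed by a 4-byte big-endian schema id.
 */
public class SchemaIdFormatCheck {

    private static final byte MAGIC_BYTE = 0x0;
    private static final int HEADER_LENGTH = 5; // magic byte + 4-byte schema id

    /**
     * @return true if the value begins with a plausible schema header; a
     *         validate() implementation would throw InvalidRecordException
     *         when this returns false
     */
    public static boolean hasValidSchemaHeader(ByteBuffer value) {
        if (value == null || value.remaining() < HEADER_LENGTH) {
            return false;
        }
        // Absolute get: peek at the first byte without moving the position.
        return value.get(value.position()) == MAGIC_BYTE;
    }

    /**
     * @return the schema id encoded in the 4 bytes after the magic byte
     */
    public static int schemaId(ByteBuffer value) {
        return value.getInt(value.position() + 1);
    }
}
```

Note that the check uses absolute ByteBuffer reads so the buffer handed to the policy is left untouched for the broker's subsequent append path.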

...

This KIP proposes extending the validation to include an optional call to a RecordValidationPolicy implementation. This call is only made if the broker is configured with a record validation policy class and the topic the record is being produced to has a non-null record.validation.policy config. Consistent with KIP-467, if the policy rejects a record then the entire batch of records is rejected.
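To make the wiring concrete, the configuration might look as follows. The two property names are those introduced above; the class name and the topic-level value are hypothetical examples, as their interpretation is left to the policy implementation.

```properties
# Broker configuration (server.properties):
# enables validation broker-wide by naming the policy implementation.
record.validation.policy.class.name=com.example.SchemaValidationPolicy

# Topic configuration: validation runs only for topics where this is
# non-null; its value is passed to the policy via TopicMetadata.validationPolicy().
validation.policy=schema-id
```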

...

  • We discarded the idea that the validation policy plugin class could be defined per-topic, as a single per-broker policy can act as a facade for different topic-specific policies (e.g. different registries).

  • We discarded the idea of adding an optional field (e.g. a schema id) to the Kafka protocol Produce API, as that would require new clients, not just new serdes. Moreover, KIP-467 makes this approach look unnecessary.

  • It remains possible to implement, with Kafka Streams, functionality that filters a topic that clients write to into another topic containing only validated messages (and perhaps a “dead letter” topic for invalid ones). Such an alternative approach doesn’t provide any direct feedback to the producing client in the response. On the other hand, it doesn’t require the broker to execute third-party code semantically coupled with the clients, at the price of an extra “moving part” (the Streams app) which contains such logic. Moreover, the topic-to-schema mapping must map both the input topic and the destination topic to the same schema.

...