Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...


package org.apache.kafka.server;

/**

...

*

...

An

...

interface

...

for

...

enforcing

...

a

...

policy

...

on

...

the

...

records

...

that

...

are

...

accepted

...

by

...

this

...

*

...

broker.

...

*

...


...

*

...

A

...

common

...

use

...

case

...

is

...

for

...

validating

...

that

...

records

...

contain

...

the

...

correct

...

schema

...

*

...

information.

...

*

...


...

*

...

If the broker config <code>record.validation.policy.class.

...

name</code>

...

is

...

defined,

...

Kafka

...

will

...

*

...

create

...

an

...

instance

...

of

...

the

...

specified

...

class

...

using

...

the

...

default

...

constructor

...

and

...

*

...

will

...

then

...

pass

...

the

...

broker

...

configs

...

to

...

its

...

<code>configure()</code>

...

method.

...

*

...

During

...

broker

...

shutdown,

...

the

...

<code>close()</code>

...

method

...

will

...

be

...

invoked

...

so

...

*

...

that

...

resources

...

can

...

be

...

released

...

(if

...

necessary).

...


*
* If the broker config <code>record.validation.policy.class.path</code> is defined,
* the RecordValidationPolicy implementation and its dependent libraries will be loaded
* by a dedicated classloader which searches this class path before the Kafka broker class path.
* The syntax of this string parameter that of a Java class path.

*/

public interface RecordValidationPolicy extends Configurable, Closeable {

/**
* TopicMetadata describes the topic-partition a record is being produced to.
*/
public interface TopicMetadata {
TopicPartition TopicIdPartition topicPartitiontopicIdPartition();

/**
* @return the value of the topic config <code>"record.validation.policy"</code>
*/
String validationPolicy();
}

/**
* HeaderProxy allows read-only access to the data in a record header
*/
publicinterfaceHeaderProxy {
  Stringkey();


  /**
  * @return a read-only view on the header value
  */
  ByteBuffervalue();
}

/**

...

*

...

RecordProxy allows read-only access to the parts of a record that can be inspected by a validation policy.
*
* For efficiency, only the data required by the policy {@linkplain}RecordIntrospectionHints

...

*

...

are

...

guaranteed

...

to

...

be

...

available

...

to

...

the

...

policy.

...

*/

...

public

...

interface

...

RecordProxy

...

{
  /**
  * @return a read-only list of header data
  */
  List<HeaderProxy> headers();


  /**
  * @return a read-only view on the record key
  */
  ByteBufferkey();


  /**
  * @return a read-only view on the record value
  */
  ByteBuffervalue();
}

/**
* @throwsInvalidRecordException when this policy rejects a record
*/
voidvalidate(TopicMetadatatopic, RecordProxyrecord) throwsInvalidRecordException;

/**
* The parts of the record that a policy needs to inspect. This is used for
* iteration when using compression, so as to only decompress the minimum data
* necessary to perform the validation
*/
publicinterfaceRecordIntrospectionHints {
/**
  * @return whether the policy will need to access a record's headers
  */
booleanaccessHeaders();

/**
* @return minimum number of bytes from the beginning of the record's key byte[]
* @return 0 key is not needed for policy
* @return -1 or Long.MAX_LONG for all
*/
longaccessKeyBytes();

/**
* @return minimum number of bytes from the beginning of the record's value byte[]
* @return 0 key is not needed for policy
* @return -1 or Long.MAX_LONG for all
*/
longaccessValueBytes();
}

/**
* @return hints describing the parts of the record that this policy
* needs to inspect.
*/
RecordIntrospectionHintsgetHints();
}

New Broker config

record.validation.policy.class.name

The fully qualified name of a class that implements a record validation policy. If no class name is defined then the defaultbehavior is to perform no validation on the records sent to the broker.

Type: string

Default: null

Valid values:

Importance: low

Update mode: read-only


record.validation.policy.class.path

An optional Java class path for the RecordValidationPolicy implementation. If specified, the RecordValidationPolicy implementation and its dependent libraries will be loaded by a dedicated classloader which searches this class path before the Kafka broker class path. The syntax of this string parameter that of a Java class path.

Type: string

Default: null

...

  • We discarded the idea that the validation policy plugin class could be defined per-topic, as a single per-broker policy could act as a facade to different policies specific to topics (e.g. different registries)

  • We discarded the idea to add an optional filed (eg schema id) to the kafka protocol Produce API, as that would require new clients not just serdes. Moreover, KIP-467 makes this approach look unnecessary.

  • It remains possible to implement a Kafka Streams functionality that filters a topic that clients write to, into another topic with validated messages (and maybe a “dead letter” topic for invalid ones). Such an alternative approach doesn’t provide any direct feedback to the client producer in the response. It also doesn’t require the broker to execute third party code semantically coupled with the clients, at the price of having an extra “moving part” (the Streams app) which contains such logic. Moreover, the topic-to-schema mapping must map both input topic and destination topic to the same schema.

  • We discarded having a boolean topic config to control per-topic enablement of the validation, when a policy is specified at the broker level, in favor of a string topic config whose value that can be consumed by the policy, as per the sample below.

Sample test policy

Here is a sample of a validation policy that we may use for the unit tests suite.

// Example RecordValidationPolicy implementation that checks record headers. Only records
// containing a header with a key that matches the 'record.validation.policy' property of
// the topic are accepted.
publicclassRequireHeaderValidationPolicyimplementsRecordValidationPolicy {


@Override
publicvoidconfigure(Map<String, ?> configs) {
  // This example doesn't use any of the broker's configuration properties.
}

@Override
publicvoidclose() throwsIOException {
  // This example doesn't need to perform any cleanup when the broker is shutdown.
}

@Override
publicvoidvalidate(TopicMetadatatopic, RecordProxyrecord) throwsInvalidRecordException {
  // Do any of the record headers have a key matching the 'record.validation.policy'
  // property of the topic being produced to?
  if (!record.headers().stream().filter(h->h.key().equals(topic.validationPolicy())).findFirst().isPresent()) {
  thrownewInvalidRecordException(
   String.format("Topic %s requires records to contain a header with key: %s",
   topic.topicIdPartition().topic(), topic.validationPolicy()));
  }
}

@Override
publicRecordIntrospectionHintsgetHints() {
// RequireHeaderValidationPolicy only requires access to the record headers.
// 0 is returned for the key/value portions of the record as this policy
// doesn't inspect these fields. This avoids any potential overhead in the
// broker parsing the record to find fields that the policy doesn't make use of.
returnnewRecordIntrospectionHints() {
  @Override
  publicbooleanaccessHeaders() {
   returntrue;
  }

  @Override
  publiclongaccessKeyBytes() {
   return0;
  }

  @Override
  publiclongaccessValueBytes() {
   return0;
  }
};
}

}