...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The motivation here is to improve data-quality guarantees in Kafka. The broker validates records from the perspective of Kafka's message format, but there is no way to validate them from an application perspective. This makes it very hard to guarantee any data quality on a Kafka topic, and consumers are left to fend for themselves with mitigation strategies such as a dead-letter queue or blocking/crashing on malformed data.

One appealing example use case for such validation is to ensure that a topic ingests only records of a certain schema (or an allowed list of schemas). That is, a way to enforce that topic T only appends records belonging to schema S, making Kafka topics schema-aware (a bit like a traditional database).

An interface allowing custom validation logic to run on records before they are appended to the local log could enable the above scenario and many more.
A chain of validations could be used to perform multiple validations in series.

...

The broker configuration would gain a new config, record.validator.classes, which would take a comma-separated list of classes implementing the BrokerRecordValidator interface.
By default, the value would be an empty string, meaning no extra validation is configured.

Configuration Name       | Valid Values                                         | Default Value
record.validator.classes | class names of BrokerRecordValidator implementations | empty string
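For illustration, enabling two validators in the broker configuration might look like the following sketch (the class names are hypothetical):

```properties
# server.properties (hypothetical validator class names, for illustration only)
record.validator.classes=com.example.SchemaValidator,com.example.SizeLimitValidator
```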


The BrokerRecordValidator interface is provided below:

...
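The interface itself is elided above; as a non-authoritative sketch, a plausible shape consistent with the Optional&lt;ApiRecordError&gt; return type mentioned below might look like this (Record, ApiRecordError, and the method name are stand-ins for illustration, not the actual proposal):

```java
import java.util.Optional;

public class ValidatorSketch {
    // Minimal stand-ins for Kafka's internal record and error types.
    public record Record(byte[] key, byte[] value) {}
    public record ApiRecordError(String message) {}

    // Sketch of the proposed interface: an empty Optional means "valid",
    // a populated Optional carries the reason the record should be rejected.
    public interface BrokerRecordValidator {
        Optional<ApiRecordError> validateRecord(Record record);
    }

    // Example implementation: reject records whose value is null.
    public static class NonNullValueValidator implements BrokerRecordValidator {
        @Override
        public Optional<ApiRecordError> validateRecord(Record record) {
            if (record.value() == null) {
                return Optional.of(new ApiRecordError("value must not be null"));
            }
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        BrokerRecordValidator v = new NonNullValueValidator();
        System.out.println(v.validateRecord(new Record(null, new byte[0])).isEmpty());  // true
        System.out.println(v.validateRecord(new Record(null, null)).isPresent());       // true
    }
}
```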

The chain of validations would be called in the LogValidator.scala class' validateRecord(), right after the calls to validateKey() and validateTimestamp().
The return type Optional<ApiRecordError> is the same type that the other internal validate functions return today.
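In spirit, the call-site logic could be sketched as below: run the configured validators in order and short-circuit on the first error, matching the Optional&lt;ApiRecordError&gt; convention. All type and method names beyond BrokerRecordValidator and ApiRecordError are illustrative assumptions, not the actual Kafka code:

```java
import java.util.List;
import java.util.Optional;

public class ValidatorChainSketch {
    // Stand-ins for Kafka's internal record and error types.
    public record Record(byte[] key, byte[] value) {}
    public record ApiRecordError(String message) {}

    public interface BrokerRecordValidator {
        Optional<ApiRecordError> validateRecord(Record record);
    }

    // Runs the configured validators in series; the first error stops the
    // chain, the same way the existing validate functions surface errors.
    public static Optional<ApiRecordError> runChain(List<BrokerRecordValidator> chain,
                                                    Record record) {
        for (BrokerRecordValidator validator : chain) {
            Optional<ApiRecordError> error = validator.validateRecord(record);
            if (error.isPresent()) {
                return error;
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<BrokerRecordValidator> chain = List.of(
            r -> r.value() == null
                    ? Optional.of(new ApiRecordError("null value")) : Optional.empty(),
            r -> r.value().length == 0
                    ? Optional.of(new ApiRecordError("empty value")) : Optional.empty()
        );
        System.out.println(runChain(chain, new Record(null, new byte[]{1})).isEmpty()); // true
        System.out.println(runChain(chain, new Record(null, null)).get().message());    // null value
    }
}
```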

Here is the proposed place in the existing code: https://github.com/apache/kafka/blob/744d05b12897267803f46549e8bca3d31d57be4c/core/src/main/scala/kafka/log/LogValidator.scala#L211

Compatibility, Deprecation, and Migration Plan


The introduction of the new config would be backward compatible. Not using it (the default) means the broker performs no extra validation on records.
Users who want to use this feature would need to provide an implementation of the proposed interface and add the new configuration to the broker.

Rejected Alternatives

An alternative would be to have producers validate records before sending them to the brokers, but guaranteeing that is difficult, as producers could have bugs.