Status

Current state: Under Discussion

Discussion thread

JIRA:

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The broker validates records against Kafka's message format, but there is no way to validate them from an application perspective. This makes it very hard to guarantee data quality on a Kafka topic, and consumers are left to fend for themselves with mitigation strategies such as a dead-letter queue, or by blocking or crashing on malformed data.

One appealing use case for additional validation is ensuring that a topic ingests only records that conform to a given schema, or to one of an allowed list of schemas. That is, Topic T could be restricted to appending only records belonging to Schema S, making Kafka schema-aware.
An interface that lets custom logic validate records before they are appended to the local log could enable this scenario and many more.
A chain of validators could be used to perform multiple validations in series.

Public Interfaces

The broker configuration would gain a new config, record.validator.classes, which takes a comma-separated list of classes implementing the BrokerRecordValidator interface.
The default value would be an empty string, meaning that no extra validation is configured.

Configuration Name          Valid Values                                                              Default Value
record.validator.classes    Comma-separated list of BrokerRecordValidator implementation class names    empty string
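As an illustration of how the broker might turn a value such as record.validator.classes=com.example.FirstValidator,com.example.SecondValidator (hypothetical class names) into validator instances, here is a minimal sketch. ValidatorLoader and the two sample validators are assumptions made for this example, and BrokerRecordValidator is reduced to a marker interface to keep the sketch self-contained. The actual broker would more likely rely on its existing configuration machinery.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for the proposed interface, reduced to a marker for this sketch.
interface BrokerRecordValidator {}

// Hypothetical implementations used only to demonstrate loading by class name.
final class AcceptAllValidator implements BrokerRecordValidator {}
final class RejectEmptyValidator implements BrokerRecordValidator {}

// Hypothetical helper: parses the config value and instantiates each listed class.
final class ValidatorLoader {
    static List<BrokerRecordValidator> load(String configValue) {
        List<BrokerRecordValidator> validators = new ArrayList<>();
        if (configValue == null || configValue.trim().isEmpty()) {
            return validators; // default (empty string): no extra validation
        }
        for (String entry : configValue.split(",")) {
            String className = entry.trim();
            if (className.isEmpty()) {
                continue; // tolerate stray commas
            }
            try {
                // asSubclass rejects classes that do not implement the interface.
                validators.add(Class.forName(className)
                        .asSubclass(BrokerRecordValidator.class)
                        .getDeclaredConstructor()
                        .newInstance());
            } catch (ReflectiveOperationException | ClassCastException e) {
                throw new IllegalArgumentException("Invalid validator class: " + className, e);
            }
        }
        return validators;
    }
}
```

The list order follows the order in the config value, which is what lets the configured validators run as a chain in series.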


The BrokerRecordValidator interface is provided below:

interface BrokerRecordValidator {
    /**
     * Validate the record for a given topic-partition.
     *
     * @return an empty Optional if the record is valid, otherwise the error to report
     */
    Optional<ApiRecordError> validateRecord(Record record, TopicPartition topicPartition);
}
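
To make the interface concrete, the following sketch shows what a simple implementation could look like. It assumes a made-up convention in which every valid record value starts with a one-byte schema marker. SchemaMarkerValidator and the marker value are hypothetical, and TopicPartition, Record and ApiRecordError are simplified stand-ins for Kafka's own types so that the example compiles on its own.

```java
import java.nio.ByteBuffer;
import java.util.Optional;

// Hypothetical stand-ins so this sketch compiles without Kafka on the classpath.
// A real implementation would use Kafka's own Record, TopicPartition and ApiRecordError.
final class TopicPartition {
    final String topic;
    final int partition;
    TopicPartition(String topic, int partition) { this.topic = topic; this.partition = partition; }
    @Override public String toString() { return topic + "-" + partition; }
}

final class Record {
    private final ByteBuffer value;
    Record(ByteBuffer value) { this.value = value; }
    ByteBuffer value() { return value; }
}

final class ApiRecordError {
    final String message;
    ApiRecordError(String message) { this.message = message; }
}

// The interface as proposed in this document.
interface BrokerRecordValidator {
    Optional<ApiRecordError> validateRecord(Record record, TopicPartition topicPartition);
}

// Hypothetical validator: accepts a record only if its value starts with the expected marker byte.
final class SchemaMarkerValidator implements BrokerRecordValidator {
    private static final byte EXPECTED_MARKER = 0x01; // assumed convention, not part of the proposal

    @Override
    public Optional<ApiRecordError> validateRecord(Record record, TopicPartition topicPartition) {
        ByteBuffer value = record.value();
        if (value == null || !value.hasRemaining()) {
            return Optional.of(new ApiRecordError("Empty value for " + topicPartition));
        }
        // Absolute read: inspects the first byte without moving the buffer's position.
        if (value.get(value.position()) != EXPECTED_MARKER) {
            return Optional.of(new ApiRecordError("Unexpected schema marker for " + topicPartition));
        }
        return Optional.empty();
    }
}
```

The validator has a no-argument constructor so that it could be instantiated from a class name listed in record.validator.classes.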


Proposed Changes

The chain of validators would be invoked from validateRecord() in LogValidator.scala, right after the calls to validateKey() and validateTimestamp().
The return type, Optional<ApiRecordError>, matches what the other internal validate functions return today.
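
The chaining described above can be sketched as a composite validator that runs each configured validator in order. This is a sketch under assumptions: ValidatorChain is a hypothetical name, stopping at the first error is one possible policy that the proposal does not specify, and the three data types are simplified stand-ins for Kafka's own classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-ins for Kafka's own types, kept minimal for this sketch.
final class TopicPartition {
    final String topic;
    final int partition;
    TopicPartition(String topic, int partition) { this.topic = topic; this.partition = partition; }
}

final class Record {
    final byte[] value;
    Record(byte[] value) { this.value = value; }
}

final class ApiRecordError {
    final String message;
    ApiRecordError(String message) { this.message = message; }
}

// The interface as proposed in this document.
interface BrokerRecordValidator {
    Optional<ApiRecordError> validateRecord(Record record, TopicPartition topicPartition);
}

// Hypothetical composite: runs validators in series and returns the first error, if any.
final class ValidatorChain implements BrokerRecordValidator {
    private final List<BrokerRecordValidator> validators;

    ValidatorChain(List<BrokerRecordValidator> validators) {
        this.validators = new ArrayList<>(validators); // defensive copy
    }

    @Override
    public Optional<ApiRecordError> validateRecord(Record record, TopicPartition topicPartition) {
        for (BrokerRecordValidator validator : validators) {
            Optional<ApiRecordError> error = validator.validateRecord(record, topicPartition);
            if (error.isPresent()) {
                return error; // assumed policy: later validators are skipped
            }
        }
        return Optional.empty(); // an empty chain accepts every record
    }
}
```

Because the chain itself implements BrokerRecordValidator, the call site would only need a single validateRecord() call regardless of how many validators are configured.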

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Rejected Alternatives

An alternative would be to have producers validate records before sending them, but this is difficult to guarantee because producers can have bugs.
