Status
Current state: Under Discussion
Discussion thread:
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The broker validates records from the perspective of Kafka's message format, but there is no way to validate them from an application perspective. This makes it very hard to guarantee any data quality on a Kafka topic, and consumers are left to fend for themselves with mitigation strategies such as a dead-letter queue or blocking/crashing on malformed data.
One appealing use case for additional validation is ensuring that a topic ingests only records of a certain schema (or of an allowed list of schemas). That is, a way to enforce that topic T only appends records belonging to schema S, making Kafka schema-aware.
An interface allowing custom validation logic to run on records before they are appended to the local log could enable the above scenario and many more.
A chain of validators could be used to perform multiple validations in series.
Public Interfaces
The broker configuration would gain a new config, record.validator.classes, which takes a comma-separated list of classes implementing the BrokerRecordValidator interface.
By default, the value is an empty string, which means no extra validation is configured.
| Configuration Name | Valid Values | Default Value |
|---|---|---|
| record.validator.classes | comma-separated list of BrokerRecordValidator implementation class names | empty string |
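As an example, a broker could be configured in server.properties with a chain of two validators (the class names below are purely illustrative):

```properties
# Hypothetical example: run a schema check first, then a key check.
# Validators are invoked in the order listed.
record.validator.classes=com.example.SchemaIdValidator,com.example.NonEmptyKeyValidator
```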
The BrokerRecordValidator interface is provided below:
interface BrokerRecordValidator {
    /**
     * Validate the record for a given topic-partition.
     */
    Optional<ApiRecordError> validateRecord(Record record, TopicPartition topicPartition);
}
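For illustration, here is a sketch of a validator that rejects records whose value does not start with the magic byte written by schema-registry-style serializers. The `Record`, `TopicPartition`, and `ApiRecordError` types below are simplified stand-ins so the sketch is self-contained; a real implementation would use Kafka's internal classes, and the magic-byte convention is only an assumed example.

```java
import java.nio.ByteBuffer;
import java.util.Optional;

public class SchemaMagicByteValidator {

    // Simplified stand-ins for Kafka's internal classes, for illustration only.
    record TopicPartition(String topic, int partition) {}
    record ApiRecordError(String message) {}
    interface Record { ByteBuffer value(); }

    interface BrokerRecordValidator {
        Optional<ApiRecordError> validateRecord(Record record, TopicPartition topicPartition);
    }

    // Validator requiring every value to start with magic byte 0x0, the framing
    // used by schema-registry-style serializers (an assumed convention here).
    static class MagicByteValidator implements BrokerRecordValidator {
        private static final byte MAGIC = 0x0;

        @Override
        public Optional<ApiRecordError> validateRecord(Record record, TopicPartition tp) {
            ByteBuffer value = record.value();
            // Reject null, empty, or wrongly framed values.
            if (value == null || value.remaining() < 1 || value.get(value.position()) != MAGIC) {
                return Optional.of(new ApiRecordError(
                    "Record in " + tp.topic() + "-" + tp.partition()
                        + " does not start with the expected schema magic byte"));
            }
            return Optional.empty();
        }
    }
}
```

Returning `Optional.empty()` signals that the record passed, matching the style of the broker's existing internal validation helpers.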
Proposed Changes
The chain of validations would be invoked from validateRecord() in LogValidator.scala, right after the calls to validateKey() and validateTimestamp().
The return type Optional&lt;ApiRecordError&gt; matches what the other internal validation functions return today.
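A sketch of how the configured chain could be driven: run validators in the configured order and short-circuit on the first error, mirroring how the existing per-record checks abort validation. The types and the `ValidatorChain` class below are illustrative stand-ins, not Kafka's actual internals.

```java
import java.util.List;
import java.util.Optional;

public class ValidatorChain {

    // Simplified stand-ins for Kafka's internal types, for illustration only.
    record TopicPartition(String topic, int partition) {}
    record ApiRecordError(String message) {}
    interface Record {}

    interface BrokerRecordValidator {
        Optional<ApiRecordError> validateRecord(Record record, TopicPartition topicPartition);
    }

    private final List<BrokerRecordValidator> validators;

    // In the broker, the list would be built from record.validator.classes.
    ValidatorChain(List<BrokerRecordValidator> validators) {
        this.validators = validators;
    }

    // Runs each validator in order; the first error aborts the chain.
    Optional<ApiRecordError> validate(Record record, TopicPartition tp) {
        for (BrokerRecordValidator v : validators) {
            Optional<ApiRecordError> err = v.validateRecord(record, tp);
            if (err.isPresent()) {
                return err;
            }
        }
        return Optional.empty();
    }
}
```

An empty validator list makes `validate()` a no-op, which matches the default behavior of an empty record.validator.classes string.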
Compatibility, Deprecation, and Migration Plan
- What impact (if any) will there be on existing users?
- If we are changing behavior how will we phase out the older behavior?
- If we need special migration tools, describe them here.
- When will we remove the existing behavior?
Rejected Alternatives
An alternative would be to have producers validate records before sending them, but guaranteeing that is difficult, as producers can have bugs and the broker cannot assume all clients are well behaved.