Status
Current state: Adopted
Discussion thread: [DISCUSS] KIP-440: Extend Connect Converter to support headers
JIRA: KAFKA-7273
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
This KIP proposes to bring very similar header support to Kafka Connect.
Public Interfaces
Two new default methods will be added to the existing converter interface (connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java):
```java
import org.apache.kafka.common.header.Headers;

default byte[] fromConnectData(String topic, Headers headers, Schema schema, Object value) {
    return fromConnectData(topic, schema, value);
}

default SchemaAndValue toConnectData(String topic, Headers headers, byte[] value) {
    return toConnectData(topic, value);
}
```
Proposed Changes
The Converter interface will gain the two new default methods described in the Public Interfaces section above.
WorkerSinkTask will call the new toConnectData method and pass it the "raw" Kafka message headers. No headers converter is required in this case.
WorkerSourceTask will call the new fromConnectData method and pass it the headers produced by the headers converter as a RecordHeaders instance (which implements the Headers interface). The headers converter is used to obtain the headers when converting data from the internal Connect format to Kafka messages.
Both WorkerSinkTask and WorkerSourceTask apply these methods to message keys and message values, so both tasks now pass headers to the fromConnectData/toConnectData methods.
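To illustrate what a header-aware converter could do with these overloads, here is a minimal, self-contained sketch. It does not use the real Connect API: `SketchConverter`, `CharsetHeaderConverter`, and the `encoding` header name are hypothetical, headers are modeled as a mutable `Map<String, byte[]>` rather than `org.apache.kafka.common.header.Headers`, and values are plain strings rather than `Schema`/`SchemaAndValue`. The serializing side records the charset in a header, and the deserializing side reads it back.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for Connect's Converter; NOT the real Kafka API.
// Headers are a mutable Map<String, byte[]> and values are plain Strings.
interface SketchConverter {
    byte[] fromConnectData(String topic, String value);

    String toConnectData(String topic, byte[] value);

    // Header-aware overloads delegate to the header-less methods by default,
    // mirroring the delegation proposed in this KIP.
    default byte[] fromConnectData(String topic, Map<String, byte[]> headers, String value) {
        return fromConnectData(topic, value);
    }

    default String toConnectData(String topic, Map<String, byte[]> headers, byte[] value) {
        return toConnectData(topic, value);
    }
}

// Hypothetical converter: tags each message with its charset in a header
// when serializing and honors that header when deserializing.
class CharsetHeaderConverter implements SketchConverter {
    static final String ENCODING_HEADER = "encoding"; // assumed header name

    private final Charset charset;

    CharsetHeaderConverter(Charset charset) {
        this.charset = charset;
    }

    @Override
    public byte[] fromConnectData(String topic, String value) {
        return value.getBytes(StandardCharsets.UTF_8); // header-less path: UTF-8
    }

    @Override
    public String toConnectData(String topic, byte[] value) {
        return new String(value, StandardCharsets.UTF_8);
    }

    @Override
    public byte[] fromConnectData(String topic, Map<String, byte[]> headers, String value) {
        // Source side: record which charset was used to encode the payload.
        headers.put(ENCODING_HEADER, charset.name().getBytes(StandardCharsets.US_ASCII));
        return value.getBytes(charset);
    }

    @Override
    public String toConnectData(String topic, Map<String, byte[]> headers, byte[] value) {
        // Sink side: decode with the charset named in the header, if any.
        byte[] encoding = headers.get(ENCODING_HEADER);
        if (encoding == null) {
            return toConnectData(topic, value); // no header: fall back to UTF-8
        }
        Charset declared = Charset.forName(new String(encoding, StandardCharsets.US_ASCII));
        return new String(value, declared);
    }
}

class CharsetHeaderDemo {
    public static void main(String[] args) {
        SketchConverter converter = new CharsetHeaderConverter(StandardCharsets.UTF_16BE);
        Map<String, byte[]> headers = new HashMap<>();
        byte[] bytes = converter.fromConnectData("orders", headers, "hello");
        System.out.println(converter.toConnectData("orders", headers, bytes));
    }
}
```

In the real interface the same idea would apply to both keys and values, with the worker tasks supplying the headers as described above.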
Compatibility, Deprecation, and Migration Plan
The new interface methods are default methods, so existing Converter implementations need no changes.
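The compatibility argument can be sketched with a reduced model of the interface. The types below are hypothetical stand-ins, not the real Connect API: `CompatConverter` mimics the shape of the proposed interface with a `Map<String, byte[]>` in place of `Headers` and strings in place of Connect data, and `LegacyStringConverter` represents an implementation written before the header-aware overloads existed.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

// Simplified stand-in for Connect's Converter; NOT the real Kafka API.
interface CompatConverter {
    byte[] fromConnectData(String topic, String value);

    String toConnectData(String topic, byte[] value);

    // New overloads: default methods that ignore headers and delegate.
    default byte[] fromConnectData(String topic, Map<String, byte[]> headers, String value) {
        return fromConnectData(topic, value);
    }

    default String toConnectData(String topic, Map<String, byte[]> headers, byte[] value) {
        return toConnectData(topic, value);
    }
}

// "Legacy" implementation: only the header-less methods are implemented.
class LegacyStringConverter implements CompatConverter {
    @Override
    public byte[] fromConnectData(String topic, String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String toConnectData(String topic, byte[] value) {
        return new String(value, StandardCharsets.UTF_8);
    }
}

class CompatDemo {
    public static void main(String[] args) {
        CompatConverter converter = new LegacyStringConverter();
        Map<String, byte[]> headers =
                Collections.singletonMap("trace-id", new byte[] {1, 2, 3});

        // The caller uses the header-aware overloads; the legacy converter
        // compiles unchanged and the defaults route to its old methods.
        byte[] bytes = converter.fromConnectData("orders", headers, "payload");
        System.out.println(converter.toConnectData("orders", headers, bytes));
    }
}
```

Because the defaults simply drop the headers argument, a converter that never overrides them behaves exactly as it did before the change.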
Rejected Alternatives
The headers converter interface was introduced in KIP-145 and provides a mechanism to serialize and deserialize header values.
Unfortunately, the headers converter alone is not enough to solve the problem defined in the Motivation section above: it is only used to transform headers and provides no way to apply this kind of transformation to message keys and values.