Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current state:  Under Discussion Adopted

Discussion thread: [DISCUSS] KIP-440: Extend Connect Converter to support headers

JIRA: KAFKA-7273

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

This KIP proposes to bring a very similar header support to Kafka Connect.

Public Interfaces

Two new default methods will be added to the existing converter interface (connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java):

Code Block
languagejava
import org.apache.kafka.common.header.Headers;

default byte[] fromConnectData(String topic, Headers headers, Schema schema, Object value) {
    return fromConnectData(topic, schema, value);
}

default SchemaAndValue toConnectData(String topic, Headers headers, byte[] value) {
    return toConnectData(topic, value);
}

Proposed Changes

Converter interface will have two new default methods described in the Public Interfaces section (above).

WorkerSinkTask will use the new toConnectData method and pass "raw" Kafka message headers. Headers converter is not required in this case.

WorkerSourceTask will use the new fromConnectData method and pass the headers received from the headers converter, RecordHeaders (which implements Headers interface). Headers converter is used as a way to get headers when converting data from internal Connect format to Kafka messages.

Both WorkerSinkTask and WorkerSourceTask apply these methods for message keys and message values change as shown above. WorkerSinkTask and WorkerSourceTask will now provide headers to the fromConnectData/toConnectData methods.

Compatibility, Deprecation, and Migration Plan

New interface methods are default methods, which means all existing implementations won’t need any changes.  

Rejected Alternatives

Headers converter interface has been introduced in KIP-145 and it provides a mechanism to serialize and deserialize header values.

Unfortunately, using headers converter alone to solve the problem defined in the Motivation section (above) is not enough, headers converter is only used for headers transformation. It doesn't provide a way to apply this kind of transformation to message keys and values.None