


Status

Current state: Under Discussion

Discussion thread: [DISCUSS] KIP-440: Extend Connect Converter to support headers

JIRA: KAFKA-7273

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka message headers are a simple and straightforward way to enrich Kafka messages with additional metadata. This metadata can be anything: user information, tracing data, enriched fields, and so on.

When message schemas are used, headers are the best way to transfer information about the schema, such as a schema ID, because the consumer does not need to deserialize the whole payload or unpack some sort of envelope to get it. Without an envelope or a custom protocol format, the “raw” payload can be transferred as is, for example as a serialized JSON, Avro, or Protocol Buffers message. Even when message schemas are not used, metadata such as the content type or compression algorithm is valid header content that helps with serialization and deserialization.

The Kafka Serializer and Deserializer interfaces provide a way to use header information for this purpose. However, the Kafka Connect Converter interface does not, which makes it impossible to use Kafka Connect together with Kafka Producers, Consumers, or Streams applications that rely on headers for serialization and deserialization.

This KIP proposes bringing very similar header support to Kafka Connect.

Public Interfaces

connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java:

default byte[] fromConnectData(String topic, Headers headers, Schema schema, Object value) {
    return fromConnectData(topic, schema, value);
}

default SchemaAndValue toConnectData(String topic, Headers headers, byte[] value) {
    return toConnectData(topic, value);
}
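
To illustrate how a converter might use the header-aware overloads, here is a minimal sketch. It does not use the real Connect API: SketchConverter is a hypothetical stand-in for Converter in which headers are modeled as a plain Map and the Schema argument is dropped. ContentTypeConverter and the "content-type" header name are likewise assumptions made for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the Converter interface: headers are modeled as a
// mutable map and Connect values as plain Objects (no Schema).
interface SketchConverter {
    byte[] fromConnectData(String topic, Object value);

    Object toConnectData(String topic, byte[] value);

    // Header-aware overloads that default to the header-less methods,
    // mirroring the shape of the proposed default methods.
    default byte[] fromConnectData(String topic, Map<String, byte[]> headers, Object value) {
        return fromConnectData(topic, value);
    }

    default Object toConnectData(String topic, Map<String, byte[]> headers, byte[] value) {
        return toConnectData(topic, value);
    }
}

// Hypothetical converter that records the payload's content type in a header
// instead of wrapping the payload in an envelope.
class ContentTypeConverter implements SketchConverter {
    static final String CONTENT_TYPE = "content-type"; // assumed header name
    static final String TEXT = "text/plain";

    @Override
    public byte[] fromConnectData(String topic, Object value) {
        return String.valueOf(value).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public Object toConnectData(String topic, byte[] value) {
        return value; // without headers, the payload is passed through as raw bytes
    }

    @Override
    public byte[] fromConnectData(String topic, Map<String, byte[]> headers, Object value) {
        // Attach the metadata to the record headers; the payload itself stays "raw".
        headers.put(CONTENT_TYPE, TEXT.getBytes(StandardCharsets.UTF_8));
        return fromConnectData(topic, value);
    }

    @Override
    public Object toConnectData(String topic, Map<String, byte[]> headers, byte[] value) {
        // Choose how to decode from the header, without inspecting the payload.
        byte[] type = headers.get(CONTENT_TYPE);
        if (type != null && TEXT.equals(new String(type, StandardCharsets.UTF_8))) {
            return new String(value, StandardCharsets.UTF_8);
        }
        return toConnectData(topic, value);
    }
}
```

In this sketch the payload bytes contain only the value, and the consumer side decides how to decode them by looking at the header alone. A real implementation would work with the Headers type and the Schema/SchemaAndValue arguments from the signatures above.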

Proposed Changes

The Converter interface gains the two methods shown above. WorkerSourceTask and WorkerSinkTask will now pass the record headers to the fromConnectData and toConnectData methods, respectively.

Compatibility, Deprecation, and Migration Plan

The new interface methods are default methods that delegate to the existing header-less methods, so existing implementations do not need any changes.
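
The compatibility argument can be sketched with simplified stand-in types. MiniConverter, LegacyStringConverter, and WorkerCallSite below are hypothetical names, not part of Connect: the first models only the serialization side of the interface, with headers as a plain Map, and the last stands in for a worker task that always calls the header-aware overload.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical, simplified stand-in for the Converter interface (serialization side only).
interface MiniConverter {
    byte[] fromConnectData(String topic, Object value);

    // New overload: a default method that falls back to the header-less one.
    default byte[] fromConnectData(String topic, Map<String, byte[]> headers, Object value) {
        return fromConnectData(topic, value);
    }
}

// A converter written before the change: it implements only the header-less method.
class LegacyStringConverter implements MiniConverter {
    @Override
    public byte[] fromConnectData(String topic, Object value) {
        return String.valueOf(value).getBytes(StandardCharsets.UTF_8);
    }
}

// Hypothetical call site standing in for a worker task that always passes headers.
class WorkerCallSite {
    static byte[] serialize(MiniConverter converter, String topic,
                            Map<String, byte[]> headers, Object value) {
        return converter.fromConnectData(topic, headers, value);
    }
}
```

Calling WorkerCallSite.serialize with a LegacyStringConverter compiles and runs unchanged: the default method ignores the headers and delegates to the legacy implementation.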

Rejected Alternatives

None
