Status
Current state: Under Discussion
Discussion thread: here
JIRA: KAFKA-7273
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Kafka message headers are a simple and straightforward way to enrich Kafka messages with additional metadata. The metadata can be anything: user information, tracing data, enriched fields, etc.
When message schemas are used, headers are the best way to carry information about the schema, for example a schema ID, because the consumer doesn’t need to deserialize the whole payload or unwrap some sort of envelope to get it. The absence of an envelope or custom protocol format makes it easy to transfer the “raw” payload as is, for example a serialized JSON, Avro or Protocol Buffers message. Even if message schemas are not used, something like the content type or compression algorithm can be valid header metadata that helps serialization / deserialization.
The Kafka Serializer and Deserializer interfaces provide a way to use header information for this purpose. The Kafka Connect Converter interface does not, which makes it impossible to use Kafka Connect together with Kafka Producers, Consumers or Streams applications that rely on headers for serialization / deserialization.
This KIP proposes to bring very similar header support to Kafka Connect.
Public Interfaces
connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java:
default byte[] fromConnectData(String topic, Headers headers, Schema schema, Object value) {
    return fromConnectData(topic, schema, value);
}

default SchemaAndValue toConnectData(String topic, Headers headers, byte[] value) {
    return toConnectData(topic, value);
}
Proposed Changes
The Converter interface changes as shown above. WorkerSinkTask and WorkerSourceTask will now pass the record headers to the new fromConnectData/toConnectData overloads.
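To illustrate how a converter might use the new overloads, here is a minimal, self-contained sketch. It does not use the real Kafka Connect classes: SimpleConverter is a simplified stand-in for Converter (no Schema or SchemaAndValue), a plain Map stands in for Headers, and the "charset" header name and both converter classes are hypothetical. It assumes a converter may both read headers on the way in and add them on the way out.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the Converter interface (assumption: not the real Kafka API).
interface SimpleConverter {
    byte[] fromConnectData(String topic, Object value);
    Object toConnectData(String topic, byte[] value);

    // Header-aware overloads: by default they ignore the headers and delegate,
    // mirroring the default methods proposed in this KIP.
    default byte[] fromConnectData(String topic, Map<String, byte[]> headers, Object value) {
        return fromConnectData(topic, value);
    }

    default Object toConnectData(String topic, Map<String, byte[]> headers, byte[] value) {
        return toConnectData(topic, value);
    }
}

// An existing converter that knows nothing about headers. It compiles unchanged.
class LegacyStringConverter implements SimpleConverter {
    @Override
    public byte[] fromConnectData(String topic, Object value) {
        return String.valueOf(value).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public Object toConnectData(String topic, byte[] value) {
        return new String(value, StandardCharsets.UTF_8);
    }
}

// A header-aware converter: records the charset in a header when serializing
// and reads it back when deserializing. The header name is hypothetical.
class HeaderAwareStringConverter extends LegacyStringConverter {
    static final String CHARSET_HEADER = "charset";

    @Override
    public byte[] fromConnectData(String topic, Map<String, byte[]> headers, Object value) {
        headers.put(CHARSET_HEADER, "UTF-16".getBytes(StandardCharsets.US_ASCII));
        return String.valueOf(value).getBytes(StandardCharsets.UTF_16);
    }

    @Override
    public Object toConnectData(String topic, Map<String, byte[]> headers, byte[] value) {
        byte[] charsetName = headers.get(CHARSET_HEADER);
        if (charsetName == null) {
            // No header present: fall back to the header-less behaviour.
            return toConnectData(topic, value);
        }
        Charset charset = Charset.forName(new String(charsetName, StandardCharsets.US_ASCII));
        return new String(value, charset);
    }
}
```

In this sketch a caller that always invokes the header-aware overloads (as WorkerSinkTask and WorkerSourceTask would) works with both classes: LegacyStringConverter falls through to its header-less methods, while HeaderAwareStringConverter picks the charset from the header without inspecting the payload.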
Compatibility, Deprecation, and Migration Plan
The new interface methods are default methods that delegate to the existing header-less methods, so existing Converter implementations won’t need any changes.
Rejected Alternatives
None