Status

Current state: Under Discussion

Discussion thread: here

JIRA:

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Headers has been introduced in almost all Kafka components (broker, producer API, consumer API, connect API). This KIP is aimed to add Record Headers support as part of Streams Processor API first, to then discuss about how to approach its support on the DSL API level. 

Headers can be used on different scenarios (e.g. propagating Tracing context between different components, operational information that can be used for filtering, etc.). 

Public Interfaces

Add headers as part of processing:

 
org.apache.kafka.streams.processor.ProcessorContext:

Proposed Changes

Adding `headers()` to `ProcessorContext` will enable custom processors and future DSL processors to have Headers available.

Internally, some components need to have headers available on the ProcessorContext, like:

 

More details on PR: https://github.com/apache/kafka/pull/4955

Compatibility, Deprecation, and Migration Plan

Rejected Alternatives

1. Adding Headers to KTable API will mean propagate Headers to Stores that are Key Value specific like RocksDB. If headers are required in stateful operations, clients will need to map headers values first to key or value and then do processing.

2. Adding Headers to DSL API. 

Future Work