Status
Current state: Under Discussion
Discussion thread: here
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Headers have been introduced in almost all Kafka components (broker, producer API, consumer API, Connect API). This KIP aims to add Record Headers support to the Streams Processor API first, and then to discuss how to approach support at the DSL level.
Headers can be used in different scenarios (e.g. propagating tracing context between components, or carrying operational information that can be used for filtering).
Public Interfaces
Add headers as part of processing:
org.apache.kafka.streams.processor.ProcessorContext:

```java
Headers headers();
```
Proposed Changes
Adding `headers()` to `ProcessorContext` will make the headers of the record currently being processed available to custom processors and, in the future, to DSL processors.
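As an illustration, a minimal sketch of a custom processor using the proposed `headers()` accessor (the processor name and the `"drop"` header are hypothetical; only `ProcessorContext#headers()` comes from this KIP):

```java
import org.apache.kafka.common.header.Header;
import org.apache.kafka.streams.processor.AbstractProcessor;

// Hypothetical processor: drops any record that an upstream component
// has flagged with a "drop" header, forwarding everything else.
public class DropFlaggedProcessor extends AbstractProcessor<String, String> {

    @Override
    public void process(final String key, final String value) {
        // headers() returns the headers of the record currently being processed.
        final Header flag = context().headers().lastHeader("drop");
        if (flag == null) {
            context().forward(key, value);
        }
    }
}
```

Such a processor would be wired into a topology with `Topology#addProcessor`, like any other Processor API node.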
Internally, several components need headers to be available on the ProcessorContext, including:
- o.a.k.s.p.i.AbstractProcessorContext
- o.a.k.s.p.i.GlobalStateUpdateTask
- o.a.k.s.p.i.ProcessorRecordContext
- o.a.k.s.p.i.RecordCollector
- o.a.k.s.p.i.RecordCollectorImpl
- o.a.k.s.p.i.RecordContext
- o.a.k.s.p.i.RecordDeserializer
- o.a.k.s.p.i.SinkNode
- o.a.k.s.p.i.StampedRecord
More details in the PR: https://github.com/apache/kafka/pull/4955
Compatibility, Deprecation, and Migration Plan
- Clients using the high-level DSL should not be affected by the proposed changes.
Rejected Alternatives
1. Adding headers to the KTable API. This would mean propagating headers into stores that are key-value specific, like RocksDB. If headers are required in stateful operations, clients will first need to map header values into the key or value and then do the processing.
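The workaround suggested above can be sketched as a processor that folds a header value into the record value before any stateful operation sees it (the processor name and the `"tenant"` header are hypothetical; only `ProcessorContext#headers()` comes from this KIP):

```java
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.streams.processor.AbstractProcessor;

// Hypothetical processor: since headers are not propagated into
// key-value stores, copy a header's value into the record value
// before the record reaches a stateful operation.
public class HeaderToValueProcessor extends AbstractProcessor<String, String> {

    @Override
    public void process(final String key, final String value) {
        final Header tenant = context().headers().lastHeader("tenant");
        final String tenantId = tenant == null
                ? "unknown"
                : new String(tenant.value(), StandardCharsets.UTF_8);
        // Forward with the header value folded into the record value.
        context().forward(key, tenantId + ":" + value);
    }
}
```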
2. Adding headers to the DSL API.
Future Work
- Adding DSL processors that use headers to filter/map/branch records.