...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Headers have been introduced in almost all Kafka components (broker, Producer API, Consumer API, Connect API). This KIP aims to add Record Headers support to the Streams Processor API first, and then to discuss how to approach support at the DSL API level.

Headers can be used in different scenarios (e.g. propagating tracing context between components, or carrying operational information that can be used for filtering).

Headers must be propagated downstream so that they are available at Sinks, and they must be accessible from Processors through the Headers API so that they can be manipulated (e.g. in Distributed Tracing, headers are used to create Spans and to inject context for the following Nodes).

Public Interfaces

Add headers as part of processing:

...

org.apache.kafka.streams.processor.ProcessorContext:

    Headers headers();
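A minimal sketch of how a custom processor might use the proposed `headers()` method for the tracing case described in the Motivation. To stay self-contained it does not depend on Kafka: `SimpleHeader`, `SimpleHeaders`, `SketchProcessorContext` and `TracingProcessor` are hypothetical stand-ins that only mirror the shape of `org.apache.kafka.common.header.Headers` and `ProcessorContext`, and the `trace-id`/`span-id` header names are made up for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in mirroring org.apache.kafka.common.header.Header (key + raw bytes).
final class SimpleHeader {
    final String key;
    final byte[] value;

    SimpleHeader(String key, byte[] value) {
        this.key = key;
        this.value = value;
    }
}

// Hypothetical stand-in mirroring a few methods of org.apache.kafka.common.header.Headers.
final class SimpleHeaders {
    private final List<SimpleHeader> entries = new ArrayList<>();

    SimpleHeaders add(String key, byte[] value) {
        entries.add(new SimpleHeader(key, value));
        return this;
    }

    // Most recently added header with this key, or null if there is none.
    SimpleHeader lastHeader(String key) {
        for (int i = entries.size() - 1; i >= 0; i--) {
            if (entries.get(i).key.equals(key)) {
                return entries.get(i);
            }
        }
        return null;
    }

    // Removes every header with this key.
    SimpleHeaders remove(String key) {
        entries.removeIf(h -> h.key.equals(key));
        return this;
    }

    int size() {
        return entries.size();
    }
}

// Hypothetical context exposing only the method proposed by this KIP.
interface SketchProcessorContext {
    SimpleHeaders headers(); // headers of the record currently being processed
}

// Tracing-style processor: reads the incoming trace header and replaces the
// span header, so downstream nodes see this node's span.
final class TracingProcessor {
    private SketchProcessorContext context;

    void init(SketchProcessorContext context) {
        this.context = context;
    }

    void process(String key, String value) {
        SimpleHeaders headers = context.headers();
        SimpleHeader trace = headers.lastHeader("trace-id");
        String traceId = trace == null
                ? "none"
                : new String(trace.value, StandardCharsets.UTF_8);
        headers.remove("span-id")
               .add("span-id", (traceId + ":tracing-node").getBytes(StandardCharsets.UTF_8));
    }
}
```

In this sketch the processor mutates the headers returned by the context in place instead of building a copy, which is what lets the added span header reach the following nodes.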

 

 

Proposed Changes

Adding `headers()` to `ProcessorContext` will enable custom processors and future DSL processors to have Headers available.

Internally, several components need headers to be available on the ProcessorContext, such as:

* o.a.k.s.p.i.AbstractProcessorContext
* o.a.k.s.p.i.GlobalStateUpdateTask
* o.a.k.s.p.i.ProcessorRecordContext
* o.a.k.s.p.i.RecordCollector
* o.a.k.s.p.i.RecordCollectorImpl
* o.a.k.s.p.i.RecordContext
* o.a.k.s.p.i.RecordDeserializer
* o.a.k.s.p.i.SinkNode
* o.a.k.s.p.i.StampedRecord
* o.a.k.s.s.i.LRUCacheEntry


 

 

More details in the PR: https://github.com/apache/kafka/pull/4955
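The flow of headers through the internal classes listed above can be sketched roughly as follows. This is a hypothetical, heavily simplified model (`Hdr`, `RecordContextModel`, `RecordCollectorModel`, `OutgoingRecord` and `SinkNodeModel` are illustrative names, not the real `o.a.k.s.p.i` types or signatures): the record context carries the consumed record's headers, and the sink node passes them to the collector's `send`, which is how source headers would end up on sink topics.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical header pair; the real type is org.apache.kafka.common.header.Header.
final class Hdr {
    final String key;
    final byte[] value;

    Hdr(String key, byte[] value) {
        this.key = key;
        this.value = value;
    }
}

// Stand-in for ProcessorRecordContext: metadata of the record being processed, now with headers.
final class RecordContextModel {
    final String topic;
    final int partition;
    final long offset;
    final long timestamp;
    final List<Hdr> headers;

    RecordContextModel(String topic, int partition, long offset, long timestamp, List<Hdr> headers) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.timestamp = timestamp;
        this.headers = headers;
    }
}

// What the collector hands on to the producer.
final class OutgoingRecord {
    final String topic;
    final String key;
    final String value;
    final long timestamp;
    final List<Hdr> headers;

    OutgoingRecord(String topic, String key, String value, long timestamp, List<Hdr> headers) {
        this.topic = topic;
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
        this.headers = headers;
    }
}

// Stand-in for RecordCollector: send(...) now also receives headers.
final class RecordCollectorModel {
    final List<OutgoingRecord> sent = new ArrayList<>();

    void send(String topic, String key, String value, List<Hdr> headers, long timestamp) {
        // Copy the header list so later additions/removals upstream do not change what was sent.
        List<Hdr> copy = Collections.unmodifiableList(new ArrayList<>(headers));
        sent.add(new OutgoingRecord(topic, key, value, timestamp, copy));
    }
}

// Stand-in for SinkNode: forwards the current record's headers to the collector.
final class SinkNodeModel {
    private final String sinkTopic;
    private final RecordCollectorModel collector;

    SinkNodeModel(String sinkTopic, RecordCollectorModel collector) {
        this.sinkTopic = sinkTopic;
        this.collector = collector;
    }

    void process(RecordContextModel context, String key, String value) {
        collector.send(sinkTopic, key, value, context.headers, context.timestamp);
    }
}
```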

Because the cache store keeps headers available, headers will be propagated downstream to the following nodes.

Persistent stores (e.g. RocksDB) only store key-value pairs; therefore, headers are not propagated through stateful operations.
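A hypothetical model of the difference between the cache and the persistent store described above. `CacheEntryModel` and `CachingStoreModel` are illustrative names, not Kafka's `ThreadCache`, `LRUCacheEntry` or RocksDB store code, and headers are simplified to a `Map<String, String>`: on flush, cached entries are forwarded downstream together with their headers, while the persistent map can only keep the key and value.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical cache entry: keeps the record's headers next to its value.
final class CacheEntryModel {
    final String value;
    final Map<String, String> headers;

    CacheEntryModel(String value, Map<String, String> headers) {
        this.value = value;
        this.headers = headers;
    }
}

// Hypothetical caching layer over a persistent key/value store.
final class CachingStoreModel {
    private final Map<String, CacheEntryModel> cache = new LinkedHashMap<>();
    // Like RocksDB here: only key -> value, so there is nowhere to keep headers.
    final Map<String, String> persistent = new HashMap<>();

    void put(String key, String value, Map<String, String> headers) {
        cache.put(key, new CacheEntryModel(value, headers));
    }

    // Flush: forward each cached entry downstream WITH its headers,
    // then write it to the persistent store WITHOUT them.
    void flush(BiConsumer<String, CacheEntryModel> downstream) {
        for (Map.Entry<String, CacheEntryModel> e : cache.entrySet()) {
            downstream.accept(e.getKey(), e.getValue());
            persistent.put(e.getKey(), e.getValue().value);
        }
        cache.clear();
    }
}
```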

Compatibility, Deprecation, and Migration Plan

  • Clients using the high-level DSL and the Processor API should not be affected by the proposed changes.
  • Because headers are propagated downstream, clients whose source records carry headers will end up with those headers on their sink topics.

Rejected Alternatives

1. Adding Headers to the KTable API would mean propagating Headers to stores that are key-value specific, like RocksDB. If headers are required in stateful operations, clients will need to first map header values to the key or value and then do the processing.
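The workaround in alternative 1 (moving a header value into the key before a stateful step) could look roughly like this. Everything here is hypothetical: the `tenant` header name, `HeaderToKeyExample.keyFromHeader`, and `CountStoreModel` (a plain map standing in for a key-value store behind a count); headers are also simplified to a `Map<String, byte[]>`, even though Kafka headers allow repeated keys.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: derive the key from a header value before a stateful step,
// because the state store will only keep key and value, not headers.
final class HeaderToKeyExample {
    static String keyFromHeader(Map<String, byte[]> headers, String headerName) {
        byte[] raw = headers.get(headerName);
        return raw == null ? "unknown" : new String(raw, StandardCharsets.UTF_8);
    }
}

// Hypothetical stand-in for a key-value store backing a count aggregation.
final class CountStoreModel {
    final Map<String, Long> counts = new HashMap<>();

    void increment(String key) {
        counts.merge(key, 1L, Long::sum);
    }
}
```

Once the header value lives in the key, the aggregated result survives in the store even though the headers themselves are not stored.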

...

  • Adding DSL processors that use Headers to filter/map/branch, potentially supported by KIP-159.