...

org.apache.kafka.streams.processor.ProcessorContext:

 

  * Headers headers();

 

 

Proposed Changes

Adding `headers()` to `ProcessorContext` gives custom processors, and future DSL processors, access to the headers of the record being processed.

Internally, several components need to carry headers so that they are available on the `ProcessorContext`, including:
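As a rough illustration of what the new accessor makes possible, the sketch below reads a header from processor-style code. To stay self-contained it uses small stand-in types instead of the Kafka classes: `Header`, `Headers` and `ProcessorContext` here are simplified, hypothetical models (only the method names `add`, `lastHeader` and `headers()` mirror the real API), and the `trace-id` header key is an assumption chosen for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class HeadersSketch {

    // Hypothetical stand-in for org.apache.kafka.common.header.Header.
    public static final class Header {
        public final String key;
        public final byte[] value;

        public Header(String key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    // Hypothetical stand-in for org.apache.kafka.common.header.Headers.
    public static final class Headers {
        private final List<Header> entries = new ArrayList<>();

        public Headers add(String key, byte[] value) {
            entries.add(new Header(key, value));
            return this;
        }

        // Returns the most recently added header with this key, or null if absent.
        public Header lastHeader(String key) {
            for (int i = entries.size() - 1; i >= 0; i--) {
                if (entries.get(i).key.equals(key)) {
                    return entries.get(i);
                }
            }
            return null;
        }
    }

    // Simplified model of ProcessorContext, reduced to the proposed accessor.
    public interface ProcessorContext {
        Headers headers();
    }

    // What a custom processor could do once headers are exposed on the context.
    public static String traceIdOf(ProcessorContext context) {
        Header header = context.headers().lastHeader("trace-id");
        return header == null ? null : new String(header.value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Headers headers = new Headers()
                .add("trace-id", "abc-123".getBytes(StandardCharsets.UTF_8));
        ProcessorContext context = () -> headers;
        System.out.println(traceIdOf(context));
    }
}
```

In a real topology the context would be the one handed to the processor at initialization; the lambda in `main` only stands in for it.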

 

* o.a.k.s.p.i.AbstractProcessorContext
* o.a.k.s.p.i.GlobalStateUpdateTask
* o.a.k.s.p.i.ProcessorRecordContext
* o.a.k.s.p.i.RecordCollector
* o.a.k.s.p.i.RecordCollectorImpl
* o.a.k.s.p.i.RecordContext
* o.a.k.s.p.i.RecordDeserialized
* o.a.k.s.p.i.SinkNode
* o.a.k.s.p.i.StampedRecord
* o.a.k.s.p.i.LRUCacheEntry

 

 

More details are available in the PR: https://github.com/apache/kafka/pull/4955

Because the cache store has headers available, headers are propagated downstream to the following nodes.

Persistent stores (e.g. RocksDB) only hold key-value pairs, so headers are not propagated through stateful operations.
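The difference between the two kinds of store can be sketched with a toy model. This is not the Kafka Streams implementation: `CachingStore`, `KeyValueStore` and `Forwarded` are hypothetical names, and headers are modelled as a `Map<String, String>` purely for brevity. The only point is that a store whose entries hold headers can hand them on, while a store that keeps only key and value cannot.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class StorePropagationSketch {

    // What a store hands to the next node: a value plus whatever headers survived.
    public static final class Forwarded {
        public final String value;
        public final Map<String, String> headers;

        public Forwarded(String value, Map<String, String> headers) {
            this.value = value;
            this.headers = headers;
        }
    }

    // Toy cache: each entry keeps the headers next to the value.
    public static final class CachingStore {
        private final Map<String, Forwarded> entries = new HashMap<>();

        public void put(String key, String value, Map<String, String> headers) {
            entries.put(key, new Forwarded(value, headers));
        }

        public Forwarded forward(String key) {
            return entries.get(key);
        }
    }

    // Toy persistent store: only the key-value pair is written.
    public static final class KeyValueStore {
        private final Map<String, String> entries = new HashMap<>();

        public void put(String key, String value, Map<String, String> headers) {
            entries.put(key, value); // headers have nowhere to be stored
        }

        public Forwarded forward(String key) {
            // Nothing but the value can be recovered, so no headers go downstream.
            return new Forwarded(entries.get(key), Collections.<String, String>emptyMap());
        }
    }

    public static void main(String[] args) {
        Map<String, String> headers = Collections.singletonMap("trace-id", "abc-123");

        CachingStore cache = new CachingStore();
        cache.put("k", "v", headers);
        System.out.println("cache forwards headers: " + cache.forward("k").headers);

        KeyValueStore persistent = new KeyValueStore();
        persistent.put("k", "v", headers);
        System.out.println("persistent store forwards headers: " + persistent.forward("k").headers);
    }
}
```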

...