...

Add headers as part of processing:

 

org.apache.kafka.streams.processor.ProcessorContext:
  * Headers headers();
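As a sketch of how a processor could use the proposed `headers()` accessor (the class name and header key below are illustrative, not part of the proposal; this assumes the kafka-streams library on the classpath):

```java
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

// Illustrative processor that reads and adds record headers via the
// proposed ProcessorContext#headers() method.
public class AuditProcessor implements Processor<String, String> {

    private ProcessorContext context;

    @Override
    public void init(final ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(final String key, final String value) {
        // Headers of the record currently being processed
        final Headers headers = context.headers();
        // Add a header before forwarding downstream (key/value are illustrative)
        headers.add("processed-by", "audit-processor".getBytes());
        context.forward(key, value);
    }

    @Override
    public void close() { }
}
```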

...


org.apache.kafka.streams.processor.MockProcessorContext:

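A unit test could then seed headers on the mock context before invoking the code under test. This sketch assumes a setter such as `setHeaders(...)` is added to `MockProcessorContext` alongside `headers()`; the header key is illustrative:

```java
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.streams.processor.MockProcessorContext;

public class HeadersTestSketch {

    public void shouldExposeSeededHeaders() {
        final MockProcessorContext context = new MockProcessorContext();

        // Assumed setter: seed the context with the current record's headers
        context.setHeaders(new RecordHeaders(
            new Header[] { new RecordHeader("source", "test".getBytes()) }));

        // A processor under test can now read them via context.headers()
        assert context.headers().lastHeader("source") != null;
    }
}
```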
...

Internally, some components need to have headers available on the ProcessorContext, like:

* o.a.k.s.p.i.AbstractProcessorContext

...

* o.a.k.s.p.i.LRUCacheEntry

 

 


More details on PR: https://github.com/apache/kafka/pull/4955

Because the cache store has headers available, headers will be propagated downstream to the following nodes.

Persistent stores (e.g. RocksDB) only consider key-value pairs; therefore, headers are not propagated through stateful operations.

 

In the case of stateful applications, note that headers are not stored in state stores; therefore, only the headers of the record currently being processed are available.
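The following sketch illustrates this point: a `put()` persists only the key and value, while `context.headers()` always refers to the in-flight record. The transformer class and store name are illustrative, and kafka-streams is assumed on the classpath:

```java
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

// Illustrative stateful transformer: the state store holds key-value pairs
// only, so headers are never persisted with the record.
public class SeenTransformer implements Transformer<String, String, KeyValue<String, String>> {

    private ProcessorContext context;
    private KeyValueStore<String, String> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        this.context = context;
        // Store name is illustrative
        this.store = (KeyValueStore<String, String>) context.getStateStore("seen-store");
    }

    @Override
    public KeyValue<String, String> transform(final String key, final String value) {
        // Only key and value reach the store; context.headers() refers solely
        // to the current record and is not persisted by this put().
        store.put(key, value);
        return KeyValue.pair(key, value);
    }

    @Override
    public void close() { }
}
```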

Headers Inheritance

1. Make the inheritance implementation of headers consistent with the other record context fields, i.e. pass the record context through in `context.forward()`. Note that within a processor node, users can already manipulate the headers with the given APIs, so at the time of forwarding the library can simply copy whatever is left / updated to the next processor node.

2. In the sink node, where a record is being sent to the Kafka topic, we should consider the following:

a. For sink topics, we will set the headers in the producer record.
b. For repartition topics, we will set the headers in the producer record.
c. For changelog topics, we will drop the headers from the producer record, since they will not be used in restoration and are not stored in the state store either.
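A sketch of the forwarding behavior described above: whatever headers remain on the context at `forward()` time travel with the record to the next node and, for sink topics, into the producer record. The processor class and header key are illustrative:

```java
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

// Illustrative processor that scrubs a header before the record reaches a sink.
public class ScrubHeadersProcessor implements Processor<String, String> {

    private ProcessorContext context;

    @Override
    public void init(final ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(final String key, final String value) {
        // Remove an internal header so it is not written to the sink topic
        // (header key is illustrative)
        context.headers().remove("internal-trace-id");
        // forward() passes the updated record context, headers included, downstream
        context.forward(key, value);
    }

    @Override
    public void close() { }
}
```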

...

  • Adding DSL processors that use headers to filter/map/branch; potentially supported by KIP-159.

 

...