Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Status

Current state: Adopted in release 2.0

Discussion thread: here

JIRA:

Jira
serverASF JIRA
columnskey,summary,type,created,updated,due,assignee,reporter,priority,status,resolution
maximumIssues20
jqlQuerykey = KAFKA-6395 or key = KAFKA-5632 or key = KAFKA-6850
serverId5aa69414-a9e9-3523-82ec-879b028fb15b

...

Add headers as part of processing:

...


org.apache.kafka.streams.processor.ProcessorContext:

...

  * Headers headers();

...


org.apache.kafka.streams.processor.MockProcessorContext:

...

  * setRecordMetadata(final String topic, final int partition, final long offset, final Headers headers, final long timestamp)   

...

  * ConsumerRecord<byte[], byte[]> create(final String topicName, final K key, final V value, final Headers headers, final long timestampMs)
  * ConsumerRecord<byte[], byte[]> create(final String topicName, final K key, final V value, final Headers headers)
  * ConsumerRecord<byte[], byte[]> create(final String topicName, final V value, final Headers headers, final long timestampMs)
  * ConsumerRecord<byte[], byte[]> create(final String topicName, final V value, final Headers headers)
  * ConsumerRecord<byte[], byte[]> create(final K key, final V value, final Headers headers, final long timestampMs)
  * ConsumerRecord<byte[], byte[]> create(final K key, final V value, final Headers headers)
  * ConsumerRecord<byte[], byte[]> create(final V value, final Headers headers, final long timestampMs)
  * ConsumerRecord<byte[], byte[]> create(final V value, final Headers headers)


org.apache.kafka.streams.test.OutputVerifier
  * void compareValueHeaders(final ProducerRecord<K, V> record, final V expectedValue, final Headers expectedHeaders)
  * void compareValueHeaders(final ProducerRecord<K, V> record, final ProducerRecord<K, V> expectedRecord)
  * void compareKeyValueHeaders(final ProducerRecord<K, V> record, final K expectedKey K, final V expectedValue, final Headers expectedHeaders)
  * void compareKeyValueHeaders(final ProducerRecord<K, V> record, final ProducerRecord<K, V> expectedRecord)
  * void compareKeyValueHeadersTimestamp(final ProducerRecord<K, V> record, final K expectedKey K, final V expectedValue, final Headers expectedHeaders, final long expectedTimestamp)
  * void compareKeyValueHeadersTimestamp(final ProducerRecord<K, V> record, final ProducerRecord<K, V> expectedRecord)


Proposed Changes

Adding `headers()` to `ProcessorContext` will enable custom processors and future DSL processors to have Headers available.

Internally, some components need to have headers available on the ProcessorContext, like:

 


* o.a.k.s.p.i.AbstractProcessorContext

...

* o.a.k.s.p.i.LRUCacheEntry

 

...


More details on PR: https://github.com/apache/kafka/pull/4955

As Cache Store has headers available, then Headers will be propagated downstream to following nodes.

Persistent Stores (e.g. RocksDB) only consider Key Value pairs, therefore Headers are not propagated on stateful operations.  

 

In the case of stateful applications, consider that headers are not stored in state-stores; therefore only the headers from the current record in process is available. 

Headers Inheritance

1. To make the inheritance implementation of headers consistent with what we had with other record context fields. I.e. pass through the record context in `context.forward()`. Note that within a processor node, users can already manipulate the Headers with the given APIs, so at the time of forwarding, the library can just copy what-ever is left / updated to the next processor node.

2. In the sink node, where a record is being sent to the Kafka topic, we should consider the following:

a. For sink topics, we will set the headers into the producer record.
b. For repartition topics, we will the headers into the producer record.
c. For changelog topics, we will drop the headers in the produce record since they will not be used in restoration and not stored in the state store either.

...

  • Adding DSL Processors to use Headers to filter/map/branch. Potentially supported by KIP-159.

 

...