
Status

Current state: Under Discussion

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Headers have been introduced in almost all Kafka components (broker, producer API, consumer API, Connect API). This KIP aims to add Record Headers support to the Streams Processor API first, for routing, filtering and mapping, and then to discuss how to approach support at the DSL API level. Stateful operations (tables, joins) are out of scope.

Headers can be used in different scenarios (e.g. propagating tracing context between components, or carrying operational information that can be used for filtering).

Public Interfaces

1. Add headers as part of processing:

org.apache.kafka.streams.processor.Processor:

...

 
org.apache.kafka.streams.processor.ProcessorContext:
  • <K, V> void forward(K key, V value);
  • <K, V> void forward(K key, V value, Headers headers);
  • <K, V> void forward(K key, V value, String childName);
  • <K, V> void forward(K key, V value, Headers headers, String childName); 
  • <K, V> void forward(K key, V value, int childIndex);
  • <K, V> void forward(K key, V value, Headers headers, int childIndex);
org.apache.kafka.streams.processor.internals.RecordContext:
  • Headers headers();
org.apache.kafka.streams.processor.internals.RecordCollector:
  • <K, V> void send(final String topic, final K key, final V value, final Integer partition, final Long timestamp, final Serializer<K> keySerializer, final Serializer<V> valueSerializer);
  • <K, V> void send(final String topic, final K key, final V value, final Long timestamp, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final StreamPartitioner<? super K, ? super V> partitioner); 
  • <K, V> void send(final String topic, final K key, final V value, final Long timestamp, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final StreamPartitioner<? super K, ? super V> partitioner, final Headers headers); 
  • <K, V> void send(final String topic, final K key, final V value, final Integer partition, final Long timestamp, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final Headers headers);

2. Add methods to use Headers as part of processing:

org.apache.kafka.streams.kstream.KStream:
  • `KStream<K, V> filter(PredicateWithHeaders<? super K, ? super V> predicate);`
  • `KStream<K, V> filterNot(PredicateWithHeaders<? super K, ? super V> predicate);`
  • `<KR, VR> KStream<KR, VR> map(KeyValueAndHeadersMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper);`
  • `<KR, VR> KStream<KR, VR> flatMap(final KeyValueAndHeadersMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper);`
  • `void foreach(final ForeachActionWithHeaders<? super K, ? super V> action);`
  • `KStream<K, V> peek(final ForeachActionWithHeaders<? super K, ? super V> action);`
  • `KStream<K, V>[] branch(final PredicateWithHeaders<? super K, ? super V>... predicates);`

Proposed Changes

1. Enable Kafka Streams to filter, map and process Records using Headers. This is based on API change #3. 

It will require the following functional interfaces:

  • PredicateWithHeaders
  • KeyValueAndHeadersMapper
  • ForeachActionWithHeaders

Clients will then be able to use headers in lambda expressions or in anonymous classes:

```java
stream.filter(new PredicateWithHeaders<String, WikiFeed>() {
    @Override
    public boolean test(final String dummy, final WikiFeed value, final Headers headers) {
        ...
    }
})
```
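With a lambda expression, the same kind of filter might read as follows. This is a minimal sketch under stated assumptions, not the proposed implementation: `HeaderFilterSketch`, `Rec`, the `filter` helper, the `source` header name, and the use of `Map<String, byte[]>` as a stand-in for Kafka's `Headers` are all hypothetical, introduced only to keep the example self-contained. Only the three-argument `test` shape is taken from the anonymous class above.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class HeaderFilterSketch {

    // Assumed shape of the proposed functional interface. Headers are modelled
    // as a plain Map here (stand-in for Kafka's Headers type).
    @FunctionalInterface
    interface PredicateWithHeaders<K, V> {
        boolean test(K key, V value, Map<String, byte[]> headers);
    }

    // Hypothetical record holder, used only by this sketch.
    static final class Rec<K, V> {
        final K key;
        final V value;
        final Map<String, byte[]> headers;

        Rec(K key, V value, Map<String, byte[]> headers) {
            this.key = key;
            this.value = value;
            this.headers = headers;
        }
    }

    // Keeps the records for which the predicate returns true.
    static <K, V> List<Rec<K, V>> filter(List<Rec<K, V>> records,
                                         PredicateWithHeaders<? super K, ? super V> predicate) {
        List<Rec<K, V>> kept = new ArrayList<>();
        for (Rec<K, V> r : records) {
            if (predicate.test(r.key, r.value, r.headers)) {
                kept.add(r);
            }
        }
        return kept;
    }

    // Predicate written as a lambda: keep records whose (hypothetical)
    // "source" header equals "wiki"; records without the header are dropped.
    static final PredicateWithHeaders<String, String> FROM_WIKI = (key, value, headers) -> {
        byte[] source = headers.get("source");
        return source != null && "wiki".equals(new String(source, StandardCharsets.UTF_8));
    };
}
```

Because the interface has a single abstract method, a lambda and an anonymous class are interchangeable at the call site.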

`RecordContext` will hold the headers as a member, wrapping them so they can be passed to internal components.



Adding `headers()` to `ProcessorContext` will enable custom processors and future DSL processors to have Headers available.

Internally, several components need the headers to be available through the ProcessorContext:

  • o.a.k.s.p.i.AbstractProcessorContext
  • o.a.k.s.p.i.GlobalStateUpdateTask
  • o.a.k.s.p.i.ProcessorRecordContext
  • o.a.k.s.p.i.RecordCollector
  • o.a.k.s.p.i.RecordCollectorImpl
  • o.a.k.s.p.i.RecordContext
  • o.a.k.s.p.i.RecordDeserialized
  • o.a.k.s.p.i.SinkNode
  • o.a.k.s.p.i.StampedRecord

 

More details on PR: https://github.com/apache/kafka/pull/4955

...

Compatibility, Deprecation, and Migration Plan

  • Clients using the high-level DSL should not be affected by the proposed changes.
  • Clients using the Processor API will need to implement `void process(K key, V value, Headers headers);` to either bypass or handle headers.
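The two options for Processor API clients, bypassing or handling headers, can be sketched roughly as below. Everything here is an assumption made for illustration: `HeaderProcessorSketch`, `HeaderAwareProcessor`, `Downstream`, `PassThrough`, `RequireHeader`, and `Collecting` are hypothetical stand-ins, and `Map<String, byte[]>` replaces Kafka's `Headers` so the example compiles on its own. Only the `process(key, value, headers)` shape comes from this KIP.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class HeaderProcessorSketch {

    // Stand-in for the proposed three-argument process method.
    interface HeaderAwareProcessor<K, V> {
        void process(K key, V value, Map<String, byte[]> headers);
    }

    // Stand-in for the forwarding side of a processor context.
    interface Downstream<K, V> {
        void forward(K key, V value, Map<String, byte[]> headers);
    }

    // "Bypass": ignores the headers and hands them downstream untouched.
    static final class PassThrough<K, V> implements HeaderAwareProcessor<K, V> {
        private final Downstream<K, V> downstream;

        PassThrough(Downstream<K, V> downstream) {
            this.downstream = downstream;
        }

        @Override
        public void process(K key, V value, Map<String, byte[]> headers) {
            downstream.forward(key, value, headers);
        }
    }

    // "Handle": forwards only records that carry the required header.
    static final class RequireHeader<K, V> implements HeaderAwareProcessor<K, V> {
        private final String headerName;
        private final Downstream<K, V> downstream;

        RequireHeader(String headerName, Downstream<K, V> downstream) {
            this.headerName = headerName;
            this.downstream = downstream;
        }

        @Override
        public void process(K key, V value, Map<String, byte[]> headers) {
            if (headers.containsKey(headerName)) {
                downstream.forward(key, value, headers);
            }
        }
    }

    // Collects forwarded keys so the behaviour can be inspected.
    static final class Collecting<K, V> implements Downstream<K, V> {
        final List<K> keys = new ArrayList<>();

        @Override
        public void forward(K key, V value, Map<String, byte[]> headers) {
            keys.add(key);
        }
    }
}
```

A processor that does not care about headers would follow the `PassThrough` pattern, so existing logic only needs to carry the extra argument along.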

Rejected Alternatives

1. Adding headers to the KTable API would mean propagating headers to stores that are key-value specific, such as RocksDB. If headers are required in stateful operations, clients will need to map the header values into the key or value first and then do the processing.

2. Adding Headers to DSL API. 
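The workaround named in the first alternative, mapping header values into the value before a stateful step, might look roughly like this. It is a sketch only: `HeaderToValueSketch`, `Enriched`, `liftHeader`, and the `trace-id` header name are hypothetical, and `Map<String, byte[]>` stands in for Kafka's `Headers`.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

final class HeaderToValueSketch {

    // Hypothetical value type carrying the payload plus the copied header.
    static final class Enriched {
        final String payload;
        final String traceId; // null when the header was absent

        Enriched(String payload, String traceId) {
            this.payload = payload;
            this.traceId = traceId;
        }
    }

    // Copies the (hypothetical) "trace-id" header into the value, so a
    // key-value store downstream sees it as ordinary value data.
    static Enriched liftHeader(String payload, Map<String, byte[]> headers) {
        byte[] raw = headers.get("trace-id");
        String traceId = (raw == null) ? null : new String(raw, StandardCharsets.UTF_8);
        return new Enriched(payload, traceId);
    }
}
```

After such a mapping step, the stateful operation works on plain keys and values and needs no knowledge of headers.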

References

Draft/WIP: https://github.com/jeqo/kafka/tree/streams-headers

Future Work

  • Adding DSL Processors to use Headers to filter/map/branch.

 

 Changes: https://github.com/apache/kafka/compare/trunk...jeqo:streams-headers