This KIP aims to add Record Headers support to the Streams API for routing, filtering, and mapping. Stateful operations (tables, joins) are out of scope.

Public Interfaces

1. Add Headers to the `KeyValue` class:

org.apache.kafka.streams.KeyValue.java:

...

public final K key;

...

public final V value;

...

headers

...

 

2. Add headers as part of processing:

org.apache.kafka.streams.processor.Processor:

...

  • <K, V> void send(final String topic, final K key, final V value, final Integer partition, final Long timestamp, final Serializer<K> keySerializer, final Serializer<V> valueSerializer);
  • <K, V> void send(final String topic, final K key, final V value, final Long timestamp, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final StreamPartitioner<? super K, ? super V> partitioner); 
  • <K, V> void send(final String topic, final K key, final V value, final Long timestamp, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final StreamPartitioner<? super K, ? super V> partitioner, final Headers headers); 
  • <K, V> void send(final String topic, final K key, final V value, final Integer partition, final Long timestamp, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final Headers headers);

3. Add methods to use Headers as part of processing:

...

  • `KStream<K, V> filter(PredicateWithHeaders<? super K, ? super V> predicate);`
  • `KStream<K, V> filterNot(PredicateWithHeaders<? super K, ? super V> predicate);`
  • `<KR> KStream<KR, V> selectKey(KeyValueAndHeadersMapper<? super K, ? super V, ? extends KR> mapper);`
  • `<KR, VR> KStream<KR, VR> map(KeyValueAndHeadersMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper);`
  • `<KR, VR> KStream<KR, VR> flatMap(final KeyValueAndHeadersMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper);`
  • `void foreach(final ForeachActionWithHeaders<? super K, ? super V> action);`
  • `KStream<K, V> peek(final ForeachActionWithHeaders<? super K, ? super V> action);`
  • `KStream<K, V>[] branch(final PredicateWithHeaders<? super K, ? super V>... predicates);`
  • `<KR> KGroupedStream<KR, V> groupBy(final KeyValueAndHeadersMapper<? super K, ? super V, KR> selector);`
  • `<KR> KGroupedStream<KR, V> groupBy(final KeyValueAndHeadersMapper<? super K, ? super V, KR> selector, final Serialized<KR, V> serialized);`
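The header-aware functional interfaces named in the signatures above are not spelled out in this section. As a rough sketch only, they might look like the following. The method names `test` and `apply` are assumptions modeled on the existing `Predicate`, `KeyValueMapper` and `ForeachAction` interfaces, and a plain `Map<String, byte[]>` stands in for `org.apache.kafka.common.header.Headers` so that the example has no Kafka dependency. The `region` and `tenant` header names are purely illustrative.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class HeaderAwareInterfaces {

    // Assumption: Map<String, byte[]> stands in for Kafka's Headers type.
    @FunctionalInterface
    interface PredicateWithHeaders<K, V> {
        boolean test(K key, V value, Map<String, byte[]> headers);
    }

    @FunctionalInterface
    interface KeyValueAndHeadersMapper<K, V, VR> {
        VR apply(K key, V value, Map<String, byte[]> headers);
    }

    @FunctionalInterface
    interface ForeachActionWithHeaders<K, V> {
        void apply(K key, V value, Map<String, byte[]> headers);
    }

    // Hypothetical helper: decode a header value as UTF-8 text, or null if absent.
    static String headerAsString(Map<String, byte[]> headers, String name) {
        byte[] raw = headers.get(name);
        return raw == null ? null : new String(raw, StandardCharsets.UTF_8);
    }

    // Illustrative predicate: keep records whose "region" header equals "eu".
    static final PredicateWithHeaders<String, String> IS_EU =
        (key, value, headers) -> "eu".equals(headerAsString(headers, "region"));

    // Illustrative key selector: use the "tenant" header as the new key,
    // falling back to the original key when the header is missing.
    static final KeyValueAndHeadersMapper<String, String, String> TENANT_KEY =
        (key, value, headers) -> {
            String tenant = headerAsString(headers, "tenant");
            return tenant == null ? key : tenant;
        };

    public static void main(String[] args) {
        Map<String, byte[]> headers = Map.of(
            "region", "eu".getBytes(StandardCharsets.UTF_8),
            "tenant", "acme".getBytes(StandardCharsets.UTF_8));
        System.out.println(IS_EU.test("k1", "v1", headers));
        System.out.println(TENANT_KEY.apply("k1", "v1", headers));
    }
}
```

With interfaces of this shape, a call such as `stream.filter(IS_EU)` or `stream.selectKey(TENANT_KEY)` would resolve to the proposed header-aware overloads, assuming the final API keeps these names.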
 

4. Add Headers when storing data in Streams API:

org.apache.kafka.streams.state.KeyValueStore:

...

void put(K key, V value);

...

void put(K key, V value, Headers headers);

...

V putIfAbsent(K key, V value);

...

Proposed Changes

1. Enable Kafka Streams to filter, map and process Records using Headers. This is based on API change #3. 
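To illustrate the intent of this change, the sketch below runs a filter step followed by a map step over an in-memory list, with both steps reading record headers. It is not the Streams implementation: the `Rec` class, the `run` and `sample` methods, and the `type` and `source` header names are hypothetical, the two interfaces only borrow their names from API change #3, and a `Map<String, byte[]>` again stands in for Kafka's `Headers`.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class HeaderPipelineSketch {

    // Hypothetical in-memory record; the Map stands in for Kafka's Headers.
    static final class Rec {
        final String key;
        final String value;
        final Map<String, byte[]> headers;

        Rec(String key, String value, Map<String, byte[]> headers) {
            this.key = key;
            this.value = value;
            this.headers = headers;
        }
    }

    // Simplified, non-generic stand-ins for the proposed interfaces.
    interface PredicateWithHeaders {
        boolean test(String key, String value, Map<String, byte[]> headers);
    }

    interface KeyValueAndHeadersMapper {
        String apply(String key, String value, Map<String, byte[]> headers);
    }

    // Decode a header value as UTF-8 text, or null if the header is absent.
    static String text(Map<String, byte[]> headers, String name) {
        byte[] raw = headers.get(name);
        return raw == null ? null : new String(raw, StandardCharsets.UTF_8);
    }

    // Keep only records whose "type" header is "order", then append the
    // "source" header to the value.
    static List<String> run(List<Rec> input) {
        PredicateWithHeaders isOrder = (k, v, h) -> "order".equals(text(h, "type"));
        KeyValueAndHeadersMapper tagSource = (k, v, h) -> v + "@" + text(h, "source");

        List<String> out = new ArrayList<>();
        for (Rec r : input) {
            if (isOrder.test(r.key, r.value, r.headers)) {
                out.add(tagSource.apply(r.key, r.value, r.headers));
            }
        }
        return out;
    }

    // Two sample records: one order from "web", one non-order event.
    static List<Rec> sample() {
        byte[] order = "order".getBytes(StandardCharsets.UTF_8);
        byte[] click = "click".getBytes(StandardCharsets.UTF_8);
        byte[] web = "web".getBytes(StandardCharsets.UTF_8);
        return List.of(
            new Rec("k1", "o1", Map.of("type", order, "source", web)),
            new Rec("k2", "c1", Map.of("type", click, "source", web)));
    }

    public static void main(String[] args) {
        System.out.println(run(sample()));
    }
}
```

In the proposed API the same logic would presumably be expressed as a `filter` call followed by a `map` call on a `KStream`, with the headers supplied by the framework rather than carried on a custom record class.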

...