...
This KIP aims to add Record Headers support to the Streams API for routing, filtering, and mapping. Stateful operations (tables, joins) are out of scope.
Public Interfaces
1. Add Headers to `KeyValue` class:
org.apache.kafka.streams.KeyValue.java:
...
public final K key;
...
public final V value;
...
headers
...
2. Add headers as part of processing:
org.apache.kafka.streams.processor.Processor:
...
<K, V> void send(final String topic, final K key, final V value, final Integer partition, final Long timestamp, final Serializer<K> keySerializer, final Serializer<V> valueSerializer);
<K, V> void send(final String topic, final K key, final V value, final Long timestamp, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final StreamPartitioner<? super K, ? super V> partitioner);
<K, V> void send(final String topic, final K key, final V value, final Long timestamp, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final StreamPartitioner<? super K, ? super V> partitioner, final Headers headers);
<K, V> void send(final String topic, final K key, final V value, final Integer partition, final Long timestamp, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final Headers headers);
3. Add methods to use Headers as part of processing:
...
`KStream<K, V> filter(PredicateWithHeaders<? super K, ? super V> predicate);`
`KStream<K, V> filterNot(PredicateWithHeaders<? super K, ? super V> predicate);`
`<KR> KStream<KR, V> selectKey(KeyValueAndHeadersMapper<? super K, ? super V, ? extends KR> mapper);`
`<KR, VR> KStream<KR, VR> map(KeyValueAndHeadersMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper);`
`<KR, VR> KStream<KR, VR> flatMap(final KeyValueAndHeadersMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper);`
`void foreach(final ForeachActionWithHeaders<? super K, ? super V> action);`
`KStream<K, V> peek(final ForeachActionWithHeaders<? super K, ? super V> action);`
`KStream<K, V>[] branch(final PredicateWithHeaders<? super K, ? super V>... predicates);`
`<KR> KGroupedStream<KR, V> groupBy(final KeyValueAndHeadersMapper<? super K, ? super V, KR> selector);`
`<KR> KGroupedStream<KR, V> groupBy(final KeyValueAndHeadersMapper<? super K, ? super V, KR> selector, final Serialized<KR, V> serialized);`
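To illustrate how header-aware predicates and mappers like the ones listed above might be used, here is a minimal, self-contained sketch. It does not use the Kafka Streams library: the interface method shapes (`test(key, value, headers)` and `apply(key, value, headers)`) are assumptions, since the KIP text here does not show them; `Map<String, byte[]>` is a hypothetical stand-in for `Headers`; and `Rec`, `rec`, `filter`, `selectKey` and `headerAsString` are helper names invented for this example.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class HeaderFilterSketch {

    // ASSUMPTION: method shape is a guess; the KIP only names the interface.
    // Map<String, byte[]> is a stand-in for the real Headers type.
    interface PredicateWithHeaders<K, V> {
        boolean test(K key, V value, Map<String, byte[]> headers);
    }

    // ASSUMPTION: method shape is a guess, mirroring KeyValueMapper plus headers.
    interface KeyValueAndHeadersMapper<K, V, R> {
        R apply(K key, V value, Map<String, byte[]> headers);
    }

    // Hypothetical record holder used only by this sketch.
    static final class Rec<K, V> {
        final K key;
        final V value;
        final Map<String, byte[]> headers;

        Rec(K key, V value, Map<String, byte[]> headers) {
            this.key = key;
            this.value = value;
            this.headers = headers;
        }
    }

    // Convenience factory: builds a record carrying a single string header.
    static Rec<String, String> rec(String key, String value, String headerName, String headerValue) {
        Map<String, byte[]> headers = new HashMap<>();
        headers.put(headerName, headerValue.getBytes(StandardCharsets.UTF_8));
        return new Rec<>(key, value, headers);
    }

    // Decodes a header as UTF-8 text, or returns null if the header is absent.
    static String headerAsString(Map<String, byte[]> headers, String name) {
        byte[] raw = headers.get(name);
        return raw == null ? null : new String(raw, StandardCharsets.UTF_8);
    }

    // Keeps only the records for which the header-aware predicate holds.
    static <K, V> List<Rec<K, V>> filter(List<Rec<K, V>> in,
                                         PredicateWithHeaders<? super K, ? super V> predicate) {
        List<Rec<K, V>> out = new ArrayList<>();
        for (Rec<K, V> r : in) {
            if (predicate.test(r.key, r.value, r.headers)) {
                out.add(r);
            }
        }
        return out;
    }

    // Re-keys each record using a header-aware mapper; value and headers are kept.
    static <K, V, KR> List<Rec<KR, V>> selectKey(List<Rec<K, V>> in,
            KeyValueAndHeadersMapper<? super K, ? super V, ? extends KR> mapper) {
        List<Rec<KR, V>> out = new ArrayList<>();
        for (Rec<K, V> r : in) {
            KR newKey = mapper.apply(r.key, r.value, r.headers);
            out.add(new Rec<>(newKey, r.value, r.headers));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec<String, String>> input = Arrays.asList(
                rec("k1", "v1", "region", "eu"),
                rec("k2", "v2", "region", "us"));

        // Filtering: keep records whose "region" header equals "eu".
        List<Rec<String, String>> euOnly =
                filter(input, (k, v, h) -> "eu".equals(headerAsString(h, "region")));

        // Mapping: use the "region" header as the new key.
        List<Rec<String, String>> byRegion =
                selectKey(input, (k, v, h) -> headerAsString(h, "region"));

        for (Rec<String, String> r : euOnly) {
            System.out.println("filtered: " + r.key + " -> " + r.value);
        }
        for (Rec<String, String> r : byRegion) {
            System.out.println("re-keyed: " + r.key + " -> " + r.value);
        }
    }
}
```

In the actual proposal these callbacks would be passed to `KStream` methods such as `filter` and `selectKey` rather than to the list helpers used here; the sketch only shows the kind of per-record decision that header access would allow.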
4. Add Headers when storing data in Streams API:
org.apache.kafka.streams.state.KeyValueStore:
...
void put(K key, V value);
...
void put(K key, V value, Headers headers);
...
V putIfAbsent(K key, V value);
...
Proposed Changes
1. Enable Kafka Streams to filter, map, and process records using Headers. This builds on API change #3.
...