
Status

Current state: Under Discussion

Discussion thread: here


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This KIP aims to add Record Headers support to the Streams API for routing, filtering and mapping. Stateful operations (tables, joins) are out of scope.

Public Interfaces

1. Add headers as part of processing:

org.apache.kafka.streams.processor.Processor:
  • void process(K key, V value, Headers headers);
org.apache.kafka.streams.processor.ProcessorContext:
  • <K, V> void forward(K key, V value);
  • <K, V> void forward(K key, V value, Headers headers);
  • <K, V> void forward(K key, V value, String childName);
  • <K, V> void forward(K key, V value, Headers headers, String childName); 
  • <K, V> void forward(K key, V value, int childIndex);
  • <K, V> void forward(K key, V value, Headers headers, int childIndex);
org.apache.kafka.streams.processor.internals.RecordContext:
  • Headers headers();
org.apache.kafka.streams.processor.internals.RecordCollector:
  • <K, V> void send(final String topic, final K key, final V value, final Integer partition, final Long timestamp, final Serializer<K> keySerializer, final Serializer<V> valueSerializer);
  • <K, V> void send(final String topic, final K key, final V value, final Long timestamp, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final StreamPartitioner<? super K, ? super V> partitioner); 
  • <K, V> void send(final String topic, final K key, final V value, final Long timestamp, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final StreamPartitioner<? super K, ? super V> partitioner, final Headers headers); 
  • <K, V> void send(final String topic, final K key, final V value, final Integer partition, final Long timestamp, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final Headers headers);

2. Add methods to use Headers as part of processing:

org.apache.kafka.streams.kstream.KStream:
  • `KStream<K, V> filter(PredicateWithHeaders<? super K, ? super V> predicate);`
  • `KStream<K, V> filterNot(PredicateWithHeaders<? super K, ? super V> predicate);`
  • `<KR, VR> KStream<KR, VR> map(KeyValueAndHeadersMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper);`
  • `<KR, VR> KStream<KR, VR> flatMap(final KeyValueAndHeadersMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper);`
  • `void foreach(final ForeachActionWithHeaders<? super K, ? super V> action);`
  • `KStream<K, V> peek(final ForeachActionWithHeaders<? super K, ? super V> action);`
  • `KStream<K, V>[] branch(final PredicateWithHeaders<? super K, ? super V>... predicates);`
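As a rough illustration of header-based routing with the proposed `branch` overload, the sketch below simulates it over an in-memory list. It assumes `PredicateWithHeaders` exposes `test(key, value, headers)` (as in the anonymous-class example in this KIP) and models headers as a `Map<String, String>` rather than Kafka's `Headers`. `BranchByHeaderSketch`, `Rec` and the `type` header are hypothetical. It follows the existing `KStream#branch` rule: a record goes to the branch of the first matching predicate and is dropped if none match.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class BranchByHeaderSketch {
    // Assumed shape of the proposed interface; headers modeled as Map<String, String>.
    @FunctionalInterface
    interface PredicateWithHeaders<K, V> {
        boolean test(K key, V value, Map<String, String> headers);
    }

    // A record as an operator would see it: key, value and headers.
    static final class Rec {
        final String key;
        final String value;
        final Map<String, String> headers;

        Rec(String key, String value, Map<String, String> headers) {
            this.key = key;
            this.value = value;
            this.headers = headers;
        }
    }

    // Mirrors the KStream#branch rule: each record goes to the branch of the first
    // matching predicate; a record that matches no predicate is dropped.
    @SafeVarargs
    static List<List<Rec>> branch(List<Rec> records,
                                  PredicateWithHeaders<String, String>... predicates) {
        List<List<Rec>> branches = new ArrayList<>();
        for (int i = 0; i < predicates.length; i++) {
            branches.add(new ArrayList<>());
        }
        for (Rec r : records) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(r.key, r.value, r.headers)) {
                    branches.get(i).add(r);
                    break; // first match wins
                }
            }
        }
        return branches;
    }

    public static void main(String[] args) {
        List<List<Rec>> routed = branch(
            List.of(new Rec("a", "1", Map.of("type", "click")),
                    new Rec("b", "2", Map.of("type", "view")),
                    new Rec("c", "3", Map.of())),
            (k, v, h) -> "click".equals(h.get("type")), // clicks
            (k, v, h) -> h.containsKey("type"));        // any other typed event
        System.out.println(routed.get(0).size() + " click(s), " + routed.get(1).size() + " other");
    }
}
```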

Proposed Changes

1. Enable Kafka Streams to filter, map and process records using headers. This builds on Public Interfaces change #2.

It will require the following functional interfaces:

  • PredicateWithHeaders
  • KeyValueAndHeadersMapper
  • ForeachActionWithHeaders
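A minimal sketch of what the three interfaces might look like, assuming each mirrors the existing `Predicate`, `KeyValueMapper` and `ForeachAction` with an extra headers parameter. Headers are modeled as a `Map<String, String>` (Kafka's `Headers` holds `byte[]` values and allows repeated keys), and `HeadersLambdaSketch`, `Rec` and `run` are hypothetical helpers that imitate a `filter(...).map(...).foreach(...)` chain on an in-memory list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class HeadersLambdaSketch {
    // Assumed shapes: existing Predicate / KeyValueMapper / ForeachAction plus headers.
    @FunctionalInterface
    interface PredicateWithHeaders<K, V> {
        boolean test(K key, V value, Map<String, String> headers);
    }

    @FunctionalInterface
    interface KeyValueAndHeadersMapper<K, V, VR> {
        VR apply(K key, V value, Map<String, String> headers);
    }

    @FunctionalInterface
    interface ForeachActionWithHeaders<K, V> {
        void apply(K key, V value, Map<String, String> headers);
    }

    // A record as an operator would see it: key, value and headers.
    static final class Rec {
        final String key;
        final String value;
        final Map<String, String> headers;

        Rec(String key, String value, Map<String, String> headers) {
            this.key = key;
            this.value = value;
            this.headers = headers;
        }
    }

    // Runs filter -> map (values only, for brevity) -> foreach over a list
    // and returns the mapped values that passed the filter.
    static List<String> run(List<Rec> records,
                            PredicateWithHeaders<String, String> predicate,
                            KeyValueAndHeadersMapper<String, String, String> mapper,
                            ForeachActionWithHeaders<String, String> action) {
        List<String> out = new ArrayList<>();
        for (Rec r : records) {
            if (!predicate.test(r.key, r.value, r.headers)) {
                continue; // dropped by the filter
            }
            String mapped = mapper.apply(r.key, r.value, r.headers);
            action.apply(r.key, mapped, r.headers);
            out.add(mapped);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> records = List.of(
            new Rec("page-1", "edit", Map.of("region", "eu")),
            new Rec("page-2", "edit", Map.of("region", "us")));

        List<String> result = run(records,
            (k, v, h) -> "eu".equals(h.get("region")),         // keep EU records only
            (k, v, h) -> v + "@" + h.get("region"),            // use a header while mapping
            (k, v, h) -> System.out.println(k + " -> " + v));  // foreach/peek-style side effect
        System.out.println(result);
    }
}
```

Because each interface has a single abstract method, three-argument lambdas such as `(k, v, h) -> ...` can stand in for the anonymous-class form.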

Clients will then be able to use headers in lambda expressions or in anonymous classes:

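```java
import org.apache.kafka.common.header.Headers;

stream.filter(new PredicateWithHeaders<String, WikiFeed>() {
    @Override
    public boolean test(final String key, final WikiFeed value, final Headers headers) {
        // keep or drop the record based on its key, value and headers
        ...
    }
});
```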

`RecordContext` will also hold the record's headers as a member, so that they can be passed along to the internals.
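To illustrate how headers could travel through the internals, here is a hedged sketch of a record context that stores headers next to other record metadata. The class names are hypothetical, headers are modeled as a `Map<String, String>`, and the rule that `forward(key, value)` reuses the current record's headers is an assumption made for the sketch, not something this KIP specifies.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RecordContextSketch {
    // Hypothetical, trimmed-down record context: metadata of the record currently
    // being processed, now including its headers.
    static final class RecordContext {
        private final String topic;
        private final long offset;
        private final Map<String, String> headers;

        RecordContext(String topic, long offset, Map<String, String> headers) {
            this.topic = topic;
            this.offset = offset;
            this.headers = Collections.unmodifiableMap(new HashMap<>(headers));
        }

        String topic() { return topic; }
        long offset() { return offset; }
        Map<String, String> headers() { return headers; }
    }

    // What a downstream node (or the record collector) receives.
    static final class Forwarded {
        final String key;
        final String value;
        final Map<String, String> headers;

        Forwarded(String key, String value, Map<String, String> headers) {
            this.key = key;
            this.value = value;
            this.headers = headers;
        }
    }

    // Hypothetical processor context that keeps the current RecordContext.
    static final class Context {
        private RecordContext current;
        final List<Forwarded> forwarded = new ArrayList<>();

        void setRecordContext(RecordContext recordContext) {
            this.current = recordContext;
        }

        // Assumption: forwarding without explicit headers reuses the headers
        // stored in the current record context, so they flow downstream untouched.
        void forward(String key, String value) {
            forward(key, value, current.headers());
        }

        // Explicit headers replace the ones from the record context.
        void forward(String key, String value, Map<String, String> headers) {
            forwarded.add(new Forwarded(key, value, headers));
        }
    }
}
```

Keeping headers in the context, rather than threading them through every call, lets operators that never look at headers stay unchanged while the headers still reach the sink.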


Compatibility, Deprecation, and Migration Plan

  • Clients using the high-level DSL should not be affected by the proposed changes.
  • Clients using the Processor API will need to implement `void process(K key, V value, Headers headers);`, either to bypass headers (pass them through untouched) or to handle them.
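The sketch below shows both options for a Processor API user. It uses hypothetical, trimmed-down `Processor` and `ProcessorContext` types with only the members needed here, models headers as a `Map<String, String>`, relies on a made-up `tenant` header, and records forwarded output in a test double instead of a real context.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ProcessorHeadersSketch {
    // Hypothetical, trimmed versions of the proposed Processor API types.
    interface ProcessorContext {
        <K, V> void forward(K key, V value, Map<String, String> headers);
    }

    interface Processor<K, V> {
        void init(ProcessorContext context);
        void process(K key, V value, Map<String, String> headers);
    }

    // Bypass: ignores the headers' content and forwards them unchanged.
    static final class UppercaseProcessor implements Processor<String, String> {
        private ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
        }

        @Override
        public void process(String key, String value, Map<String, String> headers) {
            context.forward(key, value.toUpperCase(), headers);
        }
    }

    // Handle: drops records without a "tenant" header and tags the rest.
    static final class TenantProcessor implements Processor<String, String> {
        private ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
        }

        @Override
        public void process(String key, String value, Map<String, String> headers) {
            String tenant = headers.get("tenant");
            if (tenant == null) {
                return; // no tenant header: drop the record
            }
            Map<String, String> out = new HashMap<>(headers);
            out.put("processed-by", "tenant-processor");
            context.forward(tenant + "/" + key, value, out);
        }
    }

    // Test double that records everything a processor forwards.
    static final class RecordingContext implements ProcessorContext {
        final List<Object> keys = new ArrayList<>();
        final List<Object> values = new ArrayList<>();
        final List<Map<String, String>> headers = new ArrayList<>();

        @Override
        public <K, V> void forward(K key, V value, Map<String, String> hdrs) {
            keys.add(key);
            values.add(value);
            headers.add(hdrs);
        }
    }
}
```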

Rejected Alternatives

1. Adding headers to the KTable API would mean propagating headers to key-value specific stores such as RocksDB. If headers are needed in stateful operations, clients will need to first map header values into the key or value and then process the record.
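A sketch of that workaround, assuming a hypothetical `tenant` header: a stateless step copies the header into the value while headers are still available, and a later stateful step needs only the value. The stateful step is simulated with a `HashMap` in place of a state store, and the class and method names are illustrative only.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HeaderToValueSketch {
    // Value wrapper that carries the header needed by the stateful step.
    static final class TenantValue {
        final String tenant;
        final String payload;

        TenantValue(String tenant, String payload) {
            this.tenant = tenant;
            this.payload = payload;
        }
    }

    // Stateless step (headers still available): copy the "tenant" header
    // into the value, defaulting to "unknown" when it is missing.
    static TenantValue moveHeaderIntoValue(String value, Map<String, String> headers) {
        return new TenantValue(headers.getOrDefault("tenant", "unknown"), value);
    }

    // Stateful step (only key and value are stored): count records per tenant,
    // standing in for an aggregation backed by a key-value store.
    static Map<String, Long> countPerTenant(List<TenantValue> values) {
        Map<String, Long> counts = new HashMap<>();
        for (TenantValue v : values) {
            counts.merge(v.tenant, 1L, Long::sum);
        }
        return counts;
    }
}
```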


References

Draft/WIP: https://github.com/jeqo/kafka/tree/streams-headers

Changes: https://github.com/apache/kafka/compare/trunk...jeqo:streams-headers
