Status

Current state: Under Discussion

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Headers are transiently passed through a Kafka Streams topology. To act on them, the Processor API has to be used, since record metadata (e.g. topic name, partition, offset, timestamp), including headers, is only accessible at the Processor API level (KIP-244).

Although the current support is useful for instrumentation that needs to access headers, it is cumbersome for users to access headers in common Kafka Streams DSL operations (e.g. filtering based on a header value), as it requires a Transformer/Processor implementation.
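
For illustration, filtering on a header value today requires dropping down to the Processor API, e.g. via `transformValues` (a minimal sketch; the topic name, header key, and expected value are placeholders):

Code Block
// Status quo sketch: headers are only reachable through the ProcessorContext.
builder
  .stream("input", Consumed.with(Serdes.String(), Serdes.String()))
  .transformValues(() -> new ValueTransformerWithKey<String, String, String>() {
      private ProcessorContext context;

      @Override
      public void init(final ProcessorContext context) {
          this.context = context; // keep the context to reach record metadata
      }

      @Override
      public String transform(final String key, final String value) {
          final Header header = context.headers().lastHeader("k");
          // emulate a header-based filter by nulling out non-matching values
          return header != null && Arrays.equals(header.value(), "v".getBytes()) ? value : null;
      }

      @Override
      public void close() { }
  })
  .filter((key, value) -> value != null); // drop records without the matching header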

Using these values for common stateless tasks like:

  • filtering/branching based on topic name,

  • logging incoming partition number/offset number,

  • adding/modifying headers,

is not straightforward as it involves mixing Processors and DSL operators.

Beyond that, using these values in the more complex computations already available in the DSL, such as joins and aggregations, is even harder, as it would require reimplementing joins/aggregations with a custom Processor. More info: https://issues.apache.org/jira/browse/KAFKA-7718

This KIP proposes to include operators that make record metadata accessible at the DSL level, allow using these values in stateful operations, and allow modifying the underlying record headers.


Public Interfaces

In accordance with the KStreams DSL Grammar, we introduce the following new elements to the KStream API:

  • `KStream.mapRecordValue` DSLOperation

  • `KStream.setRecordHeaders` DSLOperation with the following operand

    • `RecordHeadersMapper` DSLObject

Apart from these changes, new Public interfaces are added:

  • `o.a.k.s.kstream.RecordValue<V>` including the following attributes:

    • String topic

    • int partition

    • long offset

    • V value

    • long timestamp

    • o.a.k.s.header.Headers headers

  • `o.a.k.s.kstream.RecordValueSerde<V>` with the following binary structure:

    • varint valueSize

    • bytes value

    • varint topicNameSize

    • bytes topicName

    • varint partition

    • varlong offset

    • varlong timestamp

    • varint numHeaders

    • Header[]:

      • varint headerKeySize

      • bytes headerKey

      • varint headerValueSize

      • bytes headerValue

  • `o.a.k.s.header.Headers` (inspired by Connect's Headers), implemented by the class `o.a.k.s.header.StreamHeaders`:

    • int size()

    • boolean isEmpty()

    • Iterator<Header> allWithName(String key)

    • Header lastWithName(String key)

    • boolean hasWithName(String key)

    • Headers add(Header header)

    • Headers add(String key, byte[] value)

    • Headers addUtf8(String key, String value)

    • Headers remove(String key)

    • Headers retainLatest(String key)

    • Headers retainLatest()

    • Headers clear()

    • Headers duplicate()

    • Headers apply(Headers.HeaderTransform transform)

    • Headers apply(String key, Headers.HeaderTransform transform)

    • functional interface HeaderTransform { Header apply(Header header); }

    • o.a.k.common.header.Headers unwrap()

  • `o.a.k.s.header.Header` implemented by a class `o.a.k.s.header.StreamHeader`

    • String key

    • byte[] value

Description

1. The `KStream#mapRecordValue(Named)` operation elevates the record metadata into a new value container: `RecordValue<V>`. The overloaded alternative `mapRecordValue()` without `Named` is also available.

2. `RecordValue<V>` can be used in stateful operations with the serde `RecordValueSerde<V>`.

3. The `KStream#setRecordHeaders(RecordHeadersMapper, Named)` operation will “flush” the mapped headers into the record headers crossing the stream, to be used by downstream consumers. The mapper function receives the K key and V value and returns a `o.a.k.s.header.Headers`. Users can create new Headers using the Streams implementation `o.a.k.s.header.StreamHeaders`, or reuse existing ones obtained by previously calling `KStream#mapRecordValue()`. The overloaded alternative `setRecordHeaders(RecordHeadersMapper)` without `Named` is also available.


Usage examples

1. Filter based on Kafka topic name:


Code Block
builder
  .stream(List.of("input", "another"), Consumed.with(Serdes.String(), Serdes.String()))
  .mapRecordValue() // 1. map record metadata into RecordValue
  .split()          // 2. branch by topic name
  .branch((key, value) -> value.topic().equals("input"), Branched.withConsumer(b1 -> { /*...*/ }))
  .noDefaultBranch();


2. Filter based on header existence and header value:

Code Block
b1
  .mapRecordValue() 
  //...
  .filter((key, value) -> value.headers().hasWithName("k"))
  .filter((key, value) -> "v".equals(value.headers().lastWithName("k").valueAsUtf8())) 


3. Apply headers to Record:

Code Block
b1
  .mapRecordValue() 
  //...
  .setRecordHeaders((k, v) -> v.headers().addUtf8("k1", "v1").retainLatest())  


4. Use headers in stateful operations:

Code Block
b1
  .mapRecordValue()
  //... 
  .groupByKey()
  .reduce((value1, value2) -> { 
    value1.headers().forEach(header -> value2.headers().add(header)); 
    return value2;
  }, Materialized.with(Serdes.String(), new RecordValueSerde<>(Serdes.String()))); 


Proposed Changes

1. Add the following methods to KStream:


Code Block
    KStream<K, RecordValue<V>> mapRecordValue(final Named named);

    KStream<K, RecordValue<V>> mapRecordValue();
    
Code Block
    KStream<K, V> setRecordHeaders(final RecordHeadersMapper<? super K, ? super V> action, final Named named);

    KStream<K, V> setRecordHeaders(final RecordHeadersMapper<? super K, ? super V> action);


2. Add the following DSL Object class:

Code Block
package org.apache.kafka.streams.kstream;

import org.apache.kafka.streams.header.Headers;

public interface RecordHeadersMapper<K, V> {

    Headers get(final K key, final V value);
}


3. Add the following Header interfaces with their implementations (`StreamHeader(s)`):

Code Block
package org.apache.kafka.streams.header;

public interface Header {

    /**
     * The header's key, which is not necessarily unique within the set of headers on a Kafka message.
     *
     * @return the header's key; never null
     */
    String key();

    byte[] value();

    String valueAsUtf8();
    //...
}


Code Block
package org.apache.kafka.streams.header;

import java.util.Iterator;

public interface Headers extends Iterable<Header> {

  int size();
  boolean isEmpty();
  Iterator<Header> allWithName(String key);
  Header lastWithName(String key);
  boolean hasWithName(String key);
  Headers add(Header header);
  Headers add(String key, byte[] value);
  Headers addUtf8(String key, String value);
  Headers remove(String key);
  Headers retainLatest(String key);
  Headers retainLatest();
  Headers clear();
  Headers duplicate();
  Headers apply(Headers.HeaderTransform transform);
  Headers apply(String key, Headers.HeaderTransform transform);
  interface HeaderTransform {
    Header apply(Header header);
  }
  org.apache.kafka.common.header.Headers unwrap();
}
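
As an illustration, `Headers#apply` with a `HeaderTransform` enables bulk rewrites of headers. A hypothetical sketch (the public `StreamHeader` constructor is an assumption; the KIP only names the class):

Code Block
// Hypothetical sketch: redact every header named "credentials", then deduplicate.
b1.mapRecordValue()
  .setRecordHeaders((key, value) -> value.headers()
      .apply("credentials", header ->
          new StreamHeader(header.key(), "<redacted>".getBytes(StandardCharsets.UTF_8)))
      .retainLatest());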


4. Add `RecordValue<V>`:

Code Block
package org.apache.kafka.streams.kstream;

import java.util.Objects;
import org.apache.kafka.streams.header.Headers;
import org.apache.kafka.streams.header.StreamHeaders;

public class RecordValue<V> {

    final String topic;
    final int partition;
    final long offset;
    final V value;
    final long timestamp;
    final Headers headers;
    //...
}


5. And its serde:

Code Block
public class RecordValueSerde<V> implements Serde<RecordValue<V>> {

    final Serde<V> valueSerde;

    public RecordValueSerde(Serde<V> valueSerde) {
        this.valueSerde = valueSerde;
    }

    @Override
    public Serializer<RecordValue<V>> serializer() {
        return new RecordValueSerializer<>(valueSerde.serializer());
    }

    @Override
    public Deserializer<RecordValue<V>> deserializer() {
        return new RecordValueDeserializer<>(valueSerde.deserializer());
    }
}
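
For reference, a sketch of how the internal serializer could write the binary layout listed under Public Interfaces. `RecordValueSerializer` internals are not part of the public API; the accessor methods on `RecordValue` and the use of `o.a.k.common.utils.ByteUtils` are assumptions here:

Code Block
// Sketch only: serialize RecordValue<V> following the layout from Public Interfaces.
class RecordValueSerializer<V> implements Serializer<RecordValue<V>> {

    private final Serializer<V> valueSerializer;

    RecordValueSerializer(final Serializer<V> valueSerializer) {
        this.valueSerializer = valueSerializer;
    }

    @Override
    public byte[] serialize(final String topic, final RecordValue<V> data) {
        try (final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             final DataOutputStream out = new DataOutputStream(bytes)) {
            final byte[] value = valueSerializer.serialize(topic, data.value());
            ByteUtils.writeVarint(value.length, out);              // varint valueSize
            out.write(value);                                      // bytes value
            final byte[] topicName = data.topic().getBytes(StandardCharsets.UTF_8);
            ByteUtils.writeVarint(topicName.length, out);          // varint topicNameSize
            out.write(topicName);                                  // bytes topicName
            ByteUtils.writeVarint(data.partition(), out);          // varint partition
            ByteUtils.writeVarlong(data.offset(), out);            // varlong offset
            ByteUtils.writeVarlong(data.timestamp(), out);         // varlong timestamp
            ByteUtils.writeVarint(data.headers().size(), out);     // varint numHeaders
            for (final Header header : data.headers()) {           // Header[]
                final byte[] headerKey = header.key().getBytes(StandardCharsets.UTF_8);
                ByteUtils.writeVarint(headerKey.length, out);      // varint headerKeySize
                out.write(headerKey);                              // bytes headerKey
                ByteUtils.writeVarint(header.value().length, out); // varint headerValueSize
                out.write(header.value());                         // bytes headerValue
            }
            out.flush();
            return bytes.toByteArray();
        } catch (final IOException e) {
            throw new SerializationException(e);
        }
    }
}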


6. Extend the `To` class with `o.a.k.common.header.Headers`:

Code Block
public class To {
    protected String childName;
    protected long timestamp;
    protected Headers headers;

    public To withHeaders(Headers headers) {
        this.headers = headers;
        return this;
    }
    //...
}
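
In a Processor, forwarding a record with explicit headers could then look like the following sketch (the child name is a placeholder; `RecordHeaders` is the existing implementation of `o.a.k.common.header.Headers`):

Code Block
// Sketch: forward a record downstream with explicit headers from the Processor API.
final Headers headers = new RecordHeaders()
    .add("source", "audit".getBytes(StandardCharsets.UTF_8));
context.forward(key, value, To.child("audit-sink").withHeaders(headers));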


Compatibility, Deprecation, and Migration Plan

  • New functions will be supported since version 2.0, when KIP-244 was adopted.
  • No existing stores or functions are affected.

Potential next steps

...

If users have an existing stateful operator and add `mapRecordValue` before that operator, the value type will change from V to `RecordValue<V>`, causing a backward-incompatible topology change.

Rejected Alternatives

  1. Expand `KeyValue` to support headers. This will affect all current APIs, from KStream/KTable to Stores.

  2. Add `mergeHeaders` functions to join/aggregation. Although this would extend support for headers, it would add complexity to existing functions.

  3. (initial version of this KIP) Add Header-specific methods to the DSL (e.g. `withHeaders`, `addHeader`, `removeHeaders`). Although this would allow accessing and manipulating headers from the DSL, it would have a high impact on the existing KStream API (too many methods) and be specific to headers only. It would also require dealing with the same abstraction as Kafka Records, and would need more methods to cover other metadata.