Status
Current state: Under Discussion
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Record metadata values (e.g. topic name, partition, offset, timestamp), including headers (KIP-244), are accessible in the Processor API.
Using these values for common stateless tasks like:
filtering/branching based on topic name,
logging incoming partition number/offset number,
adding/modifying headers,
is not straightforward, as it involves mixing Processors and DSL operators.
Beyond that, using these values in more complex computations already available in the DSL, such as joins and aggregations, is even harder, as it requires reimplementing those operations with a custom Processor. More info: https://issues.apache.org/jira/browse/KAFKA-7718
This KIP proposes operators that make record metadata accessible at the DSL level, allow using these values in stateful operations, and modify the underlying record headers.
Public Interfaces
In accordance with the KStreams DSL Grammar, we introduce the following new elements to the KStream API:
`KStream.mapRecordValue` DSLOperation
`KStream.setRecordHeaders` DSLOperation with the following operand
`RecordHeadersMapper` DSLObject
Apart from these changes, the following new public interfaces are added:
`o.a.k.s.kstream.RecordValue<V>` including the following attributes:
String topic
int partition
long offset
V value
long timestamp
o.a.k.s.header.Headers headers
`o.a.k.s.kstream.RecordValueSerde<V>` with the following binary structure (a serialization sketch follows the field list):
varint valueSize
bytes value
varint topicNameSize
bytes topicName
varint partition
varlong offset
varlong timestamp
varint numHeaders
Header[]:
varint headerKeySize
bytes headerKey
varint headerValueSize
bytes headerValue
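For illustration, here is a minimal sketch of a serializer writing this layout with Kafka's `ByteUtils` varint helpers. This is an assumption about the eventual implementation, not the implementation itself; accessor names follow the usage examples below, and the value is assumed to be already serialized to bytes by the inner serde.

package org.apache.kafka.streams.kstream;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.utils.ByteUtils;
import org.apache.kafka.streams.header.Header;

// Hedged sketch: writes the fields in the order listed above. Buffer sizing,
// null handling, and the inner value serde are omitted for brevity.
class RecordValueLayoutSketch {
    static void write(final RecordValue<byte[]> record, final ByteBuffer buffer) {
        final byte[] value = record.value();
        ByteUtils.writeVarint(value.length, buffer);              // varint valueSize
        buffer.put(value);                                        // bytes value

        final byte[] topic = record.topic().getBytes(StandardCharsets.UTF_8);
        ByteUtils.writeVarint(topic.length, buffer);              // varint topicNameSize
        buffer.put(topic);                                        // bytes topicName

        ByteUtils.writeVarint(record.partition(), buffer);        // varint partition
        ByteUtils.writeVarlong(record.offset(), buffer);          // varlong offset
        ByteUtils.writeVarlong(record.timestamp(), buffer);       // varlong timestamp

        ByteUtils.writeVarint(record.headers().size(), buffer);   // varint numHeaders
        for (final Header header : record.headers()) {            // Header[]
            final byte[] key = header.key().getBytes(StandardCharsets.UTF_8);
            ByteUtils.writeVarint(key.length, buffer);            // varint headerKeySize
            buffer.put(key);                                      // bytes headerKey
            ByteUtils.writeVarint(header.value().length, buffer); // varint headerValueSize
            buffer.put(header.value());                           // bytes headerValue
        }
    }
}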
`o.a.k.s.header.Headers` (inspired by Connect's Headers), implemented by a class `o.a.k.s.header.StreamHeaders`; a usage sketch follows the `Header` listing below:
int size()
boolean isEmpty()
Iterator<Header> allWithName(String key)
Header lastWithName(String key)
boolean hasWithName(String key)
Headers add(Header header)
Headers add(String key, byte[] value)
Headers addUtf8(String key, String value)
Headers remove(String key)
Headers retainLatest(String key)
Headers retainLatest()
Headers clear()
Headers duplicate()
Headers apply(Headers.HeaderTransform transform)
Headers apply(String key, Headers.HeaderTransform transform)
functional interface HeaderTransform { Header apply(Header header); }
o.a.k.common.header.Headers unwrap()
`o.a.k.s.header.Header` implemented by a class `o.a.k.s.header.StreamHeader`
String key
byte[] value
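To illustrate the builder-style surface, a short sketch of chaining these methods. A no-arg `StreamHeaders` constructor is assumed here purely for illustration; the exact construction API is not fixed by this KIP.

import org.apache.kafka.streams.header.Headers;
import org.apache.kafka.streams.header.StreamHeaders;

// Hedged sketch: mutator methods return Headers, so calls chain.
// The no-arg StreamHeaders constructor is an assumption for illustration.
Headers headers = new StreamHeaders()
    .addUtf8("source", "orders")          // add a UTF-8 valued header
    .add("checksum", new byte[] {0x01})   // add a raw-bytes header
    .addUtf8("source", "orders-v2")       // duplicate keys are allowed
    .retainLatest("source");              // keep only the last "source"

boolean tagged = headers.hasWithName("source");               // true
String latest = headers.lastWithName("source").valueAsUtf8(); // "orders-v2"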
Description
1. `KStream#mapRecordValue(Named)` operation elevates the record metadata into a new value container: `RecordValue<V>`. The overloaded parameterless alternative `mapRecordValue()` is also available.
2. `RecordValue<V>` can be used in stateful operations with the serde `RecordValueSerde<V>`.
3. `KStream#setRecordHeaders(RecordHeadersMapper, Named)` operation "flushes" headers into the headers of the records crossing the stream, to be used by downstream consumers. The mapper function receives the K key and V value, and returns an `o.a.k.s.header.Headers`. Users can create new Headers with the Streams implementation `o.a.k.s.header.StreamHeaders`, or reuse existing ones obtained via a previous `KStream#mapRecordValue()`. The overloaded parameterless alternative `setRecordHeaders(RecordHeadersMapper)` is also available.
Usage examples
1. Branch based on the Kafka topic name:
builder
    .stream(List.of("input", "another"), Consumed.with(Serdes.String(), Serdes.String()))
    .mapRecordValue()  // 1. map record metadata
    .split()           // 2. branch by topic name
    .branch((key, value) -> value.topic().equals("input"),
            Branched.withConsumer(b1 -> { /*...*/ }))
    .noDefaultBranch();
2. Filter based on header existence and header value:
b1
    .mapRecordValue()
    //...
    .filter((key, value) -> value.headers().hasWithName("k"))
    .filter((key, value) -> "v".equals(value.headers().lastWithName("k").valueAsUtf8()))
3. Apply headers to Record:
b1
    .mapRecordValue()
    //...
    .setRecordHeaders((k, v) -> v.headers().addUtf8("k1", "v1").retainLatest())
4. Use headers in a stateful operation:
b1
    .mapRecordValue()
    //...
    .groupByKey()
    .reduce((value1, value2) -> {
        value1.headers().forEach(header -> value2.headers().add(header));
        return value2;
    }, Materialized.with(Serdes.String(), new RecordValueSerde<>(Serdes.String())));
Proposed Changes
1. Add the following methods to KStream:
KStream<K, RecordValue<V>> mapRecordValue(final Named named);
KStream<K, RecordValue<V>> mapRecordValue();
KStream<K, V> setRecordHeaders(final RecordHeadersMapper<? super K, ? super V> action, final Named named);
KStream<K, V> setRecordHeaders(final RecordHeadersMapper<? super K, ? super V> action);
2. Add the following DSL Object class:
package org.apache.kafka.streams.kstream;

import org.apache.kafka.streams.header.Headers;

public interface RecordHeadersMapper<K, V> {
    Headers get(final K key, final V value);
}
3. Add the following Header interfaces with their implementations (`StreamHeader(s)`):
package org.apache.kafka.streams.header;

public interface Header {

    /**
     * The header's key, which is not necessarily unique within the set of headers on a Kafka message.
     *
     * @return the header's key; never null
     */
    String key();

    /**
     * The header's value as raw bytes.
     */
    byte[] value();

    /**
     * The header's value decoded as a UTF-8 string.
     */
    String valueAsUtf8();
}
package org.apache.kafka.streams.header;

import java.util.Iterator;

public interface Headers extends Iterable<Header> {
    int size();
    boolean isEmpty();
    Iterator<Header> allWithName(String key);
    Header lastWithName(String key);
    boolean hasWithName(String key);
    Headers add(Header header);
    Headers add(String key, byte[] value);
    Headers addUtf8(String key, String value);
    Headers remove(String key);
    Headers retainLatest(String key);
    Headers retainLatest();
    Headers clear();
    Headers duplicate();
    Headers apply(Headers.HeaderTransform transform);
    Headers apply(String key, Headers.HeaderTransform transform);

    interface HeaderTransform {
        Header apply(Header header);
    }

    org.apache.kafka.common.header.Headers unwrap();
}
4. Add `RecordValue<V>`:
package org.apache.kafka.streams.kstream;

import java.util.Objects;

import org.apache.kafka.streams.header.Headers;
import org.apache.kafka.streams.header.StreamHeaders;

public class RecordValue<V> {
    final String topic;
    final int partition;
    final long offset;
    final V value;
    final long timestamp;
    final Headers headers;

    //...
}
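The `//...` above elides the constructor and accessors; the usage examples earlier assume getters of roughly this shape (a sketch, not a finalized API):

// Inside RecordValue<V>: hedged sketch of the elided accessors, matching the
// usage examples above (e.g. value.topic(), value.headers()).
public String topic() { return topic; }
public int partition() { return partition; }
public long offset() { return offset; }
public V value() { return value; }
public long timestamp() { return timestamp; }
public Headers headers() { return headers; }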
5. And its serde:
public class RecordValueSerde<V> implements Serde<RecordValue<V>> {
    final Serde<V> valueSerde;

    public RecordValueSerde(Serde<V> valueSerde) {
        this.valueSerde = valueSerde;
    }

    @Override
    public Serializer<RecordValue<V>> serializer() {
        return new RecordValueSerializer<>(valueSerde.serializer());
    }

    @Override
    public Deserializer<RecordValue<V>> deserializer() {
        return new RecordValueDeserializer<>(valueSerde.deserializer());
    }

    //...
}
6. Extend the `To` class with `o.a.k.common.header.Headers`:
public class To {
    protected String childName;
    protected long timestamp;
    protected Headers headers;

    //...

    public To withHeaders(Headers headers) {
        this.headers = headers;
        return this;
    }

    //...
}
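For context, a sketch of how a processor might use the proposed `To#withHeaders` to forward a record with explicit headers. The processor wiring itself is only illustrative; nothing here beyond `withHeaders` is proposed by this KIP.

import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.To;

// Hedged sketch: forwards a record downstream with explicit headers via the
// proposed To#withHeaders.
class TaggingProcessor extends AbstractProcessor<String, String> {
    @Override
    public void process(final String key, final String value) {
        final Headers headers = new RecordHeaders()
            .add("processed-by", "tagger".getBytes(StandardCharsets.UTF_8));
        context().forward(key, value, To.all().withHeaders(headers));
    }
}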
Draft implementation: https://github.com/apache/kafka/pull/10265
Compatibility, Deprecation, and Migration Plan
If users add `mapRecordValue` in front of an existing stateful operator, the value type changes from V to RecordValue<V>, causing a backward-incompatible topology change.
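For example, assuming an existing KStream<String, String> named `stream` (illustrative only), inserting `mapRecordValue()` upstream of a reduce changes the materialized value type and serde:

// Before: the store and changelog hold values of type String.
stream.groupByKey()
      .reduce((v1, v2) -> v2,
              Materialized.with(Serdes.String(), Serdes.String()));

// After inserting mapRecordValue(): values become RecordValue<String>, so the
// store/changelog serde changes and existing state no longer deserializes.
stream.mapRecordValue()
      .groupByKey()
      .reduce((v1, v2) -> v2,
              Materialized.with(Serdes.String(), new RecordValueSerde<>(Serdes.String())));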
The `To` API will be extended to support headers and remains backwards compatible. `KStreamSetRecordHeaders` and `KStreamMapRecordValue` both use the latest Processor API from KIP-478.
Rejected Alternatives
Expand `KeyValue` to support headers. This would affect all current APIs, from KStream/KTable to Stores.
Add `mergeHeaders` functions to joins/aggregations. Although this would extend header support, it would add complexity to existing functions.
(initial version of this KIP) Add header-specific methods to the DSL (e.g. `withHeaders`, `addHeader`, `removeHeaders`). Although this would allow accessing and manipulating headers from the DSL, it would have a high impact on the existing KStream API (too many methods) while being specific to headers only; covering other metadata would require even more methods. It would also mean dealing with the same abstractions as Kafka Records.