...

In accordance with KStreams DSL Grammar, we introduce the following new elements to the KStream API:

  • `KStream.mapValueToRecord` DSLOperation

  • `KStream.setRecordHeaders` DSLOperation with the following operand:

    • `RecordHeadersMapper` DSLObject

Apart from these changes, the following new public interfaces are added:

  • `o.a.k.s.kstream.RecordValue<V>`, including the following attributes:

    • String topic

    • int partition

    • long offset

    • V value

    • long timestamp

    • o.a.k.s.header.Headers headers

  • `o.a.k.s.kstream.RecordValueSerde<V>`, with the following binary structure:

    • varint valueSize

    • bytes value

    • varint topicNameSize

    • bytes topicName

    • varint partition

    • varlong offset

    • varlong timestamp

    • varint numHeaders

    • Header[]:

      • varint headerKeySize

      • bytes headerKey

      • varint headerValueSize

      • bytes headerValue

  • `o.a.k.s.header.Headers` (inspired by Connect Headers), implemented by the class `o.a.k.s.header.StreamHeaders`:

    • int size()

    • boolean isEmpty()

    • Iterator<Header> allWithName(String key)

    • Header lastWithName(String key)

    • boolean hasWithName(String key)

    • Headers add(Header header)

    • Headers add(String key, byte[] value)

    • Headers addUtf8(String key, String value)

    • Headers remove(String key)

    • Headers retainLatest(String key)

    • Headers retainLatest()

    • Headers clear()

    • Headers duplicate()

    • Headers apply(Headers.HeaderTransform transform)

    • Headers apply(String key, Headers.HeaderTransform transform)

    • functional interface HeaderTransform { Header apply(Header header); }

    • o.a.k.common.header.Headers unwrap()

  • `o.a.k.s.header.Header`, implemented by the class `o.a.k.s.header.StreamHeader`:

    • String key

    • byte[] value
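The binary structure listed for `RecordValueSerde<V>` can be illustrated with a small, dependency-free sketch. It is not the KIP's implementation: the class `RecordValueLayoutSketch`, its nested `Header` type, and the helper names are hypothetical, and it assumes zigzag-encoded base-128 varints (the scheme Kafka uses elsewhere for record fields), non-null values, and UTF-8 topic names and header keys. None of these details are fixed by the field list itself.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical illustration of the field order; not the actual RecordValueSerde.
final class RecordValueLayoutSketch {

    // Stand-in for a header: a key plus raw value bytes.
    static final class Header {
        final String key;
        final byte[] value;
        Header(String key, byte[] value) { this.key = key; this.value = value; }
    }

    // Assumption: zigzag encoding followed by base-128 varint bytes.
    static void writeVarlong(long value, ByteArrayOutputStream out) {
        long v = (value << 1) ^ (value >> 63);
        while ((v & ~0x7FL) != 0) {
            out.write((int) ((v & 0x7F) | 0x80));
            v >>>= 7;
        }
        out.write((int) v);
    }

    // For int inputs the zigzag varint bytes are the same as for the widened long.
    static void writeVarint(int value, ByteArrayOutputStream out) {
        writeVarlong(value, out);
    }

    // Reads one zigzag varint/varlong from the buffer's current position.
    static long readVarlong(ByteBuffer in) {
        long raw = 0;
        int shift = 0;
        byte b;
        do {
            b = in.get();
            raw |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (raw >>> 1) ^ -(raw & 1);
    }

    // Writes a length prefix followed by the bytes themselves.
    static void writeSized(byte[] bytes, ByteArrayOutputStream out) {
        writeVarint(bytes.length, out);
        out.write(bytes, 0, bytes.length);
    }

    // Emits the fields in the order given by the binary structure list.
    static byte[] serialize(byte[] value, String topic, int partition, long offset,
                            long timestamp, List<Header> headers) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeSized(value, out);                                   // valueSize, value
        writeSized(topic.getBytes(StandardCharsets.UTF_8), out);  // topicNameSize, topicName
        writeVarint(partition, out);                              // partition
        writeVarlong(offset, out);                                // offset
        writeVarlong(timestamp, out);                             // timestamp
        writeVarint(headers.size(), out);                         // numHeaders
        for (Header h : headers) {
            writeSized(h.key.getBytes(StandardCharsets.UTF_8), out); // headerKeySize, headerKey
            writeSized(h.value, out);                                // headerValueSize, headerValue
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] bytes = serialize(
            "hi".getBytes(StandardCharsets.UTF_8), "input", 0, 42L, 1000L,
            List.of(new Header("k1", "v1".getBytes(StandardCharsets.UTF_8))));
        System.out.println("serialized length: " + bytes.length);
    }
}
```

Placing the value first means a reader that only needs the payload can stop after the first length-prefixed field, although whether the actual serde relies on that is not stated here.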

Description

1. The `KStream#mapValueToRecord(Named)` operation elevates the record metadata into a new value container, `RecordValue<V>`. A parameterless overload, `mapValueToRecord()`, is also available.

2. `RecordValue<V>` can be used in stateful operations with the serde `RecordValueSerde<V>`.

3. `RecordValueSerde` is a public API, and it is implicitly set as the `valueSerde` when `mapValueToRecord` is called.

4. The `KStream#setRecordHeaders(RecordHeaderMapper, Named)` operation will “flush” headers into the record headers crossing the stream, to be used by consumers downstream. The mapper function receives `K key` and `V value` and returns an `o.a.k.s.header.Headers`. Users can create new headers with the Streams implementation `o.a.k.s.header.StreamHeaders`, or reuse existing ones obtained by previously calling `KStream#mapValueToRecord()`. An overload without the `Named` argument, `setRecordHeaders(RecordHeaderMapper)`, is also available.

Usage examples

...

Code Block
builder
  .stream(List.of("input", "another"), Consumed.with(Serdes.String(), Serdes.String()))
  .mapValueToRecord() // 1. map record metadata
  .split() // 2. branch by topic name
  .branch((key, value) -> value.topic().equals("input"), Branched.withConsumer(b1 -> { /* ... */ }))
  .noDefaultBranch();

...

3. Apply headers to Record:

Code Block
b1
  .mapValueToRecord()
  //...
  .setRecordHeaders((k, v) -> v.headers().addUtf8("k1", "v1").retainLatest());
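To make the effect of `addUtf8("k1", "v1").retainLatest()` concrete, here is a minimal in-memory sketch. It assumes these methods behave like the Connect `Headers` methods of the same name that this interface is inspired by: `addUtf8` appends a header with a UTF-8 encoded value, `retainLatest()` keeps only the last header for each key, and `lastWithName` returns the last header for a key or `null`. The class `HeadersSketch` is a hypothetical stand-in, not `o.a.k.s.header.StreamHeaders`.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a Headers implementation; covers only a few methods.
final class HeadersSketch {

    static final class Header {
        final String key;
        final byte[] value;
        Header(String key, byte[] value) { this.key = key; this.value = value; }
    }

    private final List<Header> headers = new ArrayList<>();

    // Appends a header; duplicate keys are allowed, as with Kafka record headers.
    HeadersSketch add(String key, byte[] value) {
        headers.add(new Header(key, value));
        return this;
    }

    // Appends a header whose value is the UTF-8 encoding of the given string.
    HeadersSketch addUtf8(String key, String value) {
        return add(key, value.getBytes(StandardCharsets.UTF_8));
    }

    // Returns the most recently added header with this key, or null if absent.
    Header lastWithName(String key) {
        for (int i = headers.size() - 1; i >= 0; i--) {
            if (headers.get(i).key.equals(key)) {
                return headers.get(i);
            }
        }
        return null;
    }

    // Keeps only the last header for every key, preserving relative order.
    HeadersSketch retainLatest() {
        Set<String> seen = new HashSet<>();
        for (int i = headers.size() - 1; i >= 0; i--) {
            if (!seen.add(headers.get(i).key)) {
                headers.remove(i); // an older duplicate of a key already seen
            }
        }
        return this;
    }

    int size() { return headers.size(); }

    public static void main(String[] args) {
        HeadersSketch h = new HeadersSketch()
            .addUtf8("k1", "old")   // pretend this header arrived with the record
            .addUtf8("k2", "x")
            .addUtf8("k1", "v1")    // what the mapper in the example adds
            .retainLatest();
        System.out.println(h.size());
        System.out.println(new String(h.lastWithName("k1").value, StandardCharsets.UTF_8));
    }
}
```

Under these assumptions, calling `retainLatest()` after `addUtf8` means a newly added value replaces any earlier header with the same key instead of accumulating next to it.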

...

To  API will be extended to support headers and be backwards compatible.

KStreamSetRecordHeaders  and

...

 KStreamMapValueToRecord  are both using the latest

...

 Processor  API from KIP-478.


Rejected Alternatives

  1. Expand `KeyValue` to support headers. This will affect all current APIs, from KStream/KTable to Stores.

  2. Add `mergeHeaders` functions to join/aggregation. Although this would extend support for headers, it would add complexity to existing functions.

  3. (Initial version of this KIP) Add header-specific methods to the DSL (e.g. `withHeaders`, `addHeader`, `removeHeaders`). Although this would allow accessing and manipulating headers from the DSL, it would have a high impact on the existing KStream API (too many methods) and would be specific to headers only. It would also require dealing with the same abstraction as Kafka Records, and more methods would be needed to cover other metadata.

...