Status

Current state: Under Discussion

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This KIP aims to add Record Headers support to the Streams API for routing, filtering, and mapping. Stateful operations (tables, joins) are out of scope.

Public Interfaces

1. Add Headers to `KeyValue` class: 

...

org.apache.kafka.streams.state.KeyValueStore:
  • void put(K key, V value);
  • void put(K key, V value, Headers headers);
  • V putIfAbsent(K key, V value);
  • V putIfAbsent(K key, V value, Headers headers);

Proposed Changes

1. Enable Kafka Streams to filter, map and process Records using Headers. This is based on API change #3. 

...

2. Internal mechanisms that process and store data will be affected by the interface changes (e.g. ProcessorContext, RecordCollector, RecordContext, KeyValueStore). This should only affect in-memory and cache storage; persistent storage such as RocksDB should pass headers through.

Compatibility, Deprecation, and Migration Plan

  • Clients using the high-level DSL should not be affected by the proposed changes.
  • Clients using the Processor API will need to implement `void process(K key, V value, Headers headers);` to bypass or handle Headers.
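
To illustrate the two options for Processor API clients (bypass or handle Headers), here is a minimal, self-contained sketch. It does not use the real Kafka classes: `HeaderAwareProcessor`, `PassThrough` and `FilterByType` are hypothetical names, headers are simplified to a `Map<String, byte[]>` instead of Kafka's `Headers` type, and the `"type"` header key is an assumption made only for this example.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class HeaderProcessorSketch {

    // Hypothetical stand-in for the proposed process(K, V, Headers) method;
    // headers are simplified to a Map rather than Kafka's Headers type.
    interface HeaderAwareProcessor<K, V> {
        void process(K key, V value, Map<String, byte[]> headers);
    }

    // Bypasses headers: every record is forwarded regardless of its headers.
    static class PassThrough implements HeaderAwareProcessor<String, String> {
        final List<String> forwarded = new ArrayList<>();

        @Override
        public void process(String key, String value, Map<String, byte[]> headers) {
            forwarded.add(key + "=" + value);
        }
    }

    // Handles headers: forwards only records whose "type" header matches.
    static class FilterByType implements HeaderAwareProcessor<String, String> {
        final List<String> forwarded = new ArrayList<>();
        private final byte[] expected;

        FilterByType(String expectedType) {
            this.expected = expectedType.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public void process(String key, String value, Map<String, byte[]> headers) {
            byte[] actual = headers.get("type");
            if (actual != null && Arrays.equals(actual, expected)) {
                forwarded.add(key + "=" + value);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, byte[]> orderHeaders = new HashMap<>();
        orderHeaders.put("type", "order".getBytes(StandardCharsets.UTF_8));

        FilterByType filter = new FilterByType("order");
        filter.process("k1", "v1", orderHeaders);    // header matches, forwarded
        filter.process("k2", "v2", new HashMap<>()); // no header, dropped
        System.out.println(filter.forwarded);
    }
}
```

A processor that does not care about headers can follow the `PassThrough` shape and simply ignore the third argument.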

Rejected Alternatives

1. Adding Headers to the KTable API would mean propagating Headers to stores that are key-value specific, such as RocksDB. If headers are required in stateful operations, clients will need to first map header values into the key or value and then do the processing.
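
The workaround described above, mapping a header value into the record value before a stateful operation, could look roughly like the following sketch. It is plain Java with no Kafka dependency: `EnrichedValue` and `enrich` are hypothetical names, headers are again simplified to a `Map<String, byte[]>`, and the `"source"` header key is an assumption for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

class HeaderToValueSketch {

    // Hypothetical wrapper that carries one header value next to the payload,
    // so a key-value store only ever sees a plain value.
    static final class EnrichedValue {
        final String payload;
        final String headerValue; // null when the header is absent

        EnrichedValue(String payload, String headerValue) {
            this.payload = payload;
            this.headerValue = headerValue;
        }
    }

    // Copies the chosen header (decoded as UTF-8) into the value.
    static EnrichedValue enrich(String value, Map<String, byte[]> headers, String headerKey) {
        byte[] raw = headers.get(headerKey);
        String decoded = raw == null ? null : new String(raw, StandardCharsets.UTF_8);
        return new EnrichedValue(value, decoded);
    }

    public static void main(String[] args) {
        Map<String, byte[]> headers = new HashMap<>();
        headers.put("source", "web".getBytes(StandardCharsets.UTF_8));

        EnrichedValue enriched = enrich("payload-1", headers, "source");
        System.out.println(enriched.payload + " from " + enriched.headerValue);
    }
}
```

After this mapping step the stateful operation works on `EnrichedValue` alone, so the store does not need to know about headers.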

References

Draft/WIP: https://github.com/jeqo/kafka/tree/streams-headers

...