Status

Current state: Under Discussion

Discussion thread: here

JIRA:

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Headers have been introduced in almost all Kafka components (broker, producer API, consumer API, Connect API). This KIP aims to add Record Headers support to the Streams Processor API first, and then to discuss how to support them at the DSL API level.

Headers can be used in different scenarios (e.g. propagating tracing context between components, or carrying operational information that can be used for filtering).

Public Interfaces

Add headers as part of processing:

 
org.apache.kafka.streams.processor.ProcessorContext:
  • Headers headers();

     

Proposed Changes

Adding `headers()` to `ProcessorContext` will enable custom processors and future DSL processors to have Headers available.
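As a rough illustration of how a custom processor might use the proposed accessor, the sketch below filters records by a tracing header. The `Header`, `Headers`, and `ProcessorContext` types here are simplified stand-ins declared only so the example compiles on its own; they mirror just the methods used. The `TraceFilterProcessor` class and the `trace-id` header name are hypothetical and not part of this proposal.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the Kafka types so the sketch is self-contained.
// The real types live in org.apache.kafka.common.header and
// org.apache.kafka.streams.processor and have more methods than shown here.
interface Header {
    String key();
    byte[] value();
}

interface Headers {
    Header lastHeader(String key); // returns null when no header has that key
}

interface ProcessorContext {
    Headers headers(); // the accessor proposed by this KIP
}

// Hypothetical custom processor: keeps only records that carry a "trace-id" header.
class TraceFilterProcessor {
    private ProcessorContext context;
    // Collected output, standing in for forwarding records downstream.
    final List<String> forwarded = new ArrayList<>();

    void init(ProcessorContext context) {
        this.context = context;
    }

    void process(String key, String value) {
        // Headers of the record currently being processed.
        Header trace = context.headers().lastHeader("trace-id");
        if (trace == null) {
            return; // no tracing context, drop the record
        }
        String traceId = new String(trace.value(), StandardCharsets.UTF_8);
        forwarded.add(key + "=" + value + " [trace " + traceId + "]");
    }
}
```

The processor reads headers through the context rather than through its `process` arguments, which is the access pattern the `headers()` method is meant to enable.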

Internally, several components need to carry headers so that they are available on the ProcessorContext, such as:

  • o.a.k.s.p.i.AbstractProcessorContext
  • o.a.k.s.p.i.GlobalStateUpdateTask
  • o.a.k.s.p.i.ProcessorRecordContext
  • o.a.k.s.p.i.RecordCollector
  • o.a.k.s.p.i.RecordCollectorImpl
  • o.a.k.s.p.i.RecordContext
  • o.a.k.s.p.i.RecordDeserialized
  • o.a.k.s.p.i.SinkNode
  • o.a.k.s.p.i.StampedRecord

 

More details in the PR: https://github.com/apache/kafka/pull/4955

Compatibility, Deprecation, and Migration Plan

  • Clients using the high-level DSL should not be affected by the proposed changes.
  • Clients using the Processor API will need to implement `void process(K key, V value, Headers headers);` to bypass or handle Headers.

Rejected Alternatives

1. Adding Headers to the KTable API would mean propagating Headers to stores that are key-value specific, like RocksDB. If headers are required in stateful operations, clients will need to map header values into the key or value first and then do the processing.

2. Adding Headers to the DSL API.
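To make the workaround in alternative 1 concrete, here is a minimal sketch of copying a header value into the record value before it reaches a key-value store. Everything in it is an assumption for illustration: the `tenant` field, the `TenantTagged` value type, the `HeaderToValueMapper` class, and the plain `HashMap` that stands in for a state store.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical value type that carries a header-derived field next to the payload.
final class TenantTagged {
    final String tenant;
    final String payload;

    TenantTagged(String tenant, String payload) {
        this.tenant = tenant;
        this.payload = payload;
    }
}

// Hypothetical helper: folds a header value into the stored value, because a
// key-value store keeps only the key and the value, not the record headers.
class HeaderToValueMapper {
    // Plain map standing in for a key-value state store.
    final Map<String, TenantTagged> store = new HashMap<>();

    void put(String key, String payload, byte[] tenantHeader) {
        // Fall back to a marker value when the header is absent.
        String tenant = tenantHeader == null
                ? "unknown"
                : new String(tenantHeader, StandardCharsets.UTF_8);
        store.put(key, new TenantTagged(tenant, payload));
    }
}
```

Once the header value is part of the stored value, later stateful steps can read it from the store without the store itself knowing anything about headers.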

Future Work

  • Adding DSL Processors that use Headers to filter/map/branch.
