

Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This KIP tries to address the following issues in Kafka.

In most messaging systems (JMS, QPID, etc.), streaming systems and transport systems (HTTP, TCP), it is typical to have a concept of headers and payload.

The payload traditionally carries the business object, while headers are traditionally used for transport routing, filtering, etc. Headers are most typically key=value pairs of strings.

Kafka currently does not support headers natively in its message/record format.

Examples where separate, natively supported custom headers are useful (not an exhaustive list):

  • Automated routing of messages between clusters based on header information
  • Enterprise APM tools (e.g. AppDynamics, Dynatrace) need to stitch in 'magic' transaction ids to provide end-to-end transaction flow monitoring.
  • Audit metadata recorded with the message, e.g. the clientId that produced the record, a unique message id, or the originating clusterId the message was first produced into (for multi-cluster routing).
  • The business payload needs to be end-to-end encrypted and signed to prevent tampering, but ecosystem components still need access to metadata to perform their tasks.

Kafka currently has a Record<K, V> structure, which originally could be used to follow this semantic: K could contain the header information and V the payload.

  • Since the introduction of log compaction, it is no longer possible to add metadata to K; otherwise compaction would treat each message as having a different key.
  • It is also not possible to wrap the value, e.g. in a Message<H, V>. To perform a delete under compaction, a record is sent with a null value (a tombstone). If such a delete record uses a wrapper to carry metadata, the value is technically no longer null, so the delete would not work.
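The two compaction problems above can be made concrete with a toy model. This is a hypothetical simplification, not broker code: compaction keeps only the latest value per key and drops a key whose latest value is null. Real brokers also keep tombstones around for a while before removing them, which is ignored here. The class and record strings are illustrative only.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of log compaction (not Kafka broker code).
public class CompactionSketch {

    static final class Rec {
        final String key;
        final String value; // null means "delete this key" (tombstone)

        Rec(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Keep the latest value per key; a null latest value removes the key.
    static Map<String, String> compact(List<Rec> log) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (Rec r : log) {
            if (r.value == null) {
                latest.remove(r.key);
            } else {
                latest.put(r.key, r.value);
            }
        }
        return latest;
    }

    public static void main(String[] args) {
        // 1) Metadata embedded in the key: each record gets a distinct key,
        //    so updates for "user-1" are no longer collapsed.
        List<Rec> keyed = new ArrayList<>();
        keyed.add(new Rec("user-1|traceId=a", "v1"));
        keyed.add(new Rec("user-1|traceId=b", "v2"));
        System.out.println(compact(keyed).size()); // 2, not 1

        // 2) Metadata in a value wrapper: the "delete" is not a null value,
        //    so it is not a tombstone and the key survives.
        List<Rec> wrapped = new ArrayList<>();
        wrapped.add(new Rec("user-1", "{h:{traceId=a},v:v1}"));
        wrapped.add(new Rec("user-1", "{h:{traceId=b},v:null}"));
        System.out.println(compact(wrapped).containsKey("user-1")); // true

        // 3) Headers kept outside key and value: the tombstone works.
        List<Rec> plain = new ArrayList<>();
        plain.add(new Rec("user-1", "v1"));
        plain.add(new Rec("user-1", null));
        System.out.println(compact(plain).containsKey("user-1")); // false
    }
}
```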

Many people have flagged this issue in forums over time.

 

Public Interfaces

This KIP has the following public interface changes:

  1. Add a new headers field (byte[]) to the message format.
  2. Add a headers (Map<String, String>) field to ProducerRecord and ConsumerRecord. A producer will be able to set headers on a ProducerRecord, and a consumer will see the headers of each message it consumes.
  3. Add the accessor methods void setHeader(String, String) and String getHeader(String) to ProducerRecord/ConsumerRecord.
  4. Add ProduceRequest/ProduceResponse V3 which uses the new message format.
  5. Add FetchRequest/FetchResponse V3 which uses the new message format.
  6. Add a configurable headers serializer, Serializer<Map<String, String>>, to KafkaProducer.
  7. Add a configurable headers deserializer (HeadersDeserializer), Deserializer<Map<String, String>>, to KafkaConsumer.

For more detailed information on the above changes, see the Proposed Changes section.

Proposed Changes


Three options were proposed. This section details our proposed solution, Option 1, described here. The other options, Option 2 and Option 3, are covered in the Rejected Alternatives section.

The advantages of this proposal are:
  • Adds the ability for producers to set standard key=value string header pairs
  • No incompatible client API changes (only new methods are added)
  • Allows users to specify how the key=value map is serialized (e.g. a String of &-separated key=value pairs, JSON, Avro)
  • Provides a standardized interface around which an ecosystem of tools can grow

The disadvantage of this proposal is:

  • Without a headers flag byte: 4 bytes of overhead per message (the HeadersLength field) even when headers are not used.
    • This could be mitigated by continuing to write the version 1 message format (magic byte = 1) for messages that do not use the feature or whose headers are null.
  • With a headers flag byte: only 1 byte of overhead when headers are not used, but 1 extra byte (on top of HeadersLength) when headers are used.
  • Requires a change to the message object.

Wire protocol change - add a header field and size to the message format (without headers flag)

MessageAndOffset => Offset MessageSize Message
  Offset => int64  
  MessageSize => int32
  
  Message => Crc MagicByte Attributes Timestamp KeyLength Key HeadersLength Headers ValueLength Value
    Crc => int32
    MagicByte => int8  <---------------------- Bump up magic byte to 2
    Attributes => int8
    Timestamp => int64
    KeyLength => int32
    Key => bytes
    HeadersLength => int32 <------------------ NEW length of the byte[] of the serialized headers (int32 = 0 if no headers)
    Headers => bytes <------------------------ NEW serialized form of the headers Map<String, String> (empty for old messages or when there are no headers)
    ValueLength => int32
    Value => bytes
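As a sketch of the layout above (without headers flag), the following writes a single Message body into a byte array. Assumptions not stated in the KIP: attributes are 0 (no compression), a null key or value is written with length -1, and the CRC is a CRC32 over every byte after the Crc field, as in the existing message formats. The outer Offset and MessageSize fields are omitted, and the class name is hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Hypothetical writer for the proposed magic-2 Message layout (no headers flag).
public class MessageV2Writer {

    static byte[] write(long timestamp, byte[] key, byte[] headers, byte[] value) {
        int hLen = headers == null ? 0 : headers.length;
        int size = 4 + 1 + 1 + 8                        // Crc, MagicByte, Attributes, Timestamp
                + 4 + (key == null ? 0 : key.length)    // KeyLength, Key
                + 4 + hLen                              // HeadersLength, Headers
                + 4 + (value == null ? 0 : value.length); // ValueLength, Value
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putInt(0);            // placeholder, Crc is filled in at the end
        buf.put((byte) 2);        // MagicByte bumped to 2
        buf.put((byte) 0);        // Attributes: no compression (assumption)
        buf.putLong(timestamp);
        putBytes(buf, key);
        buf.putInt(hLen);         // HeadersLength = 0 when there are no headers
        if (hLen > 0) {
            buf.put(headers);
        }
        putBytes(buf, value);

        // CRC32 over everything after the Crc field (assumption, as in v0/v1)
        CRC32 crc = new CRC32();
        crc.update(buf.array(), 4, size - 4);
        buf.putInt(0, (int) crc.getValue());
        return buf.array();
    }

    // Length-prefixed bytes; null is encoded as length -1.
    private static void putBytes(ByteBuffer buf, byte[] b) {
        if (b == null) {
            buf.putInt(-1);
        } else {
            buf.putInt(b.length);
            buf.put(b);
        }
    }

    public static void main(String[] args) {
        byte[] msg = write(1L,
                "k".getBytes(StandardCharsets.UTF_8),
                "traceId=abc".getBytes(StandardCharsets.UTF_8),
                "v".getBytes(StandardCharsets.UTF_8));
        System.out.println(msg.length); // 39
    }
}
```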

 

Wire protocol change - add a header field and size to the message format (with headers flag)

MessageAndOffset => Offset MessageSize Message
  Offset => int64  
  MessageSize => int32
  
  Message => Crc MagicByte Attributes Timestamp KeyLength Key HeadersFlag HeadersLength Headers ValueLength Value
    Crc => int32
    MagicByte => int8  <---------------------- Bump up magic byte to 2
    Attributes => int8
    Timestamp => int64
    KeyLength => int32
    Key => bytes
    HeadersFlag => byte <--------------------- NEW
    (optional) HeadersLength => int32 <------------------ NEW length of the byte[] of the serialized headers (int32 = 0 if no headers)
    (optional) Headers => bytes <------------------------ NEW serialized form of the headers Map<String, String> (empty for old messages or when there are no headers)
    ValueLength => int32
    Value => bytes
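The overhead trade-off between the two layouts above comes down to simple arithmetic on the headers section. The sketch below counts only the bytes that section adds per message and ignores compression and batching; the class and method names are hypothetical.

```java
// Hypothetical byte counts for the headers section under each proposed layout.
public class HeadersOverhead {

    // Without flag: HeadersLength (int32) is always written.
    static int withoutFlag(int headersBytes) {
        return 4 + headersBytes;
    }

    // With flag: HeadersFlag (1 byte) is always written; HeadersLength and
    // Headers are only written when headers are present.
    static int withFlag(int headersBytes, boolean hasHeaders) {
        return hasHeaders ? 1 + 4 + headersBytes : 1;
    }

    public static void main(String[] args) {
        System.out.println(withoutFlag(0));                       // 4 bytes when headers are unused
        System.out.println(withFlag(0, false));                   // 1 byte when headers are unused
        System.out.println(withFlag(11, true) - withoutFlag(11)); // 1 extra byte when headers are used
    }
}
```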

 

Add a headers field Map<String, String> to both ProducerRecord and ConsumerRecord

  • Accessor methods void setHeader(String, String) and String getHeader(String) are added to both.
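A minimal sketch of the proposed record additions, shown on a hypothetical stand-alone class (HeaderedRecord) rather than the real ProducerRecord/ConsumerRecord. Two behaviours are assumptions the KIP does not specify: setHeader overwrites an existing value for the same name, and getHeader returns null for an absent header.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ProducerRecord/ConsumerRecord with header accessors.
public class HeaderedRecord<K, V> {
    private final String topic;
    private final K key;
    private final V value;
    // Headers are plain String key=value pairs, as proposed
    private final Map<String, String> headers = new HashMap<>();

    public HeaderedRecord(String topic, K key, V value) {
        this.topic = topic;
        this.key = key;
        this.value = value;
    }

    // Adds a header, replacing any existing value for that name (assumption)
    public void setHeader(String name, String value) {
        headers.put(name, value);
    }

    // Returns null when the header is absent (assumption)
    public String getHeader(String name) {
        return headers.get(name);
    }

    // Read-only view, e.g. for the configured headers serializer
    public Map<String, String> headers() {
        return Collections.unmodifiableMap(headers);
    }

    public String topic() { return topic; }
    public K key() { return key; }
    public V value() { return value; }

    public static void main(String[] args) {
        HeaderedRecord<String, String> record = new HeaderedRecord<>("orders", "order-42", "payload");
        record.setHeader("traceId", "abc123");
        System.out.println(record.getHeader("traceId")); // abc123
        System.out.println(record.getHeader("missing")); // null
    }
}
```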

Add configurable Deserializer and Serializer to KafkaConsumer and KafkaProducer

  • KafkaProducer headers.serializer denotes the serializer to use
    • Producer uses the headers.serializer to convert the headers field (Map<String, String>) in the ProducerRecord to byte[]
  • KafkaConsumer headers.deserializer denotes the deserializer to use
    • Fetcher is updated to use the headers.deserializer to convert the byte[] to the headers field (Map<String, String>) set in the ConsumerRecord

Create a simple UTF-8 String-based Serializer/Deserializer implementation for headers.serializer/headers.deserializer, and make it the default configuration.

 

  • This would work by converting the map into a string of the form key1=value1&key2=value2&key3=value3 (like URL query parameters, with keys and values URL-encoded), then calling getBytes("UTF-8") to obtain the byte[]
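The default query-string format described above might look like the following sketch. The class and method names are hypothetical; a real version would implement Kafka's Serializer<Map<String, String>> and Deserializer<Map<String, String>> interfaces. It uses the Charset overloads of URLEncoder/URLDecoder (Java 10+), assumes non-null header keys and values, and maps empty input to an empty map.

```java
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical UTF-8 "key1=value1&key2=value2" headers codec.
public class QueryStringHeaders {

    static byte[] serialize(Map<String, String> headers) {
        if (headers == null || headers.isEmpty()) {
            return new byte[0];
        }
        StringJoiner joiner = new StringJoiner("&");
        for (Map.Entry<String, String> e : headers.entrySet()) {
            // URL-encode so '&' and '=' inside keys or values cannot break the format
            joiner.add(URLEncoder.encode(e.getKey(), StandardCharsets.UTF_8) + "="
                    + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8));
        }
        return joiner.toString().getBytes(StandardCharsets.UTF_8);
    }

    static Map<String, String> deserialize(byte[] data) {
        Map<String, String> headers = new LinkedHashMap<>();
        if (data == null || data.length == 0) {
            return headers; // no headers: empty map
        }
        String s = new String(data, StandardCharsets.UTF_8);
        for (String pair : s.split("&")) {
            int eq = pair.indexOf('=');
            String k = eq < 0 ? pair : pair.substring(0, eq);
            String v = eq < 0 ? "" : pair.substring(eq + 1);
            headers.put(URLDecoder.decode(k, StandardCharsets.UTF_8),
                    URLDecoder.decode(v, StandardCharsets.UTF_8));
        }
        return headers;
    }

    public static void main(String[] args) {
        Map<String, String> h = new LinkedHashMap<>();
        h.put("traceId", "abc 123");
        h.put("route", "eu&us");
        byte[] bytes = serialize(h);
        System.out.println(new String(bytes, StandardCharsets.UTF_8)); // traceId=abc+123&route=eu%26us
        System.out.println(deserialize(bytes).equals(h));              // true
    }
}
```

URL-encoding is what keeps the format unambiguous: a value such as "eu&us" is written as eu%26us, so splitting on '&' and '=' during deserialization is safe.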

 

 

Compatibility, Deprecation, and Migration Plan

  • Current users should not be affected; only new API methods are being added.
  • Because the message format is versioned, clients expecting an older message version would not be impacted:
    • older producers would simply produce messages without headers
      • new consumers would simply see these messages with empty headers
    • older consumers would simply consume messages, unaware of any headers
  • Message format version migration would be handled as in KIP-32.
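To illustrate the rule that new consumers see empty headers on older messages, here is a sketch of a reader that branches on the magic byte. It assumes the magic-2 layout without headers flag, skips CRC verification and compression, and uses hypothetical class names; it is not the actual Fetcher logic.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical reader for magic 0/1 messages and the proposed magic-2 layout.
public class VersionedMessageReader {

    static final class Decoded {
        final byte magic;
        final byte[] key;
        final byte[] headers; // empty (never null) when the message has no headers
        final byte[] value;

        Decoded(byte magic, byte[] key, byte[] headers, byte[] value) {
            this.magic = magic;
            this.key = key;
            this.headers = headers;
            this.value = value;
        }
    }

    static Decoded read(ByteBuffer buf) {
        buf.getInt();                 // Crc (not verified in this sketch)
        byte magic = buf.get();
        buf.get();                    // Attributes
        if (magic >= 1) {
            buf.getLong();            // Timestamp, present since magic 1 (KIP-32)
        }
        byte[] key = readBytes(buf);
        byte[] headers = new byte[0]; // older messages: empty headers
        if (magic >= 2) {
            int hLen = buf.getInt();
            if (hLen > 0) {
                headers = new byte[hLen];
                buf.get(headers);
            }
        }
        byte[] value = readBytes(buf);
        return new Decoded(magic, key, headers, value);
    }

    // Length-prefixed bytes; -1 means null.
    private static byte[] readBytes(ByteBuffer buf) {
        int len = buf.getInt();
        if (len < 0) {
            return null;
        }
        byte[] b = new byte[len];
        buf.get(b);
        return b;
    }

    public static void main(String[] args) {
        // A magic-1 message: null key, value "v"
        ByteBuffer v1 = ByteBuffer.allocate(23);
        v1.putInt(0).put((byte) 1).put((byte) 0).putLong(5L).putInt(-1).putInt(1).put((byte) 'v');
        v1.flip();
        System.out.println(read(v1).headers.length); // 0

        // A magic-2 message: null key, headers "ab", value "v"
        ByteBuffer v2 = ByteBuffer.allocate(29);
        v2.putInt(0).put((byte) 2).put((byte) 0).putLong(5L).putInt(-1)
          .putInt(2).put((byte) 'a').put((byte) 'b').putInt(1).put((byte) 'v');
        v2.flip();
        System.out.println(new String(read(v2).headers, StandardCharsets.UTF_8)); // ab
    }
}
```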

Rejected Alternatives


Value Message Wrapper - Message<H, P>

This is currently the de facto way many users temporarily work around the problem, but it leaves some core issues unresolved.

  • Benefits
    • Requires no broker-side changes and no message format changes.
  • Disadvantages
    • It does not work with compaction when headers need to be sent on a delete record (tombstone), so it fails to deliver on many of the requirements.
    • It couples the serialization and deserialization of the headers and the payload, since both are carried in the value.

ProducerRecord<K, H, V>, ConsumerRecord<K, H, V>

The proposed change restricts headers to Map<String, String>; this alternative allows headers of any type, denoted by H.

  • Benefits
    • Complete customization of what a header is.
  • Disadvantages
    • As Java generics do not support default type parameters, doing this on Producer/ConsumerRecord would break client interface compatibility.
      • A possible work-around would be to add HeadersProducerRecord/HeadersConsumerRecord<K, H, V>, which Producer/ConsumerRecord would then extend with H fixed to Object. This quickly becomes ugly if kept for long, or would require a deprecation / v2 refactoring period.
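The work-around described above can be sketched as follows. The class names follow the text, but the bodies are hypothetical and minimal, and the nested ProducerRecord is a stand-in rather than Kafka's class. It shows the existing two-parameter type surviving by fixing H to Object, while header-aware code has to use a separate three-parameter type.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical sketch of the rejected generic-headers work-around.
public class GenericHeadersSketch {

    // New base type carrying an arbitrary header type H
    static class HeadersProducerRecord<K, H, V> {
        final K key;
        final H headers;
        final V value;

        HeadersProducerRecord(K key, H headers, V value) {
            this.key = key;
            this.headers = headers;
            this.value = value;
        }
    }

    // Keeps the existing two-parameter signature by fixing H to Object
    static class ProducerRecord<K, V> extends HeadersProducerRecord<K, Object, V> {
        ProducerRecord(K key, V value) {
            super(key, null, value);
        }
    }

    public static void main(String[] args) {
        // Existing user code compiles unchanged...
        ProducerRecord<String, String> legacy = new ProducerRecord<>("k", "v");

        // ...while header-aware code must use the three-parameter type
        HeadersProducerRecord<String, Map<String, String>, String> typed =
                new HeadersProducerRecord<>("k", Collections.singletonMap("traceId", "abc"), "v");

        // A legacy record only fits where H = Object, so APIs that want typed
        // headers would need a parallel set of methods for the new type
        HeadersProducerRecord<String, Object, String> widened = legacy;
        System.out.println(widened.headers);              // null
        System.out.println(typed.headers.get("traceId")); // abc
    }
}
```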