

Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This KIP tries to address the following issues in Kafka.

In most messaging systems (JMS, QPID, etc.), streaming systems, and transport protocols (HTTP, TCP), it is typical to have a concept of headers and payload.

The payload traditionally carries the business object, while headers are used for transport concerns such as routing and filtering.

Kafka currently has no native support for headers in its message/record format.

Examples where natively supported custom headers become useful (this is not an exhaustive list):

  • Automated routing of messages between clusters based on header information.
  • Enterprise APM tools (e.g. AppDynamics, Dynatrace) need to stitch in 'magic' transaction ids to provide end-to-end transaction flow monitoring.
  • Audit metadata needs to be recorded with the message, e.g. the clientId that produced the record, a unique message id, or the originating clusterId the message was first produced into for multi-cluster routing.
  • The business payload needs to be end-to-end encrypted and signed against tampering, while ecosystem components still need access to metadata to do their work.

Kafka currently has a Record<K, V> structure, which originally could have been used to follow this semantic, whereby K carries the header information and V carries the payload. That approach no longer works, for two reasons:

  • Since the introduction of log compaction, metadata can no longer be added to K: compaction would treat each message with different metadata as a message with a different key.
  • The value cannot be replaced by a wrapper such as Message<H, V> either. Compaction deletes a key when it sees a record with a NULL value, so a delete sent inside a wrapper that carries metadata would not be recognised as a delete, because the value itself would no longer be null.
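The two limitations above can be made concrete with a small simulation. This is only a sketch under stated assumptions: `CompactionSketch`, `Entry` and `compact` are hypothetical names, the string encodings of metadata are invented for illustration, and the model is far simpler than Kafka's real log cleaner (for example, it drops delete markers immediately instead of retaining them for a period).

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of log compaction; not Kafka's implementation.
class CompactionSketch {

    /** One log entry: a key and a value, where a null value marks a delete. */
    static final class Entry {
        final String key;
        final String value;

        Entry(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Keeps the latest value per key and drops keys whose latest value is null. */
    static Map<String, String> compact(List<Entry> log) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (Entry e : log) {
            if (e.value == null) {
                latest.remove(e.key);        // delete marker: the key disappears
            } else {
                latest.put(e.key, e.value);  // newer value replaces the older one
            }
        }
        return latest;
    }

    public static void main(String[] args) {
        // Plain key: two updates to the same key compact down to one record.
        List<Entry> plain = Arrays.asList(
                new Entry("order-1", "v1"),
                new Entry("order-1", "v2"));
        System.out.println(compact(plain).size());          // prints 1

        // Metadata folded into the key (assumed "key|meta" encoding):
        // compaction now sees two distinct keys and keeps both.
        List<Entry> metadataInKey = Arrays.asList(
                new Entry("order-1|trace=a", "v1"),
                new Entry("order-1|trace=b", "v2"));
        System.out.println(compact(metadataInKey).size());  // prints 2

        // Metadata wrapped around the value (assumed envelope encoding):
        // the "delete" is a non-null envelope, so the key is never removed.
        List<Entry> wrappedDelete = Arrays.asList(
                new Entry("order-1", "{headers:{trace=a},payload:v1}"),
                new Entry("order-1", "{headers:{trace=b},payload:null}"));
        System.out.println(compact(wrappedDelete).size());  // prints 1
    }
}
```

In both failing cases the metadata changes what compaction sees as the key or the value, which is why the headers need a place of their own in the record.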

Many users have raised this issue in forums over time.

 

Public Interfaces

Briefly list any new interfaces that will be introduced as part of this proposal or any existing interfaces that will be removed or changed. The purpose of this section is to concisely call out the public contract that will come along with this feature.

A public interface is any change to the following:

  • Binary log format

  • The network protocol and api behavior

  • Any class in the public packages under clients

    • org/apache/kafka/common/serialization

    • org/apache/kafka/common

    • org/apache/kafka/common/errors

    • org/apache/kafka/clients/producer

    • org/apache/kafka/clients/consumer (eventually, once stable)

  • Configuration, especially client configuration

  • Monitoring

  • Command line tools and arguments

  • Anything else that will likely break existing users in some way when they upgrade

This KIP has the following public interface changes:

  1. Add a new headers value (byte[]) to the message format.
  2. Add a headers (Map<String, String>) field to ProducerRecord and ConsumerRecord. A producer will be able to set headers on a ProducerRecord. A consumer will see the message headers when it sees the messages.
  3. Add ProduceRequest/ProduceResponse V2 which uses the new message format.
  4. Add a timestamp in ProduceResponse V2 for each partition. The timestamp will either be LogAppendTime if the topic is configured to use it or it will be NoTimestamp if create time is used.
  5. Add FetchRequest/FetchResponse V2 which uses the new message format.
  6. Add a timestamp variable to RecordMetadata. The timestamp is the timestamp of the messages appended to the partition log.

For more detailed information on the above changes, please refer to the Proposed Changes section.
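As a rough illustration of items 1 and 2 in the list above, the sketch below models a record that carries a Map<String, String> of headers next to its key and value, and turns those headers into a byte[]. Everything in it is an assumption made for illustration: `HeaderRecord`, `encodeHeaders` and `decodeHeaders` are hypothetical names, and the encoding (an entry count followed by length-prefixed strings) is not the wire format proposed by this KIP.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch; neither the class nor the encoding is defined by this KIP.
class HeaderRecord<K, V> {
    final K key;
    final V value;                      // may be null, e.g. a delete for compaction
    final Map<String, String> headers;  // metadata kept apart from key and value

    HeaderRecord(K key, V value, Map<String, String> headers) {
        this.key = key;
        this.value = value;
        this.headers = new LinkedHashMap<>(headers);
    }

    /** Assumed encoding: entry count, then name/value pairs written with writeUTF. */
    static byte[] encodeHeaders(Map<String, String> headers) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(headers.size());
            for (Map.Entry<String, String> h : headers.entrySet()) {
                out.writeUTF(h.getKey());
                out.writeUTF(h.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reverses encodeHeaders, preserving the original entry order. */
    static Map<String, String> decodeHeaders(byte[] encoded) {
        Map<String, String> headers = new LinkedHashMap<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded))) {
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                String name = in.readUTF();
                headers.put(name, in.readUTF());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return headers;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("trace-id", "abc123");
        headers.put("origin-cluster", "cluster-a");

        // A delete record: the value stays null while the headers still travel with it.
        HeaderRecord<String, String> delete = new HeaderRecord<>("order-1", null, headers);

        byte[] wire = encodeHeaders(delete.headers);
        System.out.println(decodeHeaders(wire).equals(delete.headers)); // prints true
        System.out.println(delete.value == null);                       // prints true
    }
}
```

Keeping the headers in their own field is what lets a delete record keep a null value for compaction while still carrying metadata such as a trace id.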

Proposed Changes

Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
