Status

Current state:  Discussion

Discussion thread: TBC

JIRA: JIRA-TBC

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

As KIP-82 introduced Headers into the core Kafka Product, it would be advantageous to expose them in the Kafka Connect Framework.

Connectors that replicate data between Kafka clusters, or between other messaging products and Kafka, would want to replicate the headers as well.

 

Public Interfaces

This KIP has the following public interface changes:

    1. Add a new constructor to SourceRecord

      public class SourceRecord {

      +   public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
                              String topic, Integer partition,
                              Schema keySchema, Object key,
                              Schema valueSchema, Object value,
                              Long timestamp, Iterable<Header> headers)

      }

    2. Add a new constructor to SinkRecord

      public class SinkRecord {

      +   public SinkRecord(String topic, int partition,
                            Schema keySchema, Object key,
                            Schema valueSchema, Object value,
                            long kafkaOffset,
                            Long timestamp, TimestampType timestampType, Iterable<Header> headers)

      }
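
To illustrate how a connector might pass headers through the new constructors, here is a minimal, self-contained sketch. The types `SketchHeader`, `SimpleSketchHeader` and `SketchSinkRecord` are hypothetical stand-ins defined only for this example, not the actual Kafka Connect classes, and the schema and timestamp arguments are omitted for brevity. Only the idea of an `Iterable<Header>` constructor argument comes from the proposal above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a header: a String key with an opaque byte[] value.
interface SketchHeader {
    String key();
    byte[] value();
}

// Hypothetical simple implementation of the stand-in header.
final class SimpleSketchHeader implements SketchHeader {
    private final String key;
    private final byte[] value;

    SimpleSketchHeader(String key, byte[] value) {
        this.key = key;
        this.value = value;
    }

    @Override public String key() { return key; }
    @Override public byte[] value() { return value; }
}

// Simplified record mirroring the shape of the proposed SinkRecord constructor.
// Not the real Kafka Connect class; schemas and timestamps are left out.
final class SketchSinkRecord {
    private final String topic;
    private final int partition;
    private final Object key;
    private final Object value;
    private final long kafkaOffset;
    private final List<SketchHeader> headers;

    SketchSinkRecord(String topic, int partition, Object key, Object value,
                     long kafkaOffset, Iterable<SketchHeader> headers) {
        this.topic = topic;
        this.partition = partition;
        this.key = key;
        this.value = value;
        this.kafkaOffset = kafkaOffset;
        // Copy the headers so later changes to the caller's collection are not visible here.
        List<SketchHeader> copy = new ArrayList<>();
        if (headers != null) {
            for (SketchHeader h : headers) {
                copy.add(h);
            }
        }
        this.headers = Collections.unmodifiableList(copy);
    }

    String topic() { return topic; }
    int partition() { return partition; }
    Object key() { return key; }
    Object value() { return value; }
    long kafkaOffset() { return kafkaOffset; }
    List<SketchHeader> headers() { return headers; }
}
```

In this sketch a null `headers` argument is treated as "no headers"; whether the real constructors should accept null is a design decision this KIP does not state.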
       

 

  1. The changes needed piggyback onto V3 of ProduceRequest and V4 of FetchRequest, which were introduced in KIP-98.
  2. The [String, byte[]] header array is serialised on the wire using a strict format.
  3. Each header's value is custom-serialisable by the interceptors/plugins that use the header.

For more detailed information on the above changes, please refer to the Proposed Changes section.

Proposed Changes

Four options were proposed before this proposal. This section details our proposed solution, Option 1; the other options are covered in the Rejected Alternatives section.

 

The advantages of this proposal are:
  • Adds the ability for producers to set standard header key=value pairs
  • No incompatible client API change (only new methods)
  • Allows users to specify the serialisation of the header value per header
  • Provides a standardised interface around which an ecosystem of tools can then grow

The disadvantage of this proposal is:

  • Change to the message object

Create a Headers interface and implementation to encapsulate the headers protocol.

  • See the Public Interfaces section above for sample interfaces.

Add a headers field of type Headers to both ProducerRecord and ConsumerRecord

  • An accessor method, Headers headers(), is added to the interface of ProducerRecord/ConsumerRecord.
  • Constructor(s) are added to ProducerRecord/ConsumerRecord to allow passing in existing, pre-constructed headers via Iterable<Header>.
    1. The use case is MirrorMaker being able to copy headers.
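
The MirrorMaker use case can be sketched as follows. This is only an illustration under stated assumptions: `InRecord` and `OutRecord` are hypothetical, simplified record types (not the Kafka client classes), and each header is modelled as a `Map.Entry<String, byte[]>` for brevity. The point is that a constructor accepting an `Iterable` of headers lets a mirroring tool hand the consumed headers straight to the outgoing record.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

final class MirrorSketch {

    // Hypothetical consumed record: key, value and the headers read from the source cluster.
    static final class InRecord {
        final byte[] key;
        final byte[] value;
        final List<Map.Entry<String, byte[]>> headers;

        InRecord(byte[] key, byte[] value, List<Map.Entry<String, byte[]>> headers) {
            this.key = key;
            this.value = value;
            this.headers = headers;
        }
    }

    // Hypothetical outgoing record whose constructor accepts pre-constructed headers.
    static final class OutRecord {
        final String topic;
        final byte[] key;
        final byte[] value;
        final List<Map.Entry<String, byte[]>> headers;

        OutRecord(String topic, byte[] key, byte[] value,
                  Iterable<Map.Entry<String, byte[]>> headers) {
            this.topic = topic;
            this.key = key;
            this.value = value;
            // Preserve order and duplicates: the same key may legitimately appear more than once.
            List<Map.Entry<String, byte[]>> copy = new ArrayList<>();
            for (Map.Entry<String, byte[]> h : headers) {
                copy.add(h);
            }
            this.headers = Collections.unmodifiableList(copy);
        }
    }

    // Builds the record for the target cluster, passing the consumed headers through unchanged.
    static OutRecord mirror(InRecord in, String targetTopic) {
        return new OutRecord(targetTopic, in.key, in.value, in.headers);
    }
}
```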

Wire protocol change - add an array of headers to the end of the message format

The following describes the core Message wire protocol change needed to fit headers into the message.

  • A header key can occur multiple times, and clients should expect to handle this. In clients it can be represented as a multimap, or as an array of values per key.
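
One way a client could handle repeated keys is to group the header values per key. The sketch below is illustrative only: the class and method names are hypothetical, and headers are modelled as `Map.Entry<String, byte[]>` rather than as any real Kafka type.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class HeaderMultiMapSketch {

    // Groups header values by key, keeping first-seen key order and the
    // original order of values within each key.
    static Map<String, List<byte[]>> groupByKey(Iterable<Map.Entry<String, byte[]>> headers) {
        Map<String, List<byte[]>> grouped = new LinkedHashMap<>();
        for (Map.Entry<String, byte[]> h : headers) {
            grouped.computeIfAbsent(h.getKey(), k -> new ArrayList<>()).add(h.getValue());
        }
        return grouped;
    }
}
```

A `LinkedHashMap` is used here so that iteration order matches the order in which keys first appeared on the wire; a client that does not care about ordering could use any `Map`.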

This is based on the KIP-98 message protocol proposal.

 

Message =>
        Length => varint
        Attributes => int8
        TimestampDelta => varlong
        OffsetDelta => varint
        KeyLen => varint
        Key => data
        ValueLen => varint
        Value => data
        Headers => [Header]   <-- NEW: added array of headers

Header =>
        Key => string (utf8)  <-- NEW: UTF-8 encoded string (uses varint length)
        Value => bytes        <-- NEW: header value as data (uses varint length)
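
The header layout above can be sketched in code as follows. This is a hedged illustration, not the broker or client implementation. It assumes that a varint is a zigzag-encoded, base-128 variable-length integer, that the array is prefixed by its element count encoded as a varint, and that header values are non-null. The class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

final class HeaderWireSketch {

    // Writes a signed int as a zigzag-encoded varint (assumed encoding, see the text above).
    static void writeVarint(int value, ByteArrayOutputStream out) {
        int v = (value << 1) ^ (value >> 31);   // zigzag: small magnitudes become small unsigned values
        while ((v & 0xFFFFFF80) != 0) {         // more than 7 bits remain
            out.write((v & 0x7F) | 0x80);       // low 7 bits, with the continuation bit set
            v >>>= 7;
        }
        out.write(v);                           // final byte, continuation bit clear
    }

    // Encodes: header count, then for each header the key length, the UTF-8 key bytes,
    // the value length and the value bytes.
    static byte[] encodeHeaders(List<Map.Entry<String, byte[]>> headers) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarint(headers.size(), out);
        for (Map.Entry<String, byte[]> h : headers) {
            byte[] key = h.getKey().getBytes(StandardCharsets.UTF_8);
            writeVarint(key.length, out);
            out.write(key, 0, key.length);
            byte[] value = h.getValue();        // assumed non-null in this sketch
            writeVarint(value.length, out);
            out.write(value, 0, value.length);
        }
        return out.toByteArray();
    }
}
```

Under these assumptions, a single header with key "k" and the two-byte value {1, 2} encodes to six bytes: 0x02 (count 1), 0x02 (key length 1), 0x6B ('k'), 0x04 (value length 2), 0x01, 0x02.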

Compatibility, Deprecation, and Migration Plan

  • Current client users should not be affected; this change only adds new API methods to the client APIs.
  • The message version means that clients expecting an older message version are not impacted.
    • Older producers simply produce a message that doesn't support headers.
      • Upcasting results in a message without headers.
      • A new consumer simply gets the message and, as no headers were set on send, sees none.
    • Older consumers simply consume a message without being aware of any headers.
      • Downcasting removes headers from the message, as the older message format doesn't have them in the protocol.
  • Message version migration would be handled as in KIP-32.
  • Given the mutability of the headers and the fact that we close them at some point, users should not resend the same record if they use headers and interceptors.
    • They should either configure the producer so that automatic retries are attempted (preferred, although there are some known limitations regarding records that time out before a send is actually attempted),
    • or
    • create a new record while being careful about which headers they include (if they rely fully on interceptors for headers, they could simply not include any headers in the new record).
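
The second option, creating a new record with a deliberately chosen set of headers, might look like the sketch below. It is an assumption-laden illustration: `Rec` is a hypothetical record type (not the Kafka `ProducerRecord`), headers are modelled as `Map.Entry<String, byte[]>`, and the caller supplies a predicate that decides which header keys to carry over.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

final class ResendSketch {

    // Hypothetical record type for illustration only.
    static final class Rec {
        final String topic;
        final byte[] value;
        final List<Map.Entry<String, byte[]>> headers;

        Rec(String topic, byte[] value, List<Map.Entry<String, byte[]>> headers) {
            this.topic = topic;
            this.value = value;
            this.headers = new ArrayList<>(headers);
        }
    }

    // Builds a new record instead of resending the old one, keeping only the
    // headers whose key the caller explicitly accepts.
    static Rec freshCopy(Rec failed, Predicate<String> keepHeader) {
        List<Map.Entry<String, byte[]>> kept = new ArrayList<>();
        for (Map.Entry<String, byte[]> h : failed.headers) {
            if (keepHeader.test(h.getKey())) {
                kept.add(h);
            }
        }
        return new Rec(failed.topic, failed.value, kept);
    }
}
```

An application that relies fully on interceptors for its headers could pass a predicate that always returns false, so that the new record starts with no headers and the interceptors add them again on send.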

Out of Scope

Some additional features/benefits were noticed and discussed during the above, but are deemed out of scope and should be tackled by further KIPs.

  • Core message size reduction
    • remove the 4-byte overhead for KeyLength when there are no headers, using an attributes bit
    • reduce the 4-byte overhead for KeyLength by using a variable-length encoded int
    • reduce the 4-byte overhead for ValueLength by using a variable-length encoded int
  • Broker side interceptors
    • with headers we could start introducing broker-side message interceptors to append metadata or handle messages
  • Single Record consumer API
    • There are many use cases where a single-record consumer/listener API is more user friendly. This is evident from the fact that Spring Kafka has already created a wrapper; it would be good to support this natively.

Rejected Alternatives

Map<Int, byte[]> Headers added to the Producer/ConsumerRecord

The concept is similar to the one proposed above, but with int keys.

  • Benefits
    • More compact: much reduced byte size overhead, only 4 bytes per key.
    • String keys can dwarf the value in byte size.
  • Disadvantages
    • String keys are more common in many systems
    • Requires management of the int key space, whereas string keys have natural key space management.

Map<String, String> Headers added to the ConsumerRecord

The concept is similar to the one proposed above, but with a few more disadvantages.

  • Benefits
    • Adds the ability for producers to set standard header key=value string pairs
    • No incompatible client API change (only new methods)
    • Allows users to specify the serialisation of the key=value map (String(&=), JSON, AVRO).
    • Provides a standardised interface around which an ecosystem of tools can grow
  • Disadvantages
    • Change to the message object
    • String keys are large; this can cause a message payload of 60 bytes to be dwarfed by its headers
    • String values again don't allow for compact values, and restrict a value to being a String
    • Not able to use headers on the broker side with custom serialisation

ProducerRecord<K, H, V>, ConsumerRecord<K, H, V>

The proposed change is that headers are [String, byte[]] pairs only; this alternative is that headers can be of any type, denoted by H.

  • Benefits
    • Complete customisation of what a header is.
  • Disadvantages
    • As generics don't allow for a default type, this would break client interface compatibility if done on Producer/ConsumerRecord.
      • A possible workaround would be to have HeadersProducer/ConsumerRecord<K, H, V>, which Producer/ConsumerRecord then extend with H as Object; this becomes ugly fast if kept for a time, or would require a deprecation / refactor v2 period.

Common Value Message Wrapper - Message<V>

This builds on the status quo and addresses some core issues, but fails to address some more advanced and future use cases, and also has some compatibility issues for upgrades and for clients that do not support it.

Please see: Headers Value Message Wrapper

 

Status Quo - Keep Custom Value Message Wrapper - Message<H, P>

This concept is the current de facto way many users are temporarily having to deal with the situation, but it has some core issues that it does not resolve.

  • Benefits
    • This requires no broker-side changes to handle the message
  • Disadvantages
    • This would not work with compaction, where headers need to be sent on a delete record, and so would not deliver on many of the requirements.
    • Every organization has a custom solution
      • No ecosystem of tooling can evolve
      • The cost of third-party vendor integration is high, as it is custom for every organization they integrate with.
    • Not able to make use of headers server side
    • Couples serialisation and deserialisation of the value for both the header and payload.
