...
- Adds the ability for producers to set standard header key=value pairs
- No incompatible client API change (only new methods)
- Allows users to specify the serialisation of the header value per header
- Provides a standardised interface to an ecosystem of tools that can then grow around the feature
The disadvantage of this proposal is:
- Change to the message object
Create a Headers interface and implementation to encapsulate the headers protocol.
- Fields:
  - Header[] headersArray
- Constructors:
  - ()
  - (byte[] headerBytes)
- Methods:
  - Collection<byte[]> get(String key)
  - void add(String key, byte[] value)
  - Collection<String> keys()
  - byte[] asBytes()
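A minimal sketch of the in-memory side of this interface may help make the semantics concrete. It is not the proposed implementation: the class names `HeaderEntry` and `SimpleHeaders` are hypothetical, and serialisation (`asBytes()` and the `byte[]` constructor) is left out because it depends on the wire format. The point it illustrates is that `add` appends rather than overwrites, so `get` returns every value stored under a key.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;

// Hypothetical single header entry: a String key and an opaque byte[] value.
final class HeaderEntry {
    final String key;
    final byte[] value;

    HeaderEntry(String key, byte[] value) {
        this.key = key;
        this.value = value;
    }
}

// Sketch of the in-memory part of the proposed Headers type (assumed name).
// asBytes() and the byte[] constructor are intentionally omitted.
final class SimpleHeaders {
    // A list keeps insertion order, so repeated keys keep their relative order.
    private final List<HeaderEntry> headers = new ArrayList<>();

    // Appends a header; an existing header with the same key is kept.
    void add(String key, byte[] value) {
        headers.add(new HeaderEntry(key, value));
    }

    // Returns every value stored under the key, in insertion order.
    Collection<byte[]> get(String key) {
        List<byte[]> values = new ArrayList<>();
        for (HeaderEntry h : headers) {
            if (h.key.equals(key)) {
                values.add(h.value);
            }
        }
        return values;
    }

    // Returns the distinct keys in first-seen order.
    Collection<String> keys() {
        Collection<String> keys = new LinkedHashSet<>();
        for (HeaderEntry h : headers) {
            keys.add(h.key);
        }
        return keys;
    }
}
```

Adding two headers under the same key and then calling `get` with that key yields a collection of two values, while `keys()` reports the key once.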
Add a headers field of type Headers to both ProducerRecord and ConsumerRecord.
- An accessor method, Headers getHeaders(), is added to the interface of ProducerRecord and ConsumerRecord.
Wire protocol change -
...
add array of headers to end of the message format
The below is for the core Message wire protocol change needed to fit headers into the message.
- A header key can occur multiple times, and clients should expect to handle this. In clients it can be represented as a multimap or as an array of values per key.
This is based on the KIP-98 message protocol proposal.
Message =>
    Length => uintVar
    Attributes => int8
    TimestampDelta => intVar
    OffsetDelta => uintVar
    KeyLen => uintVar [OPTIONAL]
    Key => data [OPTIONAL]
    ValueLen => uintVar [OPTIONAL]
    Value => data [OPTIONAL]
    Headers => Header1, Header2 … HeaderN <--------------------- NEW Added Array of headers
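The length fields in this format are variable-length unsigned integers (uintVar). As an illustration only, the sketch below assumes the base-128 encoding used by Protocol Buffers varints (7 data bits per byte, with the high bit marking continuation); the exact encoding is defined by KIP-98, not here. The class name `UVarInt` is hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

// Hypothetical helper: unsigned base-128 varint, assumed to match uintVar.
final class UVarInt {
    // Writes 7 bits per byte, low-order group first; the high bit is set
    // on every byte except the last.
    static void write(int value, ByteArrayOutputStream out) {
        while ((value & 0xFFFFFF80) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    // Reads a varint of at most five bytes from the buffer's current position.
    static int read(ByteBuffer in) {
        int result = 0;
        int shift = 0;
        while (true) {
            byte b = in.get();
            result |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return result;
            }
            shift += 7;
            if (shift > 28) {
                throw new IllegalArgumentException("varint is longer than 5 bytes");
            }
        }
    }
}
```

Under this assumption, small lengths such as a short key cost a single byte, which is the motivation for using variable-length integers in a per-message structure.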
Wire protocol of the headers bytes (if above mentioned attributes bit flag is true)
The below is for the headers wire protocol.
...
Headers (bytes) => [Header]
    Header =>
        Length => uintVar <-------------------------------- NEW length of the individual header
        KeyLen => uintVar <-------------------------------- NEW length of key bytes
        Key => data (utf8) <------------------------------- NEW UTF8 encoded string as data (byte[])
        Value => data <------------------------------------ NEW header value as data (byte[])
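To show how a single header could be written and read under this layout, here is a hedged sketch. Two points are assumptions rather than part of the proposal: uintVar is taken to be a base-128 varint as in Protocol Buffers, and Length is taken to count the bytes that follow the Length field (KeyLen, Key and Value), so the value length is whatever remains after the key. The names `HeaderCodec` and `Entry` are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical codec for one header: Length, KeyLen, Key (UTF-8), Value.
final class HeaderCodec {
    // Hypothetical holder for a decoded header.
    static final class Entry {
        final String key;
        final byte[] value;
        Entry(String key, byte[] value) { this.key = key; this.value = value; }
    }

    static byte[] encode(String key, byte[] value) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream body = new ByteArrayOutputStream();
        writeUVarInt(keyBytes.length, body);         // KeyLen
        body.write(keyBytes, 0, keyBytes.length);    // Key
        body.write(value, 0, value.length);          // Value
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeUVarInt(body.size(), out);              // Length (assumed to exclude itself)
        out.write(body.toByteArray(), 0, body.size());
        return out.toByteArray();
    }

    // Decodes one header and leaves the buffer positioned at the next one.
    static Entry decode(ByteBuffer in) {
        int length = readUVarInt(in);
        int end = in.position() + length;
        int keyLen = readUVarInt(in);
        byte[] keyBytes = new byte[keyLen];
        in.get(keyBytes);
        byte[] value = new byte[end - in.position()]; // value is the remainder
        in.get(value);
        return new Entry(new String(keyBytes, StandardCharsets.UTF_8), value);
    }

    // Assumed uintVar encoding: 7 bits per byte, high bit marks continuation.
    static void writeUVarInt(int v, ByteArrayOutputStream out) {
        while ((v & 0xFFFFFF80) != 0) {
            out.write((v & 0x7F) | 0x80);
            v >>>= 7;
        }
        out.write(v);
    }

    static int readUVarInt(ByteBuffer in) {
        int result = 0;
        for (int shift = 0; shift <= 28; shift += 7) {
            byte b = in.get();
            result |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return result;
            }
        }
        throw new IllegalArgumentException("varint is longer than 5 bytes");
    }
}
```

Because each header carries its own Length, a reader can call `decode` repeatedly on one buffer to walk the header array, and can skip a header it does not understand without parsing its key.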
Compatibility, Deprecation, and Migration Plan
- Current client users should not be affected; this change only adds new API methods to the client APIs
- Message versioning means that clients expecting an older message version are not impacted:
- older producers would simply produce a message without headers
- new consumers would simply get a message with empty headers
- older consumers would simply consume a message, oblivious to there being any headers
- Message version migration would be handled as in KIP-32
...