Motivation
To address the use cases in KIP-82, this page details a message wrapper implementation: its pros and cons, and the changes it would need.
This is not the proposed solution; it elaborates an alternative to show more clearly why it was discounted.
Advantages/Disadvantages
The advantages of this option:
- The change to MessageAndOffset is limited to using one attribute bit as a tombstone marker
- Headers can be set on the client side through a common interface
- Message headers are kept as a client-side-only implementation
The disadvantages of this option:
- Change to the message object
- Client users need to know in advance whether a message will use headers (that is, whether it will use the wrapper)
- Headers mainly serve platform needs. Enforcing a wrapper exposes them to end-user code, even though only platforms need to add or consume headers via interceptors
- Cannot make use of the headers server side
  - e.g. Server Side Plugins/Interceptors
- No server-side versioning that would allow older clients / other language clients to co-exist
- Upgrade compatibility issue: new topics need to be created
  - This is more prevalent for consumers
- Point solution for handling compaction server side
The amount of change needed on both the client and the server side is similar in size to the proposed solution in KIP-82, but this option has the additional drawbacks listed above.
Changes
- Creation of a common Kafka Message Wrapper for Java Client
- Interceptors to be made message wrapper aware
- Client users create the message wrapper and insert their value into it before creating the producer record
- The [int, byte[]] header set will be serialised on the wire using a strict format
- Each header value will be custom serialisable by the interceptors/plugins that use the header.
- Because headers use int keys for compactness, key ranges will be reserved for different usages:
  - client headers adopted by the Kafka open source project (not necessarily owned by it)
  - broker headers adopted by the Kafka open source project (not necessarily owned by it)
  - commercial vendor space
  - custom in-house
- To handle the compaction issue:
  - Update Producer/Consumer record to set the tombstone marker
  - Use attribute bit 4 as a boolean flag to mark whether the record should be deleted ("tombstone marker")
  - Add ProduceRequest/ProduceResponse V4 which uses the new message format.
  - Add FetchRequest/FetchResponse V4 which uses the new message format.
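The reserved key ranges mentioned in the list above could be modelled roughly as follows. This is only a sketch: the page does not define any boundaries, so every number below, along with the `HeaderKeySpace` name, is invented for illustration.

```java
// Illustrative only: the range boundaries are hypothetical and are not
// taken from the proposal, which names the four usages but no numbers.
enum HeaderKeySpace {
    KAFKA_CLIENT(0, 0xFFFF),
    KAFKA_BROKER(0x10000, 0x1FFFF),
    COMMERCIAL_VENDOR(0x20000, 0x0FFFFFFF),
    CUSTOM_INHOUSE(0x10000000, Integer.MAX_VALUE);

    final int first;
    final int last;

    HeaderKeySpace(int first, int last) {
        this.first = first;
        this.last = last;
    }

    // Maps an int header key to the usage whose range contains it.
    static HeaderKeySpace of(int key) {
        for (HeaderKeySpace space : values()) {
            if (key >= space.first && key <= space.last) {
                return space;
            }
        }
        // In this sketch negative keys belong to no range.
        throw new IllegalArgumentException("header key outside any reserved range: " + key);
    }
}
```

An interceptor or plugin could use such a lookup to reject keys outside the range it is entitled to write.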
LogCleaner
Update the method "shouldRetainMessage" to also check attribute bit 4 for the tombstone marker.
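A minimal sketch of that check, assuming "bit 4" is counted from zero (mask `0x10`). The class and method shapes here are hypothetical and simplified: they do not mirror the actual LogCleaner code, and the comparison against newer offsets for the same key is left out.

```java
// Sketch only: names and signatures are hypothetical, not the real LogCleaner API.
final class TombstoneBit {
    // Assumption: "bit 4" is zero-based, so the mask is 1 << 4 = 0x10.
    static final int TOMBSTONE_MASK = 1 << 4;

    // True when the attributes byte carries the tombstone marker.
    static boolean isTombstone(byte attributes) {
        return (attributes & TOMBSTONE_MASK) != 0;
    }

    // Returns the attributes byte with the tombstone marker set; other bits are kept.
    static byte markTombstone(byte attributes) {
        return (byte) (attributes | TOMBSTONE_MASK);
    }

    // Simplified retention rule: a record counts as a delete if its value is null
    // or its tombstone bit is set, and deletes survive only while retainDeletes is true.
    static boolean shouldRetainMessage(byte attributes, byte[] value, boolean retainDeletes) {
        boolean isDelete = value == null || isTombstone(attributes);
        return !isDelete || retainDeletes;
    }
}
```

The point of the flag is that a wrapped message always has a non-null value (the wrapper bytes), so a null-value check alone can no longer identify deletes.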
MessageAndOffset
Wire protocol change
```
MessageAndOffset => Offset MessageSize Message
  Offset => int64
  MessageSize => int32

Message => Crc MagicByte Attributes Timestamp KeyLength Key ValueLength Value
  Crc => int32
  MagicByte => int8    <--- Bump up magic byte to 2
  Attributes => int8   <--- Use bit 4 as boolean flag for compaction to signify deletion / tombstone
  Timestamp => int64
  KeyLength => int32
  Key => bytes
  ValueLength => int32
  Value => bytes
```
Header Value Message Wrapper
MessageWrapper (Java)
```java
import java.util.Map;

// Client-side wrapper that carries int-keyed headers alongside the user's value.
public class MessageWrapper<V> {
    private Map<Integer, byte[]> headers;
    private V value;

    public MessageWrapper(Map<Integer, byte[]> headers, V value) {
        this.headers = headers;
        this.value = value;
    }

    public Map<Integer, byte[]> getHeaders() {
        return headers;
    }

    public V getValue() {
        return value;
    }
}
```
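As a rough illustration of how client or interceptor code might attach a header before the wrapper is handed to a producer record, consider the sketch below. The `WrapperExample` class, the `withHeader` helper, and the `TRACE_ID_KEY` value are all hypothetical. A compact copy of the wrapper is included only so the example compiles on its own.

```java
import java.util.HashMap;
import java.util.Map;

// Compact copy of the wrapper shape described above, repeated so this sketch is self-contained.
final class MessageWrapper<V> {
    private final Map<Integer, byte[]> headers;
    private final V value;

    MessageWrapper(Map<Integer, byte[]> headers, V value) {
        this.headers = headers;
        this.value = value;
    }

    Map<Integer, byte[]> getHeaders() { return headers; }

    V getValue() { return value; }
}

final class WrapperExample {
    // Hypothetical header key; the proposal does not assign concrete keys.
    static final int TRACE_ID_KEY = 1;

    // Returns a new wrapper with one extra header; the input wrapper is not modified.
    static <V> MessageWrapper<V> withHeader(MessageWrapper<V> in, int key, byte[] headerValue) {
        Map<Integer, byte[]> copy = new HashMap<>(in.getHeaders());
        copy.put(key, headerValue);
        return new MessageWrapper<>(copy, in.getValue());
    }
}
```

The wrapper returned by `withHeader` would then be used as the record value, which is why user code has to know up front that the topic carries wrapped messages.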
Wire protocol of the Message wrapper
```
MessageWrapper (bytes) => HeadersLength, Headers, ValueLength, Value
  MagicByte => int8       <--- 0 this is used for future versioning
  HeadersLength => int32  <--- NEW [optional] size of the byte[] of the serialised headers, if headers are present
  Headers => bytes        <--- NEW [optional] serialised form of the headers Map<int, byte[]>
  ValueLength => int32    <--- size of the byte[] of the serialised value
  Value => bytes          <--- byte[] of the value

Headers (bytes) => Set(Key, ValueLength, Value)
  Set =>
    Key => int32          <--- NEW int key of the header
    ValueLength => int32  <--- NEW size of the byte[] of the serialised header value
    Value => bytes        <--- NEW serialised form of the header value
```
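The layout above could be encoded and decoded along these lines. This is a sketch under stated assumptions rather than a reference implementation: it assumes the MagicByte is written first, that HeadersLength is always written (0 when there are no headers), and that integers are big-endian, which is the `ByteBuffer` default. The `WrapperWireCodec` name and its methods are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.TreeMap;

// Sketch of the wrapper wire format; field order and always-present
// HeadersLength are assumptions, not something the page pins down.
final class WrapperWireCodec {
    static final byte MAGIC = 0;

    // Headers (bytes) => repeated (Key int32, ValueLength int32, Value bytes).
    static byte[] encodeHeaders(Map<Integer, byte[]> headers) {
        int size = 0;
        for (byte[] v : headers.values()) {
            size += 4 + 4 + v.length;
        }
        ByteBuffer buf = ByteBuffer.allocate(size);
        for (Map.Entry<Integer, byte[]> e : headers.entrySet()) {
            buf.putInt(e.getKey());
            buf.putInt(e.getValue().length);
            buf.put(e.getValue());
        }
        return buf.array();
    }

    // MagicByte, HeadersLength, Headers, ValueLength, Value.
    static byte[] encode(Map<Integer, byte[]> headers, byte[] value) {
        byte[] h = encodeHeaders(headers);
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + h.length + 4 + value.length);
        buf.put(MAGIC).putInt(h.length).put(h).putInt(value.length).put(value);
        return buf.array();
    }

    // Reads only the header set from the wrapper bytes.
    static Map<Integer, byte[]> decodeHeaders(byte[] wire) {
        ByteBuffer buf = ByteBuffer.wrap(wire);
        byte magic = buf.get();
        if (magic != MAGIC) {
            throw new IllegalArgumentException("unsupported magic byte: " + magic);
        }
        int headersLength = buf.getInt();
        int end = buf.position() + headersLength;
        Map<Integer, byte[]> headers = new TreeMap<>();
        while (buf.position() < end) {
            int key = buf.getInt();
            byte[] v = new byte[buf.getInt()];
            buf.get(v);
            headers.put(key, v);
        }
        return headers;
    }

    // Skips the header section and returns the user's value bytes.
    static byte[] decodeValue(byte[] wire) {
        ByteBuffer buf = ByteBuffer.wrap(wire);
        buf.get(); // magic byte, not validated here
        int headersLength = buf.getInt();
        buf.position(buf.position() + headersLength);
        byte[] value = new byte[buf.getInt()];
        buf.get(value);
        return value;
    }
}
```

Because the whole structure travels inside the record value, a broker sees only opaque bytes, which is the reason headers cannot be used server side with this option.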
Compatibility, Deprecation, and Migration Plan
- MessageWrapper will not be backwards compatible
  - Current client users will be affected
    - New topics will be needed
    - Clients will need to instantiate and produce the message wrapper separately
- Tombstone feature will be backwards compatible
- Message version migration would be handled as in KIP-32