Status
Current state: Under Discussion
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
This KIP tries to address the following issues in Kafka.
Log compaction currently requires a message with a null value to mark a tombstone for deletion. KIP-82 covers this as an issue that headers could resolve, but it is being separated out here because it is an independent issue in its own right.
This causes problems where organizations use message envelopes or wrappers, or where a business use case requires some information to be delivered with the delete.
Some further use cases that came up during this KIP's discussion:
- Traceability. I would like to know who issued this delete tombstone. It might include the hostname or IP of the producer of the delete.
- Timestamps. I would like to know when this delete was issued. This use case is already addressed by the availability of per-message timestamps that came in 0.10.0.
- Data provenance. I hope I'm using this phrase correctly, but what I mean is, where did this delete come from? What processing job emitted it? What input to the processing job caused this delete to be produced? For example, if a record in topic A was processed and caused a delete tombstone to be emitted to topic B, I might like the offset of the topic A message to be attached to the topic B message.
- Distributed tracing for stream topologies. This might be a slight repeat of the above use cases. In the microservices world, we can generate call-graphs of webservices using tools like Zipkin or OpenTracing (http://opentracing.io/), or something homegrown like https://engineering.linkedin.com/distributed-service-call-graph/real-time-distributed-tracing-website-performance-and-efficiency. I can imagine that you might want to do something similar for stream processing topologies, where stream processing jobs carry and forward a globally unique identifier, and a distributed topology graph is generated.
- Cases where processing a delete requires data that is not available in the message key. I'm not sure I have a good example of this, though. One hand-wavy example might be where I am publishing documents into Kafka where the documentId is the message key, and the text contents of the document are in the message body. And I have a consuming job that does some analytics on the message body. If that document gets deleted, then the consuming job might need the original message body in order to "delete" that message's impact from the analytics.
Public Interfaces
This KIP has the following public interface changes:
- Use an attribute bit as a tombstone flag on the core message format.
- This "tombstone" attribute bit is only used by the broker when a topic is configured for compaction.
- If the topic is not configured for compaction, the bit will still be set on the record and available to be read, but it will not cause message deletion by the broker.
- This will need to be added to the end-user documentation.
- Add a tombstone boolean field to ProducerRecord and ConsumerRecord.
- Add accessor methods on ProducerRecord and ConsumerRecord: void setTombstone(boolean isTombstone) and boolean isTombstone().
- Add ProduceRequest/ProduceResponse V4 which uses the new message format.
- Add FetchRequest/FetchResponse V4 which uses the new message format.
For more detailed information on the above changes, please refer to the Proposed Changes section.
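As a rough illustration of the accessor pair described above, the following minimal sketch uses a hypothetical stand-in class (TombstoneRecordSketch is not the real ProducerRecord or ConsumerRecord, and its constructor is an assumption). It only shows the point of the proposal: a record can be marked as a tombstone while still carrying a non-null value.

```java
/** Hypothetical, simplified stand-in for ProducerRecord; not the Kafka class. */
final class TombstoneRecordSketch<K, V> {
    private final K key;
    private final V value;
    private boolean tombstone; // defaults to false, i.e. a normal record

    TombstoneRecordSketch(K key, V value) {
        this.key = key;
        this.value = value;
    }

    K key() { return key; }

    V value() { return value; }

    // Accessor names follow the ones listed in this KIP.
    void setTombstone(boolean isTombstone) { this.tombstone = isTombstone; }

    boolean isTombstone() { return tombstone; }
}
```

With such a shape, a producer could publish a delete for a document key and keep the original body in the value, which is the situation described in the last use case of the Motivation section.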
Proposed Changes
Wire protocol change - use attribute bit 5 as the tombstone boolean flag.
- The attributes flag bit is used to keep the message size the same as before.
MessageAndOffset => Offset MessageSize Message
  Offset => int64
  MessageSize => int32
  Message => Crc MagicByte Attributes Timestamp KeyLength Key HeadersLength Headers ValueLength Value
    Crc => int32
    MagicByte => int8
    Attributes => int8 <---------------------- Use Bit 5 as boolean flag for 'isTombstone' flag
    Timestamp => int64
    KeyLength => int32
    Key => bytes
    ValueLength => int32
    Value => bytes
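The flag handling on the Attributes byte can be sketched as follows. This is a minimal illustration, not Kafka code: the class and method names are hypothetical, and it assumes "bit 5" is counted from zero at the least significant bit (mask 0x20), which the KIP text does not spell out.

```java
/** Hypothetical helper; names are illustrative and not part of Kafka. */
final class TombstoneAttribute {
    // Assumption: bit 5 is zero-based from the least significant bit, so the mask is 0x20.
    static final byte TOMBSTONE_MASK = (byte) (1 << 5);

    /** Returns true when the tombstone bit is set in the attributes byte. */
    static boolean isTombstone(byte attributes) {
        return (attributes & TOMBSTONE_MASK) != 0;
    }

    /** Returns a copy of the attributes byte with the tombstone bit set or cleared. */
    static byte withTombstone(byte attributes, boolean tombstone) {
        return tombstone
                ? (byte) (attributes | TOMBSTONE_MASK)
                : (byte) (attributes & ~TOMBSTONE_MASK);
    }
}
```

Because only one bit of the existing int8 is touched, the other attribute bits are left unchanged and the message size stays the same.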
LogCleaner
Update method "shouldRetainMessage" to also look at attribute bit 5 for the tombstone marker.
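A simplified model of the updated retention check might look like the sketch below. It is not the actual LogCleaner code (which is written in Scala); the class name, the method signature, and the parameters latestOffsetForKey and retainDeletes are assumptions made for illustration. It treats a message as a delete marker when either the value is null or attribute bit 5 is set, which corresponds to the Stage 1 behaviour in the migration plan.

```java
/** Hypothetical, simplified model of the cleaner's retention decision. */
final class RetentionSketch {
    // Assumption: bit 5 is zero-based from the least significant bit.
    static final int TOMBSTONE_MASK = 1 << 5;

    /** A delete marker is either a null value (existing rule) or a set tombstone bit (new rule). */
    static boolean isDeleteMarker(byte attributes, byte[] value) {
        return value == null || (attributes & TOMBSTONE_MASK) != 0;
    }

    /**
     * @param offset             offset of the message being examined
     * @param latestOffsetForKey highest offset seen for the same key
     * @param retainDeletes      whether delete markers should still be kept in this pass
     */
    static boolean shouldRetain(long offset, long latestOffsetForKey, boolean retainDeletes,
                                byte attributes, byte[] value) {
        boolean superseded = offset < latestOffsetForKey;
        boolean obsoleteDelete = !retainDeletes && isDeleteMarker(attributes, value);
        return !superseded && !obsoleteDelete;
    }
}
```

In Stage 2 of the migration plan, the null check in isDeleteMarker would be dropped so that only the attribute flag marks a tombstone.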
Compatibility, Deprecation, and Migration Plan
- Migration Plan
- We will have a two-stage migration plan
- Stage 1:
- message.format.version=0.10.1-1
- The broker should understand both the attribute flag and a null value as tombstones for log compaction
- If the consuming client is of an older version, we down-convert the message when the attribute flag is set, nulling the value so that existing behaviour is preserved
- Stage 2 (once all consumers and producers are updated / new installs):
- set message.format.version=0.10.2 and perform a rolling restart of the brokers
- The broker only understands the attribute flag for log compaction
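The Stage 1 down-conversion for older consumers can be pictured with the following sketch. The class and method names are hypothetical, and the mask again assumes bit 5 is zero-based from the least significant bit; the only behaviour taken from the plan above is that a flagged message is delivered to an old client with a null value.

```java
/** Hypothetical illustration of the Stage 1 down-conversion rule. */
final class DownConversionSketch {
    // Assumption: bit 5 is zero-based from the least significant bit.
    static final int TOMBSTONE_MASK = 1 << 5;

    /**
     * Returns the value an older consumer would see: null when the tombstone
     * flag is set (so the old "null means delete" rule still works),
     * otherwise the original value unchanged.
     */
    static byte[] valueForOldConsumer(byte attributes, byte[] value) {
        boolean tombstone = (attributes & TOMBSTONE_MASK) != 0;
        return tombstone ? null : value;
    }
}
```

Any payload carried on the tombstone is therefore invisible to older consumers, which is the trade-off for preserving their existing behaviour.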
Rejected Alternatives
Do nothing: KIP-82 would remove the immediate issue.
It was originally proposed in KIP-82 (Add Record Headers) that supporting headers would resolve this issue, since message wrappers, one of the issues flagged in that discussion thread, would no longer be needed.
In that discussion it was agreed that compaction based on a null value should be addressed separately, as discussed here, because it is cleaner to be explicit than to keep relying on a null payload even after header support. Relying on a null payload alone was agreed to have been a bad design decision at the time.
Use Header field to be explicit based on KIP-82 Headers Proposal
The intent here is to use a header, e.g. key = 5 (compaction tombstone) with value type = boolean, as an explicit flag.
Advantages
- Avoids any further changes to the core message protocol; one of the intents of KIP-82 is to be able to add platform-level information without constant protocol changes
- Avoids using the attributes flags which are finite in size
Disadvantages
- Makes the fix for the issues this KIP addresses dependent on KIP-82; as KIP-82 is more contentious, this risks not dealing with the immediate issue in the next release
- This could always be reworked to use a header in a future release.
Note: If KIP-82 is agreed on and merged, and the agreed solution lends itself to the server side being header-aware (i.e. not a client-only wrapper), we should in time re-evaluate this alternative against the currently proposed solution.