...
This KIP tries to address the following issues in Kafka.
Compaction currently requires a null value for a message to be marked as a tombstone for deletion. This is covered in KIP-82 as an issue that headers could resolve, but it is being separated out here as it is an independent issue in its own right.
This causes problems where organizations use message envelopes or wrappers, or where a business use case requires delivery of some information on the delete.
Some further use cases that came up during this KIP's discussion:
- Traceability. I would like to know who issued this delete tombstone. This might include the hostname or IP of the producer of the delete.
- Timestamps. I would like to know when this delete was issued. This use case is already addressed by the availability of per-message timestamps introduced in 0.10.0.
- Data provenance. I hope I'm using this phrase correctly, but what I mean is, where did this delete come from? What processing job emitted it? What input to the processing job caused this delete to be produced? For example, if a record in topic A was processed and caused a delete tombstone to be emitted to topic B, I might like the offset of the topic A message to be attached to the topic B message.
- Distributed tracing for stream topologies. This might be a slight repeat of the above use cases. In the microservices world, we can generate call graphs of web services using tools like Zipkin/opentracing.io (http://opentracing.io/), or something homegrown like https://engineering.linkedin.com/distributed-service-call-graph/real-time-distributed-tracing-website-performance-and-efficiency. I can imagine that you might want to do something similar for stream processing topologies, where stream processing jobs carry and forward a globally unique identifier, and a distributed topology graph is generated.
- Cases where processing a delete requires data that is not available in the message key. I'm not sure I have a good example of this, though. One hand-wavy example might be where I am publishing documents into Kafka where the documentId is the message key, and the text contents of the document are in the message body. And I have a consuming job that does some analytics on the message body. If that document gets deleted, then the consuming job might need the original message body in order to "delete" that message's impact from the analytics.
Public Interfaces
This KIP has the following public interface changes:
- Use an attribute bit as a tombstone flag on the core message format.
- This "tombstone" attribute bit is only used by the broker when a topic is configured for compaction.
- If the topic is not configured for compaction, the bit will still be set on the record and available to be read, but it will not cause message deletion by the broker.
- This will need to be added to the end-user documentation.
- Add a tombstone boolean field to ProducerRecord and ConsumerRecord.
- Add accessor methods on ProducerRecord/ConsumerRecord: void setTombstone(boolean isTombstone) and boolean isTombstone().
- Add ProduceRequest/ProduceResponse V4 which uses the new message format.
- Add FetchRequest/FetchResponse V4 which uses the new message format.
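As a hypothetical illustration of the proposed accessors (the method names setTombstone/isTombstone come from this KIP, but the stand-in class below is not the real org.apache.kafka.clients.producer.ProducerRecord), the new field might look like:

```java
// Illustrative stand-in for the proposed ProducerRecord/ConsumerRecord change.
// Only the new boolean field and its accessors from this KIP are modeled here.
public class TombstoneRecord {
    private final String key;
    private final byte[] value;
    private boolean tombstone; // proposed new field; defaults to false

    public TombstoneRecord(String key, byte[] value) {
        this.key = key;
        this.value = value;
    }

    // Proposed accessor: mark this record as a compaction tombstone.
    public void setTombstone(boolean isTombstone) {
        this.tombstone = isTombstone;
    }

    // Proposed accessor: true if this record should delete its key on compaction.
    public boolean isTombstone() {
        return tombstone;
    }

    public byte[] value() {
        return value;
    }
}
```

Note that, unlike today, a tombstone record could carry a non-null value, which is the core point of this KIP.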
For more detailed information on the above changes, please refer to the Proposed Changes section.
...
MessageAndOffset => Offset MessageSize Message
  Offset => int64
  MessageSize => int32
  Message => Crc MagicByte Attributes Timestamp KeyLength Key ValueLength Value
    Crc => int32
    MagicByte => int8
    Attributes => int8  <---------------------- Use bit 5 as boolean flag for 'isTombstone'
    Timestamp => int64
    KeyLength => int32
    Key => bytes
    ValueLength => int32
    Value => bytes
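The attribute-bit handling implied by the format above can be sketched as follows. The mask (bit 5, i.e. 0x20) follows the annotation on the Attributes field; the helper class name is illustrative, not part of the Kafka codebase:

```java
// Sketch of reading/writing the proposed tombstone bit in the attributes byte.
// Bit 5 is assumed per this KIP; lower bits are already used for compression
// codec and timestamp type in the existing message format.
public class TombstoneAttribute {
    private static final int TOMBSTONE_MASK = 1 << 5; // 0x20

    public static byte setTombstone(byte attributes, boolean isTombstone) {
        return isTombstone
                ? (byte) (attributes | TOMBSTONE_MASK)
                : (byte) (attributes & ~TOMBSTONE_MASK);
    }

    public static boolean isTombstone(byte attributes) {
        return (attributes & TOMBSTONE_MASK) != 0;
    }
}
```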
LogCleaner
Update the method "shouldRetainMessage" to also check attribute bit 5 for the tombstone marker.
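A minimal sketch of the updated check, assuming the bit-5 mask proposed above. The real shouldRetainMessage in kafka.log.LogCleaner takes more context (cleaner state, retention offsets), so this models only the delete-marker decision:

```java
// Sketch only: a record is a delete marker if its value is null (existing
// behaviour) OR the proposed tombstone attribute bit is set (this KIP).
public class CleanerCheck {
    private static final int TOMBSTONE_MASK = 1 << 5; // proposed bit 5

    public static boolean isDeleteMarker(byte[] value, byte attributes) {
        return value == null || (attributes & TOMBSTONE_MASK) != 0;
    }

    // During compaction, delete markers past the tombstone retention point
    // are not retained; everything else is kept for the cleaner to dedupe.
    public static boolean shouldRetainMessage(byte[] value, byte attributes,
                                              boolean pastRetentionPoint) {
        return !(isDeleteMarker(value, attributes) && pastRetentionPoint);
    }
}
```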
Compatibility, Deprecation, and Migration Plan
- Migration Plan
- We will have a two-stage migration plan
- Stage 1:
- message.format.version=0.10.2.0-1
- The broker should understand both the attribute flag and a null value as a tombstone for log compaction
- If the consuming client is an older version, we down-convert the message if the attribute flag is set, nulling the value so that existing behaviour is preserved
- Stage 2:
- Message version migration would be handled as in KIP-32
- The bit flag would default to false
- The logic would be based on the current null-value behavior or on the tombstone flag being set to true; as such it wouldn't impact any existing flows, and simply allows new producers to make use of the feature
- message.format.version=0.10.2.0
- The broker only understands the attribute flag for log compaction
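The Stage 1 down-conversion described above can be sketched as below. This is an assumption about where the nulling would happen (the broker's fetch path), and the helper is illustrative rather than actual broker code:

```java
// Stage-1 sketch: when serving a fetch for an older message-format version,
// a record whose tombstone bit is set has its value nulled, so that older
// consumers keep seeing the traditional null-value delete marker.
public class DownConverter {
    private static final int TOMBSTONE_MASK = 1 << 5; // proposed attribute bit 5

    public static byte[] downConvertValue(byte attributes, byte[] value) {
        return (attributes & TOMBSTONE_MASK) != 0 ? null : value;
    }
}
```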
Rejected Alternatives
- Do nothing; KIP-82 would remove the immediate issue.
...