Status

Current stateUnder Discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This KIP tries to address the following issues in Kafka.

 

A log compacted topic is basically a kv-store, so a map API map.put(key, null) is not the same as map.remove(key), which should mean a null value should not represent a delete. a delete should be explicit (meaning flag).

Compaction currently requires a null value for it to mark as a tombstone for deletion. This is covered in KIP-82 as an issue that headers could resolve but is being separated out as is an independent issue in its own right.

This causes issues where organization use message envelopes or wrappers or where a business use case requires delivery of some information on the delete.

 

This became very apparent on the discussion thread for KIP-82 found here and herewhere it has so far been consensus that this issue should be separated out and dealt with separately. 

Some further use case that came up during this KIP's discussion:

  1. Tracability. I would like to know who issued this delete tombstone. It might include the hostname, IP of the producer of the delete.
  2. Timestamps. I would like to know when this delete was issued. This use case is already addressed by the availability of per-message timestamps that came in 0.10.0
  3. Data provenance. I hope I'm using this phrase correctly, but what I mean is, where did this delete come from? What processing job emitted it? What input to the processing job caused this delete to be produced? For example, if a record in topic A was processed and caused a delete tombstone to be emitted to topic B, I might like the offset of the topic A message to be attached to the topic B message.
  4. Distributed tracing for stream topologies. This might be a slight repeat of the above use cases. In the microservices world, we can generate call-graphs of webservices using tools like Zipkin/opentracing.io <http://opentracing.io/>, or something homegrown like https://engineering.linkedin.com/distributed-service-call-graph/real-time-distributed-tracing-website-performance-and-efficiency <https://engineering.linkedin.com/distributed-service-call-graph/real-time-distributed-tracing-website-performance-and-efficiency>. I can imagine that you might want to do something similar for stream processing topologies, where stream processing jobs carry along and forward along a globally unique identifier, and a distributed topology graph is generated.
  5. Cases where processing a delete requires data that is not available in the message key. I'm not sure I have a good example of this, though. One hand-wavy example might be where I am publishing documents into Kafka where the documentId is the message key, and the text contents of the document are in the message body. And I have a consuming job that does some analytics on the message body. If that document gets deleted, then the consuming job might need the original message body in order to "delete" that message's impact from the analytics.
  6. One potential use case is for schema registration. For example, in Avro, a null value corresponds to a Null schema. So, if you want to be able to keep the schema id in a delete message, the value can't be null

Public Interfaces

This KIP has the following public interface changes:

  1. Use an attribute bit as a tombstone flag on the core message format.
  2. This "tombstone" attribute bit is only used by the broker when a topic is configured for compaction. 
    1. If the topic is not configured for compaction it will be set and on the record and available to be read but will not cause message deletion by the broker.
    2. This will need to be added to end user documention.
  3. Add a tombstone boolean field to ProducerRecord and ConsumerRecord. 

  4. Add constructor param on ProducerRecord to set the tombstone on creation of a Record
  5. Add accessor methods on the Producer/ConsumerRecord boolean isTombstone()
  6. Add ProduceRequest/ProduceResponse V3 which uses the new message format.
  7. Add FetchRequest/FetchResponse V3 which uses the new message format.

For more detail information of the above changes, please refer to the Proposed Changes section.

Proposed Changes

Wire protocol change - use attribute bit5 as flag for tombstone boolean flag. 

 

MessageAndOffset => Offset MessageSize Message
  Offset => int64  
  MessageSize => int32
   
  Message => Crc MagicByte Attributes Timestamp KeyLength Key ValueLength Value
    Crc => int32
    MagicByte => int8  <---------------------- Bump the Magic Byte to 2
    Attributes => int8 <---------------------- Use Bit 5 as boolean flag for 'isTombstone' flag
    Timestamp => int64
    KeyLength => int32
    Key => bytes
    ValueLength => int32
    Value => bytes

 

LogCleaner

Update method "shouldRetainMessage"

This will be done at the per message level.

 

 

Compatibility, Deprecation, and Migration Plan

Rejected Alternatives

Do nothing KIP-82 would remove the immediate issue.

Originally it was proposed in KIP-82 - Add Record Headers that by supporting headers would resolve this issue as then there would be no need for message wrappers as one of the issues being flagged up in that discussion thread.

As per its discussion it got agreed that we should address the compaction based on null value seperately as discussed here, as it is cleaner to be explicit than still relying on a null payload even after header support. Relying on a null payload only was agreed a bad design decision made at that time.

Use Header field to be explicit based on KIP-82 Headers Proposal

Intent here is to use a header e.g. key = 5 (compaction tombstone) value type=boolean to make an explicit flag that can be used. 

Advantages

Disadvantages

Note: If KIP-82 gets agreed on and merged and solution agreed upon lends itself to server side being header aware (aka not a client only wrapper) in time, we should re-evaluate this solution instead of the current proposed