...

This KIP tries to address the following issues in Kafka.

 

A log-compacted topic is essentially a key-value store. In a map API, map.put(key, null) is not the same as map.remove(key), so a null value should not represent a delete; a delete should be explicit, that is, signalled by a flag.
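The map analogy can be checked with java.util.HashMap, which permits null values. The sketch below (class and key names are illustrative) shows that storing null leaves the key present, while removing it actually deletes the entry:

```java
import java.util.HashMap;
import java.util.Map;

public class NullVersusRemove {
    // put(key, null): HashMap permits null values, so the key stays present.
    static boolean presentAfterPutNull() {
        Map<String, String> store = new HashMap<>();
        store.put("user-42", "alice");
        store.put("user-42", null);
        return store.containsKey("user-42");
    }

    // remove(key): the entry is actually deleted.
    static boolean presentAfterRemove() {
        Map<String, String> store = new HashMap<>();
        store.put("user-42", "alice");
        store.remove("user-42");
        return store.containsKey("user-42");
    }

    public static void main(String[] args) {
        System.out.println(presentAfterPutNull()); // prints "true"
        System.out.println(presentAfterRemove());  // prints "false"
    }
}
```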

Compaction currently requires a record to have a null value for it to be treated as a tombstone marking the key for deletion. KIP-82 covers this as an issue that headers could resolve, but it is being separated out here because it is an independent issue in its own right.

This causes problems for organizations that use message envelopes or wrappers, and where a business use case requires some information to be delivered along with the delete.
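To illustrate the envelope problem, here is a minimal sketch assuming a hypothetical string-built envelope (no real JSON library or escaping is used). Because every message, including a delete, is wrapped, the serialized value is never null, so the current null-value rule can never recognise it as a tombstone:

```java
import java.nio.charset.StandardCharsets;

public class EnvelopeTombstoneProblem {
    // Current log-compaction convention: a tombstone is a record with a null value.
    static boolean isTombstoneToday(byte[] value) {
        return value == null;
    }

    // Hypothetical envelope wrapper: always produces non-null bytes,
    // even when the wrapped event is a delete with no payload.
    static byte[] wrap(String eventType, String payload) {
        String envelope = "{\"type\":\"" + eventType + "\",\"payload\":"
                + (payload == null ? "null" : "\"" + payload + "\"") + "}";
        return envelope.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] deleteEvent = wrap("DELETE", null);
        // The business meaning is "delete", but compaction cannot see it.
        System.out.println(isTombstoneToday(deleteEvent)); // prints "false"
        System.out.println(isTombstoneToday(null));        // prints "true"
    }
}
```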

 

This became very apparent on the discussion thread for KIP-82 (found here and here), where the consensus so far has been that this issue should be separated out and dealt with on its own.

Some further use cases that came up during this KIP's discussion:

  1. Traceability. I would like to know who issued this delete tombstone. This might include the hostname and IP address of the producer of the delete.
  2. Timestamps. I would like to know when this delete was issued. This use case is already addressed by the per-message timestamps introduced in 0.10.0.
  3. Data provenance. I hope I'm using this phrase correctly, but what I mean is, where did this delete come from? What processing job emitted it? What input to the processing job caused this delete to be produced? For example, if a record in topic A was processed and caused a delete tombstone to be emitted to topic B, I might like the offset of the topic A message to be attached to the topic B message.
  4. Distributed tracing for stream topologies. This might be a slight repeat of the above use cases. In the microservices world, we can generate call-graphs of webservices using tools like Zipkin/opentracing.io (http://opentracing.io/), or something homegrown like https://engineering.linkedin.com/distributed-service-call-graph/real-time-distributed-tracing-website-performance-and-efficiency. I can imagine that you might want to do something similar for stream processing topologies, where stream processing jobs carry and forward a globally unique identifier, and a distributed topology graph is generated.
  5. Cases where processing a delete requires data that is not available in the message key. I'm not sure I have a good example of this, though. One hand-wavy example might be where I am publishing documents into Kafka where the documentId is the message key, and the text contents of the document are in the message body. And I have a consuming job that does some analytics on the message body. If that document gets deleted, then the consuming job might need the original message body in order to "delete" that message's impact from the analytics.
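Several of these use cases amount to attaching context to the delete itself. The sketch below shows a hypothetical payload (class and field names are illustrative, and the "key=value;" encoding is ad hoc) that a delete could carry in its value once deletes no longer depend on a null value:

```java
import java.nio.charset.StandardCharsets;

public class DeleteProvenance {
    // Hypothetical context a delete could carry if deletes were signalled
    // by a flag instead of a null value. Field names are illustrative.
    static final class DeleteContext {
        final String producerHost; // use case 1: traceability
        final String sourceTopic;  // use case 3: data provenance
        final long sourceOffset;   // use case 3: offset of the triggering input
        final String traceId;      // use case 4: distributed tracing

        DeleteContext(String producerHost, String sourceTopic, long sourceOffset, String traceId) {
            this.producerHost = producerHost;
            this.sourceTopic = sourceTopic;
            this.sourceOffset = sourceOffset;
            this.traceId = traceId;
        }

        // Ad hoc encoding used only for this sketch.
        byte[] toBytes() {
            String encoded = "host=" + producerHost
                    + ";src=" + sourceTopic + "@" + sourceOffset
                    + ";trace=" + traceId;
            return encoded.getBytes(StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) {
        DeleteContext ctx = new DeleteContext("producer-01", "topic-A", 1234L, "abc123");
        // Under the null-value convention this payload would have to be
        // discarded for the record to count as a delete.
        System.out.println(new String(ctx.toBytes(), StandardCharsets.UTF_8));
        // prints "host=producer-01;src=topic-A@1234;trace=abc123"
    }
}
```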

Public Interfaces

This KIP has the following public interface changes:

  1. Use an attribute bit as a tombstone flag on the core message format.
  2. This "tombstone" attribute bit is only used by the broker when a topic is configured for compaction.
    1. If the topic is not configured for compaction, the bit will still be set on the record and available to be read, but it will not cause message deletion by the broker.
    2. This will need to be added to the end-user documentation.
  3. Add a tombstone boolean field to ProducerRecord and ConsumerRecord. 

  4. Add accessor methods to ProducerRecord and ConsumerRecord: void setTombstone(boolean isTombstone) and boolean isTombstone().
  5. Add ProduceRequest/ProduceResponse V4 which uses the new message format.
  6. Add FetchRequest/FetchResponse V4 which uses the new message format.
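The record-level change in items 3 and 4 can be sketched with a hypothetical stand-in class; the real Kafka ProducerRecord and ConsumerRecord are not modified here, and the name FlaggedRecord is illustrative only. The point is that a delete can now keep a non-null value:

```java
public class TombstoneRecordSketch {
    // Hypothetical stand-in for ProducerRecord/ConsumerRecord with the
    // proposed tombstone field. Not the real Kafka classes.
    static final class FlaggedRecord<K, V> {
        private final String topic;
        private final K key;
        private final V value;
        private boolean tombstone; // false by default, like the proposed attribute bit

        FlaggedRecord(String topic, K key, V value) {
            this.topic = topic;
            this.key = key;
            this.value = value;
        }

        String topic() { return topic; }
        K key() { return key; }
        V value() { return value; }

        // Accessors with the signatures proposed in item 4 above.
        void setTombstone(boolean isTombstone) { this.tombstone = isTombstone; }
        boolean isTombstone() { return tombstone; }
    }

    public static void main(String[] args) {
        // A delete for key "user-42" that still carries a non-null value.
        FlaggedRecord<String, String> delete =
                new FlaggedRecord<>("users", "user-42", "deleted-by=producer-01");
        delete.setTombstone(true);
        System.out.println(delete.isTombstone() + " " + delete.value());
        // prints "true deleted-by=producer-01"
    }
}
```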

For more detailed information on the above changes, please refer to the Proposed Changes section.

...

 

MessageAndOffset => Offset MessageSize Message
  Offset => int64  
  MessageSize => int32
   
  Message => Crc MagicByte Attributes Timestamp KeyLength Key ValueLength Value
    Crc => int32
    MagicByte => int8 
    Attributes => int8 <-- bit 5 is used as the boolean 'isTombstone' flag
    Timestamp => int64
    KeyLength => int32
    Key => bytes
    ValueLength => int32
    Value => bytes
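Setting and reading bit 5 of the int8 Attributes field is plain bit masking (mask 1 << 5 = 0x20). A minimal sketch, assuming the other attribute bits must be left untouched; the starting value 0x01 is illustrative:

```java
public class TombstoneAttribute {
    // Bit 5 of the Attributes byte, as proposed in this KIP.
    static final int TOMBSTONE_BIT = 5;
    static final byte TOMBSTONE_MASK = (byte) (1 << TOMBSTONE_BIT); // 0x20

    // Set or clear only bit 5, preserving all other attribute bits.
    static byte setTombstone(byte attributes, boolean tombstone) {
        return tombstone
                ? (byte) (attributes | TOMBSTONE_MASK)
                : (byte) (attributes & ~TOMBSTONE_MASK);
    }

    static boolean isTombstone(byte attributes) {
        return (attributes & TOMBSTONE_MASK) != 0;
    }

    public static void main(String[] args) {
        byte attrs = 0x01; // illustrative value in the low bits
        byte flagged = setTombstone(attrs, true);
        System.out.printf("0x%02X %b%n", flagged, isTombstone(flagged)); // prints "0x21 true"
        byte cleared = setTombstone(flagged, false);
        System.out.printf("0x%02X %b%n", cleared, isTombstone(cleared)); // prints "0x01 false"
    }
}
```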

 

LogCleaner

Update the method shouldRetainMessage to also check attribute bit 5 for the tombstone marker.
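A simplified sketch of the retention decision, assuming the cleaner has a map from key to latest offset and a flag saying whether deletes must still be retained. The Entry class and method names are hypothetical; this is not the actual Scala LogCleaner code. The only change the KIP asks for is in isTombstone, which also checks bit 5:

```java
import java.util.HashMap;
import java.util.Map;

public class RetentionSketch {
    // Attribute bit 5, the proposed tombstone flag.
    static final int TOMBSTONE_MASK = 1 << 5;

    // Hypothetical, simplified view of a record as the cleaner sees it.
    static final class Entry {
        final long offset;
        final String key;
        final byte attributes;
        final byte[] value;

        Entry(long offset, String key, byte attributes, byte[] value) {
            this.offset = offset;
            this.key = key;
            this.attributes = attributes;
            this.value = value;
        }
    }

    // Delete marker if the value is null (current rule) or bit 5 is set (proposed rule).
    static boolean isTombstone(Entry e) {
        return e.value == null || (e.attributes & TOMBSTONE_MASK) != 0;
    }

    // Drop a record if a later offset exists for its key, or if it is a
    // tombstone and deletes no longer need to be retained.
    static boolean shouldRetain(Map<String, Long> latestOffsetByKey, boolean retainDeletes, Entry e) {
        if (e.key == null) {
            return false; // compacted topics require keyed records
        }
        Long latest = latestOffsetByKey.get(e.key);
        boolean redundant = latest != null && e.offset < latest;
        boolean obsoleteDelete = !retainDeletes && isTombstone(e);
        return !redundant && !obsoleteDelete;
    }

    public static void main(String[] args) {
        Map<String, Long> latest = new HashMap<>();
        latest.put("user-42", 5L);

        Entry old = new Entry(3, "user-42", (byte) 0, "v1".getBytes());
        Entry flaggedDelete = new Entry(5, "user-42", (byte) TOMBSTONE_MASK, "audit".getBytes());

        System.out.println(shouldRetain(latest, true, old));            // prints "false"
        System.out.println(shouldRetain(latest, true, flaggedDelete));  // prints "true"
        System.out.println(shouldRetain(latest, false, flaggedDelete)); // prints "false"
    }
}
```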

 

Compatibility, Deprecation, and Migration Plan

  • Migration Plan
      • The tombstone feature will be backwards compatible.
      • We will have a two-stage migration plan.
      • Stage 1:
        • message.format.version=0.10.2.0-1
        • The broker should understand both the attribute flag and a null value as tombstone markers for log compaction.
        • If the consuming client is of an older version, we down-convert the message when the attribute flag is set, nulling the value so that the existing behaviour is preserved.
      • Stage 2:
      • Message version migration would be handled as in KIP-32.
      • The bit flag would default to false.
      • Logic would be based on the current behaviour (a null value) or on the tombstone flag being set to true; as such, it wouldn't impact any existing flows and would simply allow new producers to make use of the feature.
        • message.format.version=0.10.2.0
        • The broker only understands the attribute flag for log compaction.
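The broker rules for the two stages, and the Stage 1 down-conversion for older consumers, can be sketched as follows. This is a minimal illustration following the Stage 1 and Stage 2 broker bullets; the Stage enum and method names are hypothetical:

```java
public class MigrationSketch {
    enum Stage { STAGE_1, STAGE_2 }

    // Attribute bit 5, the proposed tombstone flag.
    static final int TOMBSTONE_MASK = 1 << 5;

    static boolean flagSet(byte attributes) {
        return (attributes & TOMBSTONE_MASK) != 0;
    }

    // Stage 1: the broker honours the flag as well as a null value.
    // Stage 2: the broker honours only the flag.
    static boolean brokerTreatsAsTombstone(Stage stage, byte attributes, byte[] value) {
        if (stage == Stage.STAGE_1) {
            return flagSet(attributes) || value == null;
        }
        return flagSet(attributes);
    }

    // Down-conversion for an older consumer: if the flag is set, the value is
    // nulled so the old client still sees a conventional null-value tombstone.
    static byte[] downConvertValue(byte attributes, byte[] value) {
        return flagSet(attributes) ? null : value;
    }

    public static void main(String[] args) {
        byte flagged = (byte) TOMBSTONE_MASK;
        byte plain = 0;
        byte[] payload = "audit".getBytes();

        System.out.println(brokerTreatsAsTombstone(Stage.STAGE_1, plain, null));      // prints "true"
        System.out.println(brokerTreatsAsTombstone(Stage.STAGE_2, plain, null));      // prints "false"
        System.out.println(brokerTreatsAsTombstone(Stage.STAGE_2, flagged, payload)); // prints "true"
        System.out.println(downConvertValue(flagged, payload) == null);              // prints "true"
    }
}
```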


Rejected Alternatives

Do nothing: KIP-82 would remove the immediate issue.

...