Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

JIRAhttps://issues.apache.org/jira/browse/KAFKA-7061

PULL REQUEST: https://github.com/apache/kafka/pull/51497368 (WIP)

Motivation

Current Kafka log compaction is based on server side view, which means records are compacted only based on records offset. For the same key, only the highest offset record is kept after compaction. Note that records are appended to logs based on the order when message is received on broker for the same topic partition. This default strategy insufficient in many scenarios. On the producer side, when multiple producers produce to the same topic partition, the producer side record order cannot be guaranteed on the server side. This is because the message transmitting over the network is determined by many factors out of the control of Kafka. On the server side, which message is reserved after compaction is random. The following is a motivating example:

...

There are no changes to the public interfaces. 

Proposed Changes

...

  • "log.cleaner.compaction.strategy"
    • The active compaction strategy to use;
    • Accepts values "offset", "timestamp" and "header", allowing for further strategies to be added in the future as needed;
  • "log.cleaner.compaction.strategy.header"
    • Configuration sub-set to use when the strategy is set to "header";

...

  • The previous behavior is active, compacting the logs purely based on offset;
  • Also used when the configuration is either empty or not present, making this the default strategy;

...

  • Enhance log compaction to support more than just offset comparison, so the insertion order isn't always dictating which records to keep (in effect, allowing for a form of OCC);
  • The current behavior should remain as the default in order to minimize impact on already existing clients and avoid any migration efforts;
  • New Configurations:
    • "log.cleaner.compaction.strategy"
      • The active compaction strategy to use;
      • Accepts values "offset", "timestamp" and "header", allowing for further strategies to be added in the future as needed;
    • "log.cleaner.compaction.strategy.header"
      • Configuration sub-set to use when the strategy is set to "header";
  • Compaction Strategies:
    • "offset"
      • The previous behavior is active, compacting the logs purely based on offset;
      • Also used when the configuration is either empty or not present, making this the default strategy;
    • "timestamp"
      • The record timestamp will be used to determine which record to keep, in a 'keep-highest' approach;
      • When both records being compared contain an equal timestamp, then the record with the highest offset will be kept;
      • This requires caching also the timestamp field during compaction, in addition to the base offset, so each record being compacted will suffer a memory increase from 8 bytes to 16 bytes when using this strategy.
    • "header"
      • Searches the record for a header key that matches the configured value on "compaction.strategy.header";
      • If the "compaction.strategy.header" configuration is not set (or is blank), then the compaction strategy will fallback to "offset";
      • If a header key that matches the configuration exists, then the header value (which must be of type "long") will be used to determine which record to keep, in a 'keep-highest' approach;
      • If both records being compared do not have a matching header key, then the record with the highest offset will be kept;
      • If both records being compared contain an equal header value, then the record with the highest offset will be kept;
      • If only one of the records being compared has a matching header, then this record is kept, as the other record is considered to be anomalous;
      • This requires caching also the header field during compaction, in addition to the base offset, so each record being compacted will suffer a memory increase from 8 bytes to 16 bytes when using this strategy.

...