Status

Current state: Accepted (vote)

Discussion thread: https://lists.apache.org/thread.html/67fcfe37169bdbabdbecc30686ccba0f5f27e193c468a1fe5d0062ed@%3Cdev.kafka.apache.org%3E

Old Discussion thread: https://lists.apache.org/thread.html/f44317eb6cd34f91966654c80509d4a457dbbccdd02b86645782be67@%3Cdev.kafka.apache.org%3E

JIRA: https://issues.apache.org/jira/browse/KAFKA-7061

PULL REQUEST: https://github.com/apache/kafka/pull/8103

Motivation

Log compaction currently follows the server-side view: records are compacted by offset, and offsets are assigned in the order in which the broker receives the records. For a given key, only the record with the highest offset is kept after compaction, so that Kafka can reconstruct the current state of the events in a "most recent snapshot" approach. The issue occurs when the insertion order is not guaranteed, which causes log compaction to keep the wrong state. This is easy to reproduce with a multi-threaded producer (or simply multiple producers), or when sending events asynchronously. The following is an example:

...

To resolve the above issue, we propose adding support for compaction based on a producer signal (i.e. two more compaction strategies, record timestamp and header sequence/version), while keeping the current offset-based compaction as the default for backward compatibility. This gives the producer the option to own and control record ordering. As log compaction works at the topic level and a broker can host multiple topics, keeping the compaction strategy configuration at the topic level is ideal. The user can then enable a different compaction strategy for a subset of compacted topics, or set it at the broker level for all topics on the broker. While this proposal only supports two new compaction strategies, it leaves the option open to add more strategies in the future.

Special case where we need to retain the LEO (log-end-offset) record or create an empty message batch for a non-offset-based compaction strategy:

Today, with the offset-only compaction strategy, the last record of the log (we call it the log-end-record, whose offset is the log-end-offset) is always preserved and never compacted. This is important for replication, since followers reason about the log-end-offset on the leader. Consider this case: a partition has three replicas, leader 1 and followers 2 and 3.

Leader 1 has records a, b, c, d, where d is the current last record of the partition; the current log-end-offset is 3 (assuming record a's offset is 0).

Follower 2 has replicated a, b, c, d. Its log-end-offset is 3.

Follower 3 has replicated a, b, c but has not yet replicated d. Its log-end-offset is 2.

NOTE: compaction is triggered independently on each broker, so it is possible that leader 1 triggers compaction and deletes record d while the followers have not yet triggered compaction. At this moment the leader's log becomes a, b, c. If follower 3 fetches from the leader after the compaction, it will no longer see record d.

Now suppose there is a leader migration and follower 3 becomes the new leader. It accepts new appends (say, record e), and record e is appended at offset 3 on new leader 3's log, while the record at offset 3 on follower 2 is still d. Later, suppose follower 2 also triggers compaction and fetches the new record e from new leader 3:

Follower 2's log would be a(0), b(1), c(2), e(4), where the numbers in brackets are offsets, while leader 3's log would be a(0), b(1), c(2), e(3). The two logs now diverge in offsets, although their log entries are the same.

One way to resolve this is to simply never remove the last message during compaction. Another way (suggested by Jason in the old VOTE thread) is to create an empty message batch to "take up" that offset slot.
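
The first option can be illustrated with a minimal Java sketch. It is not the log cleaner's actual implementation: the class RetainLastRecordSketch, the Rec type, and the use of a producer-supplied version as the ordering signal are all hypothetical, and the input list is assumed to be ordered by offset. Each key keeps its highest-version record, and the log-end record is retained even when it would otherwise lose the comparison.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative sketch only; these names are hypothetical and not taken from Kafka. */
final class RetainLastRecordSketch {

    /** Minimal stand-in for a log record carrying a producer-supplied version. */
    static final class Rec {
        final long offset;
        final String key;
        final long version;

        Rec(long offset, String key, long version) {
            this.offset = offset;
            this.key = key;
            this.version = version;
        }
    }

    /** Compacts a log that is assumed to be ordered by ascending offset. */
    static List<Rec> compact(List<Rec> log) {
        // Pick the winner per key: highest version, ties broken by highest offset.
        Map<String, Rec> winners = new HashMap<>();
        for (Rec r : log) {
            Rec cur = winners.get(r.key);
            if (cur == null || r.version > cur.version
                    || (r.version == cur.version && r.offset > cur.offset)) {
                winners.put(r.key, r);
            }
        }
        // The log-end record is retained even if it lost the comparison,
        // so the log-end-offset stays the same on every replica.
        Rec last = log.isEmpty() ? null : log.get(log.size() - 1);
        List<Rec> compacted = new ArrayList<>();
        for (Rec r : log) {
            if (winners.get(r.key) == r || r == last) {
                compacted.add(r);
            }
        }
        return compacted;
    }
}
```

For a log with versions 5, 9 and 3 for the same key at offsets 0, 1 and 2, this sketch keeps offset 1 (the highest version) and offset 2 (the log-end record).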

Acknowledgement: we thank the previous author of this KIP proposal, Luís Cabral.

...

The following new configuration properties are added at both the broker level and the topic level:

Broker Level:

  1. log.cleaner.compaction.strategy
    • Optional at the broker level; defaults to offset
  2. log.cleaner.compaction.strategy.header

Topic Level:

  1. compaction.strategy
    • Defaults to the broker-level compaction strategy
  2. log.cleaner.compaction.strategy.header
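
The fallback order described above (topic level, then broker level, then "offset") could be resolved roughly as follows. This is a sketch under assumptions: the class StrategyConfigSketch and the method effectiveStrategy are hypothetical, configurations are modelled as plain string maps, and an empty value is treated the same as a missing one.

```java
import java.util.Map;

/** Illustrative sketch only; not the broker's actual config resolution code. */
final class StrategyConfigSketch {
    static final String BROKER_KEY = "log.cleaner.compaction.strategy";
    static final String TOPIC_KEY = "compaction.strategy";
    static final String DEFAULT_STRATEGY = "offset";

    /** Topic-level value wins, then the broker-level value, then the "offset" default. */
    static String effectiveStrategy(Map<String, String> brokerConfig,
                                    Map<String, String> topicConfig) {
        String topicValue = topicConfig.get(TOPIC_KEY);
        if (topicValue != null && !topicValue.isEmpty()) {
            return topicValue;
        }
        String brokerValue = brokerConfig.get(BROKER_KEY);
        if (brokerValue != null && !brokerValue.isEmpty()) {
            return brokerValue;
        }
        return DEFAULT_STRATEGY;
    }
}
```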

...

  • The current behavior should remain as the default in order to minimize impact on already existing clients and avoid any migration efforts;
  • New Configuration: 
    1. log.cleaner.compaction.strategy
      • The active compaction strategy to use;
      • Accepts values "offset", "timestamp" and "header", allowing for further strategies to be added in the future as needed;
    2. log.cleaner.compaction.strategy.header
      • Configuration sub-set to use when the strategy is set to "header";
  • Compaction Strategies:
    • "offset"
      • The current behavior is active, compacting the logs purely based on offset;
      • Also used when the configuration is either empty or not present, making this the default strategy;
    • "timestamp"
      • The record timestamp will be used to determine which record to keep, in a 'keep-highest' approach;
      • When both records being compared contain an equal timestamp, then the record with the highest offset will be kept;
      • This requires also caching the timestamp field during compaction, in addition to the base offset, so the memory used per record being compacted increases from 8 bytes to 16 bytes with this strategy.
    • "header"
      • Searches the record for a header key that matches the configured value on "log.cleaner.compaction.strategy.header";
        • As a header key can occur more than once, the last occurrence of the header key is used;
      • If both records being compared do not have a matching header key, then the record with the highest offset will be kept;
      • If a header key that matches the configuration exists, then the header value (which must be of type "long" - 8 bytes) will be used to determine which record to keep, in a 'keep-highest' approach;
      • If both records being compared contain an equal header value, then the record with the highest offset will be kept;
      • If only one of the records being compared has a matching header, then this record is kept, as the other record is considered to be anomalous;
      • This requires also caching the header value during compaction, in addition to the base offset, so the memory used per record being compacted increases from 8 bytes to 16 bytes with this strategy.
  • Retain LEO (log-end-offset) record during compaction
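
The comparison rules for the three strategies can be summarised in a short Java sketch. It is an illustration, not the cleaner's code: CompactionStrategySketch, Header, Rec, headerVersion and keep are hypothetical names. Two further assumptions are made that the text above does not state: the 8-byte header value is read as a signed big-endian long, and a matching header whose value is not exactly 8 bytes is treated as if the header were missing.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

/** Illustrative sketch only; none of these names come from the Kafka code base. */
final class CompactionStrategySketch {

    /** Minimal stand-in for a record header (key plus raw value bytes). */
    static final class Header {
        final String key;
        final byte[] value;

        Header(String key, byte[] value) {
            this.key = key;
            this.value = value;
        }

        /** Convenience factory: encodes the value as 8 big-endian bytes (assumed layout). */
        static Header ofLong(String key, long value) {
            return new Header(key, ByteBuffer.allocate(Long.BYTES).putLong(value).array());
        }
    }

    /** Minimal stand-in for a log record. */
    static final class Rec {
        final long offset;
        final long timestamp;
        final List<Header> headers;

        Rec(long offset, long timestamp, Header... headers) {
            this.offset = offset;
            this.timestamp = timestamp;
            this.headers = Arrays.asList(headers);
        }
    }

    /** Reads the last occurrence of headerKey; returns null if absent or not 8 bytes (assumption). */
    static Long headerVersion(Rec r, String headerKey) {
        Header last = null;
        for (Header h : r.headers) {
            if (h.key.equals(headerKey)) {
                last = h;
            }
        }
        if (last == null || last.value == null || last.value.length != Long.BYTES) {
            return null;
        }
        return ByteBuffer.wrap(last.value).getLong();
    }

    /** Returns whichever of the two records (same key) survives under the given strategy. */
    static Rec keep(Rec a, Rec b, String strategy, String headerKey) {
        Rec higherOffset = a.offset >= b.offset ? a : b;
        String s = strategy == null ? "" : strategy;
        switch (s) {
            case "timestamp":
                if (a.timestamp != b.timestamp) {
                    return a.timestamp > b.timestamp ? a : b;
                }
                return higherOffset; // equal timestamps: highest offset wins
            case "header":
                Long va = headerVersion(a, headerKey);
                Long vb = headerVersion(b, headerKey);
                if (va == null && vb == null) {
                    return higherOffset; // neither record has the header
                }
                if (va == null) {
                    return b; // only b has the header
                }
                if (vb == null) {
                    return a; // only a has the header
                }
                int cmp = Long.compare(va, vb);
                if (cmp != 0) {
                    return cmp > 0 ? a : b;
                }
                return higherOffset; // equal header values: highest offset wins
            default:
                return higherOffset; // "offset", empty, or not configured
        }
    }
}
```

Keeping the offset as the final tie-breaker in every branch means all three strategies fall back to the current behavior whenever the producer signal does not distinguish the two records.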

Compatibility, Deprecation, and Migration Plan

...

  • Update the producer to send the header value in every record.
  • Roll out the producer first to all clusters.
  • Once all producers are confirmed to be sending the header value, update the topic config on the broker side to use the header strategy.
  • Note:
    • When migrating an existing topic, the already compacted log remains as it is (i.e. offset based); only new logs are compacted using the new strategy once the topic config has been updated with the strategy.
    • If a topic needs to be rolled back to the default offset strategy, first update the topic config on the broker side to the offset strategy; the producer can then stop generating the header value.
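
On the producer side, the header value has to be an 8-byte long. A minimal sketch of the encoding is shown below; VersionHeaderCodec is a hypothetical helper, and the big-endian byte order is an assumption rather than something this proposal specifies. With the Kafka Java client, the resulting bytes could then be attached to a record via record.headers().add(headerKey, bytes), where headerKey is whatever key the topic is configured with.

```java
import java.nio.ByteBuffer;

/** Illustrative sketch only; a hypothetical helper, not part of the Kafka client. */
final class VersionHeaderCodec {

    /** Encodes a version as 8 bytes, big-endian (ByteBuffer's default order; assumed layout). */
    static byte[] encode(long version) {
        return ByteBuffer.allocate(Long.BYTES).putLong(version).array();
    }

    /** Decodes an 8-byte header value back into a long. */
    static long decode(byte[] value) {
        if (value == null || value.length != Long.BYTES) {
            throw new IllegalArgumentException("header value must be exactly 8 bytes");
        }
        return ByteBuffer.wrap(value).getLong();
    }
}
```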

Recommendations

  • In scenarios such as a low produce rate, a topic partition can remain ineligible for compaction for an unbounded duration, during which "delete.retention.ms" may take effect and remove tombstone records, if any exist. In that case we recommend that Kafka users set "segment.ms" and "max.compaction.lag.ms" (as compaction does not happen on the active segment) to be smaller than "delete.retention.ms".
  • As this KIP introduces a configurable compaction strategy, consumers should be aware of it and follow the same compaction strategy as the broker, to avoid inconsistency in which records are kept.

Rejected Alternatives

      (This section remains the same as previous proposal.)

...