
Status

Current state: Accepted (vote)

Discussion thread: https://lists.apache.org/thread.html/67fcfe37169bdbabdbecc30686ccba0f5f27e193c468a1fe5d0062ed@%3Cdev.kafka.apache.org%3E

Old Discussion thread: https://lists.apache.org/thread.html/f44317eb6cd34f91966654c80509d4a457dbbccdd02b86645782be67@%3Cdev.kafka.apache.org%3E

JIRA: KAFKA-7061 (https://issues.apache.org/jira/browse/KAFKA-7061)

PULL REQUEST: https://github.com/apache/kafka/pull/8103 (WIP)

Motivation

Current Kafka log compaction is based on the server side view, i.e. records are compacted based on record offset, and the offset reflects the order in which each record was received on the broker side. So for the same key, only the record with the highest offset is kept after compaction. This default strategy is insufficient in many scenarios. On the producer side, when multiple producers produce to the same topic partition, the producer side record order cannot be guaranteed on the server side, because message transmission over the network is determined by many factors outside the control of Kafka. On the server side, which message survives compaction is therefore effectively random. The issue occurs when the insertion order is not guaranteed, which causes the log compaction to keep the wrong state. This can be easily replicated when using a multi-threaded (or simply multiple) producer(s), or when sending the events asynchronously. The following is an example:

Producer 1 tries to send a message <K1, V1> to topic A partition p1. Producer 2 tries to send a message <K1, V2> to the same topic A partition p1. On the producer side, we clearly preserve an order for the two messages: <K1, V1> before <K1, V2>. But on the server side, this order can be random: message <K1, V1> could have a higher offset because it was received later than <K1, V2>. When compaction happens, <K1, V1> will be kept, and clearly this is not what is intended.

To resolve the above issue, we are proposing to add a feature to support compaction based on a producer signal (i.e. adding two more compaction strategies, record timestamp and header sequence/version) while keeping the current offset-based compaction as the default for backward compatibility. This way, the producer has an option to own and control the record ordering. As log compaction operates at the topic level and a broker can host multiple topics, keeping the compaction strategy configuration at the topic level is ideal: the user can choose to enable a different compaction strategy for a subset of compacted topics, or at the broker level for all topics within the broker. While this proposal only adds two compaction strategies, it leaves the option open to add more strategies in the future.

Acknowledgement: we thank the previous author of this KIP proposal, Luís Cabral; many of his changes are kept in this proposal.

In order to use Kafka as the message broker within an Event Source architecture, it becomes essential that Kafka is able to reconstruct the current state of the events in a "most recent snapshot" approach.

This is where log compaction becomes an integral part of the workflow, as only the latest state is of interest. At the moment, Kafka accomplishes this by considering the insertion order (or highest offset) as a representation of the latest state.



Special case where we need to retain the LEO (log-end-offset) record / create an empty message batch for non-offset-based compaction strategies:

Today, with the offset-only compaction strategy, the last record of the log (we call it the log-end-record, whose offset is the log-end-offset) is always preserved and never compacted away. This is important for replication, since followers reason about the log-end-offset on the leader. Consider this case: three replicas of a partition, leader 1 and followers 2 and 3.

Leader 1 has records a, b, c, d, where d is the current last record of the partition; the current log-end-offset is 3 (assuming record a's offset is 0).

Follower 2 has replicated a, b, c, d; its log-end-offset is 3. Follower 3 has replicated a, b, c but not yet d; its log-end-offset is 2.

NOTE: compaction is triggered independently on each broker, so it is possible that leader 1 triggers compaction and deletes record d while the followers have not yet triggered compaction. At this moment the leader's log becomes a, b, c. Now, if follower 3 fetches from the leader after the compaction, it will never see record d.

Now suppose there is a leader migration and follower 3 becomes the new leader. It accepts new appends (say, record e), and e is appended at offset 3 in new leader 3's log. But the record at offset 3 on follower 2 is still d. Later, follower 2 also triggers compaction and fetches the new record e from new leader 3:

Follower 2's log would be a(0), b(1), c(2), e(4), where the numbers in brackets are offsets, while leader 3's log would be a(0), b(1), c(2), e(3). The two logs now diverge in offsets, although their log entries are the same.
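To make the divergence concrete, the following toy sketch (plain Java, not Kafka code) replays the walkthrough above with in-memory logs; the record names and offsets mirror the scenario exactly.

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetDivergenceDemo {
    // A log entry: record name plus its assigned offset.
    record Entry(String name, long offset) {}

    public static void main(String[] args) {
        // Follower 2 replicated a, b, c, d before any compaction ran.
        List<Entry> follower2 = new ArrayList<>(List.of(
                new Entry("a", 0), new Entry("b", 1), new Entry("c", 2), new Entry("d", 3)));
        // Follower 3 replicated only a, b, c, became leader, and appended e at its LEO (3).
        List<Entry> leader3 = List.of(
                new Entry("a", 0), new Entry("b", 1), new Entry("c", 2), new Entry("e", 3));

        // Follower 2 later compacts d away (possible under a non-offset strategy) ...
        follower2.removeIf(entry -> entry.name().equals("d"));
        // ... and appends the fetched record e at its own LEO, which is still 4.
        follower2.add(new Entry("e", 4));

        System.out.println("follower 2: " + follower2); // offsets 0, 1, 2, 4
        System.out.println("leader  3: " + leader3);    // offsets 0, 1, 2, 3
        // Same entries, diverged offsets: the inconsistency described above.
    }
}
```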

One way to resolve this is to simply never remove the last message during compaction. Another way (suggested by Jason in the old VOTE thread) is to create an empty message batch to "take up" that offset slot.


Public Interfaces

The following new configuration properties are added at both the broker level and the topic level (a usage sketch follows the lists):

Broker Level:

  1. log.cleaner.compaction.strategy
    • Optional at the broker level; defaults to "offset"
  2. log.cleaner.compaction.strategy.header

Topic Level:

  1. compaction.strategy
    • Defaults to the broker-level compaction strategy
  2. compaction.strategy.header
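For illustration only, here is a minimal sketch of how a topic could opt into the header strategy through the standard Admin API, assuming this KIP's proposed topic-level configs are in place; the topic name "my-compacted-topic" and header key "version" are placeholders chosen for this example.

```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class EnableHeaderCompaction {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            ConfigResource topic =
                    new ConfigResource(ConfigResource.Type.TOPIC, "my-compacted-topic");
            // The per-topic configs proposed by this KIP; "version" is the header key
            // that producers will populate with a monotonically increasing long.
            List<AlterConfigOp> ops = List.of(
                    new AlterConfigOp(new ConfigEntry("compaction.strategy", "header"),
                            AlterConfigOp.OpType.SET),
                    new AlterConfigOp(new ConfigEntry("compaction.strategy.header", "version"),
                            AlterConfigOp.OpType.SET));
            admin.incrementalAlterConfigs(Map.of(topic, ops)).all().get();
        }
    }
}
```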

Proposed Changes

  • The current behavior should remain as the default in order to minimize impact on already existing clients and avoid any migration efforts;
  • New Configurations:
      1. log.cleaner.compaction.strategy
        • The active compaction strategy to use;
        • Accepts values "offset", "timestamp" and "header", allowing for further strategies to be added in the future as needed;
      2. log.cleaner.compaction.strategy.header
        • Configuration sub-set to use when the strategy is set to "header";
  • Compaction Strategies:
    • "offset"
      • The current behavior remains active, compacting the logs purely based on offset;
      • Also used when the configuration is either empty or not present, making this the default strategy;
    • "timestamp"
      • The record timestamp will be used to determine which record to keep, in a 'keep-highest' approach;
      • When both records being compared contain an equal timestamp, then the record with the highest offset will be kept;
      • This also requires caching the timestamp field during compaction, in addition to the base offset, so each record being compacted will suffer a memory increase from 8 bytes to 16 bytes when using this strategy.
    • "header"
      • Searches the record for a header key that matches the configured value on "log.cleaner.compaction.strategy.header";
          If the "compaction.strategy.header" configuration is not set (or is blank), then the compaction strategy will fallback to "offset"
          • as the header can have duplicates, will pick the last occurrence of the header key
        • If both records being compared do not have a matching header key, then the record with the highest offset will be kept;
        • If a header key that matches the configuration exists, then the header value (which must be of type "long" - 8 bytes) will be used to determine which record to keep, in a 'keep-highest' approach;
        • If both records being compared do not have a matching header key, then the record with the highest offset will be kept;If both records being compared contain an equal header value, then the record with the highest offset will be kept;
        • If only one of the records being compared has a matching header, then this record is kept, as the other record is considered to be anomalous;
        • This requires caching also the header field value during compaction, in addition to the base offset, so each record being compacted will suffer a memory increase from 8 bytes to 16 bytes when using this strategy.
    • Retain the LEO (log-end-offset) record during compaction (see the special case discussed above)
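Complementing the "header" strategy description above, below is a hedged producer-side sketch, assuming a topic configured with the proposed "compaction.strategy=header" and "compaction.strategy.header=version" (both the topic and header names are placeholders): the producer attaches its own monotonically increasing sequence as an 8-byte big-endian long under that header key.

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.nio.ByteBuffer;
import java.util.Properties;

public class VersionedProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            long version = 42L; // producer-owned sequence; the highest value wins compaction
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("my-compacted-topic", "K1", "V2");
            // The strategy expects an 8-byte long under the configured header key.
            record.headers().add("version",
                    ByteBuffer.allocate(Long.BYTES).putLong(version).array());
            producer.send(record);
        }
    }
}
```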

Compatibility, Deprecation, and Migration Plan

Following the above proposed changes, there are no compatibility issues and no migration is required.

    ...

However, to migrate an existing topic to the header strategy, we propose the following sequence to avoid inconsistency during the migration:

  • Update the producers to send the header value in all records.
  • Roll out the producers to all clusters first.
  • Once it is confirmed that all producers are sending the header value, update the topic config on the broker side to the header strategy (a spot-check sketch follows this list).
  • Note:
    • For an existing topic, the already compacted log remains as-is (i.e. offset based); only new logs are compacted using the new strategy once the topic config is updated.
    • If a topic's strategy ever needs to be rolled back to the default offset strategy, the topic config on the broker side should be updated to the offset strategy first; only then can the producers stop generating the header value.
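As a sketch of the confirmation step above, one might consume a sample of recent records and verify the header is present before flipping the topic config; the topic and header names are the same placeholders used in the earlier examples.

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;
import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class HeaderSpotCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "header-spot-check");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("my-compacted-topic"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                // lastHeader returns null when no producer attached the header yet.
                Header h = record.headers().lastHeader("version");
                if (h == null) {
                    System.out.printf("Record at offset %d is missing the 'version' header%n",
                            record.offset());
                }
            }
        }
    }
}
```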

Recommendations

  • For scenarios such as a low produce rate, a topic partition may remain ineligible for compaction for an unbounded duration, during which "delete.retention.ms" may trigger and remove existing tombstone records. In that case we recommend that Kafka users set "segment.ms" and "max.compaction.lag.ms" (as compaction won't happen on the active segment) to be smaller than "delete.retention.ms".
  • As this KIP introduces configurable compaction strategies, consumers should be aware of and follow the same compaction strategy as the broker, to avoid inconsistency about which records are kept.

Rejected Alternatives

(This section remains the same as the previous proposal.)

  • Producer to use transactions with EOS to address the zombie producer issue
    • Currently this is available only for the Java clients (C++ support is in progress)
    • Adds transaction overhead just to address the incorrect compaction state issue explained above
  • Stream the data out of Kafka and perform Event Sourcing there
    • This would mean creating an in-house solution, which makes Kafka irrelevant in the design, so it is best left as a last resort in case no solution is found on the Kafka side
  • Guarantee insertion order on the producer
    • Not viable, as keeping this logic synchronized greatly reduces the event throughput
  • Check the version before sending the event to Kafka
    • Similar to the previous point, though it adds extra complexity, as race conditions may arise when attempting to compare versions
  • Cache the record version as a byte array and perform comparisons between records using a lexicographic byte-array comparator
    • This adds greater flexibility on the client side, but allowing a variable byte-array size raises concerns about the memory usage of the cache
  • Always search the headers for a key matching whatever is configured, so that if a header "timestamp" exists it could be used by the compaction mechanism
    • This introduces backwards-compatibility issues, as today arbitrary headers are allowed without affecting compaction at all.
    • Even ignoring the previous point, this may cause API confusion: for example, a topic may be designed with "offset" compaction, which makes it unclear whether the producer should provide an "offset" header or the internal offset is meant to be used.
    • Provide the configuration for the individual topics
      • None of the configurations for log compaction are available at topic level, so adding it there is not a part of this KIP