Status

Current state: Discussion

Discussion thread: https://lists.apache.org/thread.html/79aa6e50d7c737ddf83455dd8063692a535a1afa558620fe1a1496d3@<dev.kafka.apache.org>

JIRA: https://issues.apache.org/jira/browse/KAFKA-7061

Pull request: https://github.com/apache/kafka/pull/7528

Motivation

Current log compaction is based on the server-side view: records are compacted by offset, and offsets are assigned in the order in which the broker receives the records. For any given key, only the record with the highest offset is kept after compaction, so that Kafka can reconstruct the current state of the events in a "most recent snapshot" approach. The issue occurs when insertion order is not guaranteed, which causes log compaction to keep the wrong state. This is easily reproduced with a multi-threaded producer (or simply multiple producers), or when sending events asynchronously. The following is an example:

Producer 1 tries to send a message <K1, V1> to topic A partition p1. Producer 2 tries to send a message <K1, V2> to the same partition. On the producer side, the two messages have a clear order: <K1, V1>, then <K1, V2>. On the server side, however, this order is not guaranteed: <K1, V1> could receive the higher offset because it arrives later than <K1, V2>. When compaction happens, <K1, V1> is kept, which is clearly not what is intended.
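The scenario above can be reproduced with a small model. This is only an illustrative sketch, not Kafka's cleaner code: the function name `compact_by_offset` and the tuple layout are assumptions made for the example.

```python
# Simplified model of offset-based compaction: for each key, keep the record
# with the highest offset. This is an illustration, not Kafka's cleaner code.

def compact_by_offset(log):
    """Compact a list of (offset, key, value) tuples, keeping the highest offset per key."""
    latest = {}
    for offset, key, value in log:
        if key not in latest or offset > latest[key][0]:
            latest[key] = (offset, value)
    return {key: value for key, (_, value) in latest.items()}


# The producers intended V1 and then V2, but the broker received V2 first,
# so V1 was assigned the higher offset.
broker_log = [(0, "K1", "V2"), (1, "K1", "V1")]
result = compact_by_offset(broker_log)
print(result)  # {'K1': 'V1'}: the stale value survives compaction
```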

To resolve the above issue, we propose adding support for compaction based on a producer signal. This adds two more compaction strategies (record timestamp and header sequence) and keeps the current offset-based compaction as the default for backward compatibility. In this way, the producer has the option to own and control record ordering. Because log compaction happens at the topic level and a broker can host multiple topics, keeping the compaction strategy configuration at the topic level is ideal: the user can enable a different compaction strategy for a subset of compacted topics. While this proposal supports only two new compaction strategies, it leaves the option open to add more strategies in the future.

Acknowledgement: we thank the previous author of this KIP proposal, Luís Cabral.

Public Interfaces

There are no changes to the public interfaces. 

Proposed Changes

  • The current behavior should remain as the default in order to minimize impact on already existing clients and avoid any migration efforts;
  • New Configuration:
    • "log.cleaner.compaction.strategy"
      • The active compaction strategy to use;
      • Accepts values "offset", "timestamp" and "header.*", allowing for further strategies to be added in the future as needed;
        • For the header strategy, the header key is the suffix that follows "header." and is limited to a first-level header lookup
  • Compaction Strategies:
    • "offset"
      • The current behavior is active, compacting the logs purely based on offset;
      • Also used when the configuration is either empty or not present, making this the default strategy;
    • "timestamp"
      • The record's create timestamp will be used to determine which record to keep, in a 'keep-highest' approach;
      • When both records being compared contain an equal timestamp, the record with the highest offset will be kept;
      • This requires caching the timestamp field during compaction in addition to the base offset, so the cache memory used per record being compacted grows from 8 bytes to 16 bytes with this strategy.
    • "header"
      • Searches the record for a header key that matches the suffix configured in the "header.*" strategy value;
      • If neither of the records being compared has a matching header key, the record with the highest offset will be kept;
      • If a header key that matches the configuration exists, the header value (which must be of type "long" - 8 bytes) will be used to determine which record to keep, in a 'keep-highest' approach;
      • If both records being compared contain an equal header value, the record with the highest offset will be kept;
      • If only one of the records being compared has a matching header, that record is kept, as the other record is considered anomalous;
      • This requires caching the header value during compaction in addition to the base offset, so the cache memory used per record being compacted grows from 8 bytes to 16 bytes with this strategy.
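The comparison rules listed above can be summarised in a short sketch. This is a simplified model, not the implementation in the pull request: the `Record` class and the functions `header_key`, `header_version`, `should_replace` and `compact` are hypothetical names, the big-endian encoding of the 8-byte header value is an assumption, and treating a header value of the wrong length as missing is also an assumption that the proposal does not specify.

```python
import struct
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class Record:
    """Hypothetical stand-in for a Kafka record; not a Kafka class."""
    offset: int
    key: str
    timestamp: int
    headers: Dict[str, bytes] = field(default_factory=dict)


def header_key(strategy: str) -> Optional[str]:
    """Return the suffix of a 'header.<key>' strategy value, or None."""
    prefix = "header."
    if strategy.startswith(prefix) and len(strategy) > len(prefix):
        return strategy[len(prefix):]
    return None


def header_version(record: Record, key: str) -> Optional[int]:
    """Decode the 8-byte header value as a signed long (big-endian is assumed)."""
    raw = record.headers.get(key)
    if raw is None or len(raw) != 8:
        return None  # assumption: a malformed value is treated as missing
    return struct.unpack(">q", raw)[0]


def should_replace(strategy: str, current: Record, candidate: Record) -> bool:
    """Return True if candidate should be kept instead of current (same key)."""
    if not strategy or strategy == "offset":
        return candidate.offset > current.offset
    if strategy == "timestamp":
        if candidate.timestamp != current.timestamp:
            return candidate.timestamp > current.timestamp
        return candidate.offset > current.offset  # tie: highest offset wins
    key = header_key(strategy)
    if key is None:
        raise ValueError(f"unknown compaction strategy: {strategy!r}")
    cur_v = header_version(current, key)
    cand_v = header_version(candidate, key)
    if cur_v is None and cand_v is None:
        return candidate.offset > current.offset  # neither has the header
    if cur_v is None or cand_v is None:
        return cand_v is not None  # the record that has the header is kept
    if cand_v != cur_v:
        return cand_v > cur_v
    return candidate.offset > current.offset  # tie: highest offset wins


def compact(strategy: str, log):
    """Keep one record per key according to the chosen strategy."""
    kept = {}
    for rec in log:
        cur = kept.get(rec.key)
        if cur is None or should_replace(strategy, cur, rec):
            kept[rec.key] = rec
    return kept


# V2 is the newer state but arrived first, so it holds the lower offset.
v2 = Record(0, "K1", 200, {"version": struct.pack(">q", 2)})
v1 = Record(1, "K1", 100, {"version": struct.pack(">q", 1)})
log = [v2, v1]
```

With this log, `compact("offset", log)` keeps the record at offset 1 (the stale V1), while `compact("timestamp", log)` and `compact("header.version", log)` both keep the record at offset 0 (V2).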

Compatibility, Deprecation, and Migration Plan

Following the proposed changes, there are no compatibility issues. However, to migrate an existing topic to the header strategy, follow the sequence below:

  • Update the producer to send the header value in every record.
  • Roll out the producer to all clusters first.
  • Once all producers are sending the header value, update the topic config on the broker side to the header strategy.
  • Note:
    • For any existing topic migration, the already compacted log remains as it is (i.e. offset based); only the new logs will be compacted using the new strategy once the topic config is updated with the strategy.
    • If the topic ever needs to roll back to the default offset strategy, first update the topic config on the broker side to the offset strategy; the producers can then stop generating the header value. However, the log already compacted based on the header value will not change.
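For the first migration step, the producer has to attach an 8-byte "long" header value to every record. The following is a minimal sketch of the encoding only: the header name `version`, the helper names, and the big-endian byte order are assumptions, since the proposal states only that the value must be a "long" of 8 bytes. The headers are built here as a list of (key, bytes) pairs purely for illustration, and no client library is called.

```python
import struct


def encode_version(version: int) -> bytes:
    """Encode a record version as an 8-byte signed long (big-endian is assumed)."""
    return struct.pack(">q", version)


def decode_version(raw: bytes) -> int:
    """Decode an 8-byte header value back into an integer."""
    if len(raw) != 8:
        raise ValueError("header value must be exactly 8 bytes")
    return struct.unpack(">q", raw)[0]


# Hypothetical header name; it would have to match the suffix configured in
# the "header.*" strategy value (here, "header.version").
headers = [("version", encode_version(42))]
print(len(headers[0][1]))             # 8
print(decode_version(headers[0][1]))  # 42
```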

Rejected Alternatives

      (This section remains the same as in the previous proposal.)

  • Have the producer use transactions with EOS to address the zombie producer issue
    • Currently this is available only for the Java clients; C++ support is in progress
    • Adds transaction overhead just to address the incorrect compaction state issue explained above
  • Stream the data out of Kafka and perform Event Sourcing there
    • This would mean creating an in-house solution, which makes Kafka irrelevant in the design, so it is best left as a last resort in case no solution is found on the Kafka side
  • Guarantee insertion order on the producer
    • Not viable as keeping this logic synchronized greatly reduces the event throughput
  • Check the version before sending the event to Kafka
    • Similar to the previous point, though it adds extra complexity, as race conditions may arise when attempting to compare versions
  • Cache the record version as a byte array and perform the comparisons between records using a lexicographic byte array comparator
    • This adds greater flexibility on the client side, but allowing a variable byte array size to be used raises concerns about memory usage by the cache
  • Always search the headers for a key matching whatever is configured, so if a header "timestamp" exists then it could be used by the compaction mechanism
    • This introduces backwards compatibility issues, as any headers are allowed without this change and the compaction is not affected at all.
    • Even if ignoring the previous point, this may cause API issues as, for example, the topic may be designed with "offset" compaction, which makes it unclear if the Producer should then provide a header "offset" or if the internal offset is meant to be used.
  • Provide the configuration for the individual topics
    • None of the configurations for log compaction are available at topic level, so adding it there is not a part of this KIP