You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 16 Next »

Status

Current state: Under Discussion

Discussion threadhttps://lists.apache.org/thread.html/79aa6e50d7c737ddf83455dd8063692a535a1afa558620fe1a1496d3@<dev.kafka.apache.org>

JIRA: ---

PULL REQUEST: https://github.com/apache/kafka/pull/4822

Motivation

In order to use Kafka as the message broker within an Event Source architecture, it becomes essential that Kafka is able to reconstruct the current state of the events in a "most recent snapshot" approach.

This is where log compaction becomes an integral part of the workflow, as only the latest state is of interest. At the moment, Kafka accomplishes this by considering the insertion order (or highest offset) as a representation of the latest state.

The issue then occurs when the insertion order is not guaranteed, which causes the log compaction to keep the wrong state. This can be easily replicated when using a multi-threaded (or simply multiple) producer(s), or when sending the events asynchronously.

Public Interfaces

There are no changes to the public interfaces. 

Proposed Changes

  • Enhance log compaction to support more than just offset comparison, so the insertion order isn't always dictating which records to keep (in effect, allowing for a form of OCC);
  • The current behavior should remain as the default in order to minimize impact on already existing clients and avoid any migration efforts;
  • Add new Kafka configuration "log.cleaner.compaction.strategy" to toggle the compaction strategy to this approach;
  • Add new Topic configuration "compaction.strategy" representing the same as above, with reserved values "offset" and "timestamp";
  • The default value of these configurations should be "offset", which represents the previous behavior;
  • When this configuration is set to "timestamp", then the record timestamp will be used to determine which record to keep, in a 'keep-highest' approach;
  • When this configuration is set to anything other than "offset" or "timestamp", then the record headers are scanned for a key matching this value. If this header is found then this value will be used to determine which record to keep, in a 'keep-highest' approach;
  • As the header value exists as a byte array, it is up to the client to ensure that the represented bytes can be be compared in a lexicographic order.
  • When both records being compared contain a matching "compaction value", then the record with the highest offset will be kept;
  • When both records being compared do not have a "compaction value" at all, then the record with the highest offset will be kept;

  • When only one of the records being compared has a "compaction value", then this record is kept (as the other is considered to be anomalous);

Details of the change can be viewed from the pull request.

FAQ

Q) Why use a long to represent the compaction value?

A) By using this format, we can better have a comparable that can be used by the client for both a timestamp version approach or an incremental numeral version approach.

Q) How does the compaction value get parsed from byte[] to long?

A) This is accomplished by converting the byte[] directly to long via the ByteUtils utility class. If any of these steps fails, then the compaction value is ignored (ie, its considered the record doesn't have a versioned compaction value).

Q) Why not compare the raw byte[] directly instead of performing these conversions?

A) Although the byte[] can be compared, it is not actually comparable. In order to compare these, some conversion operations would have to be done that, in the end, would not make it more efficient than directly converting to long.

Compatibility, Deprecation, and Migration Plan

Following the proposed changes, there are no compatibility issues and no migration is required.

Rejected Alternatives

  • Stream the data out of Kafka and perform Event Sourcing there
    • This would mean creating an in-house solution, which makes Kafka irrelevant in the design, and so its best left as a last-approach in case no solution is found on Kafka-side
  • Guarantee insertion order on the producer
    • Not viable as keeping this logic synchronized greatly reduces the event throughput
  • Check the version before sending the event to Kafka
    • Similar to the previous point, though it adds extra complexity as race-conditions may arise when attempting to compare
  • No labels