This proposal covers integrating the idea of keyed messages more fully into Kafka and better handling message streams that carry updates to keyed entities.

API and Message Changes

Currently the producer can partition messages by a key, but the key is not retained and is not available to the downstream consumer. This proposal would remove the key from the producer and instead add the following fields to Message:

 Message.key()
 Message.hasKey()

This would change the binary format of the message and, as such, would change the client protocol.
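
For concreteness, one possible layout for a message that carries a key might look like the following (the field sizes and ordering here are an illustrative assumption, not a committed format):

 crc             4 bytes
 magic           1 byte   (version bumped to signal the keyed format)
 attributes      1 byte
 key length      4 bytes  (e.g. -1 when no key is present, so hasKey() can be derived)
 key             key-length bytes
 payload length  4 bytes
 payload         payload-length bytes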

The producer java API would remain the same:

 producer.send(new ProducerData("topic", keyObj, valueObj))

However, we would now require a key serializer class in addition to the value serializer in order to read and write the key, since we no longer just take the hash code of the key object but physically store its serialized bytes.
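
As a rough sketch of how this could be wired up (the Encoder interface and the key.serializer.class property below are assumptions for illustration, not a settled API):

 // Sketch only: a hypothetical key encoder registered alongside the existing value serializer.
 import java.nio.charset.StandardCharsets;
 import java.util.Properties;

 // Hypothetical encoder interface: turns a key or value object into the bytes stored in the message.
 interface Encoder<T> {
   byte[] toBytes(T value);
 }

 class StringEncoder implements Encoder<String> {
   public byte[] toBytes(String s) {
     return s.getBytes(StandardCharsets.UTF_8);
   }
 }

 class KeyedProducerProps {
   static Properties props() {
     Properties props = new Properties();
     props.put("serializer.class", "example.StringEncoder");     // value serializer, as today
     props.put("key.serializer.class", "example.StringEncoder"); // hypothetical key serializer setting
     return props;
   }
 }

With this in place, producer.send(new ProducerData("topic", keyObj, valueObj)) would serialize keyObj with the key encoder and store the resulting bytes in the message, rather than only hashing keyObj to choose a partition.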

Offset Change

We would also change the offset in the following way:

  1. Make the offset a sequential integer (0, 1, 2, 3...) for each partition.
  2. Store the offset in the log.

The purpose of this change is to decouple the offset from the file offset. Coupling these two things makes the search strategy to look up a message very simple and lightweight, but makes it impossible to change the file position of a message. This change requires the ability to change the file layout while maintaining a permanent message id.

The message sets provided by the producer would have arbitrary offsets set, and the offsets would be assigned by the server.

The consumer would always request last_offset+1 as its new offset rather than using the byte size to calculate the next position. This is arguably simpler and less error prone for the consumer.

This change requires a change to the search strategy to service a fetch request so that the logical offset can be translated to a physical file position.

To accommodate this I would introduce a simple index of offset=>physical location. The format of this index would be a simple sorted file that would accompany each log segment, so a log segment would now have two files, 0.log and 0.offsets. The latter file would contain entries in the form

 logical_offset1 physical_offset1 logical_offset2 physical_offset2 ...

The logical offset would always be a 4-byte integer taken relative to the base offset of the file stored in the file name; the physical offset would be the 4-byte physical position of that message in the file. A search for a particular message offset would proceed as follows:

  1. Find the log segment with the greatest offset <= the desired offset
  2. Perform a binary search on a memory mapping of the index file to locate the file offset of the greatest offset <= the given offset
  3. Seek to that location and read messages until we find the first message with offset >= the given offset and begin reading the consumer's message set from there

Note that this allows the index to contain an entry for every message or only for some messages (say, one entry per 4K of message data).
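
To make step 2 concrete, here is a sketch of a lookup against a memory-mapped sparse index of (relative logical offset, physical position) pairs; the class and method names are illustrative, not a proposed API:

 import java.io.RandomAccessFile;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;

 class OffsetIndex {
   private final MappedByteBuffer mmap;
   private final int entries;
   private final long baseOffset; // taken from the segment file name, e.g. 0 for 0.offsets

   OffsetIndex(String path, long baseOffset) throws Exception {
     RandomAccessFile file = new RandomAccessFile(path, "r");
     this.mmap = file.getChannel().map(FileChannel.MapMode.READ_ONLY, 0, file.length());
     this.entries = (int) (file.length() / 8); // 4-byte relative offset + 4-byte file position
     this.baseOffset = baseOffset;
   }

   // Binary search for the physical position of the greatest indexed offset <= the target.
   int lookup(long targetOffset) {
     int target = (int) (targetOffset - baseOffset);
     int lo = 0, hi = entries - 1, position = 0;
     while (lo <= hi) {
       int mid = (lo + hi) >>> 1;
       if (mmap.getInt(mid * 8) <= target) {
         position = mmap.getInt(mid * 8 + 4); // candidate; keep searching to the right
         lo = mid + 1;
       } else {
         hi = mid - 1;
       }
     }
     return position; // the caller seeks here and scans forward to the desired offset (step 3)
   }
 }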

A sorted file is not a very good index structure in terms of minimizing look-up time (a positionally-indexed btree or other structures have better caching characteristics), but this format has the advantage of only requiring simple appends and interacts well with the keyed log retention described below.

We originally were concerned that a more complex search algorithm would have too large an impact on performance, but I don't think this will be true. Consumers that are mostly caught up will do many small requests since there may be only a few new messages since their last fetch, but for these consumers the index will likely be fully in RAM and lookups will be very fast. Consumers that start from the beginning of the log may well be doing lookups on a cold index, but in this case the consumer reads many messages in a chunk, so the additional expense of the index lookup is amortized. Locality also works in our favor: even though a consumer starting on an old segment will have to warm that segment's index on its first few requests, the index will likely remain warm as it consumes the remainder of the segment.

One can work this out in a back-of-the-envelope way: assuming a 512MB segment size and one index entry per 4K of message data, each segment has ~131,072 entries; at 8 bytes per entry (a 4-byte logical offset plus a 4-byte physical offset) that is ~1MB of "hot" index per partition. Assuming 1000 partitions on a server, this gives ~1GB of recent index. Smaller segment sizes would reduce this (at the cost of more files).

With 128K readahead (the Linux default) you would expect the 1MB index to be fully paged into memory after 8 seeks, which should give a reasonable bound on performance. Previous experiments with this kind of index showed that you could easily do 100k qps on a memory-resident index.

Log Compaction

The purpose of all these changes is to enable a more intelligent retention strategy for keyed data. Currently, for "event" type data, each message represents a new entity and the data consequently grows without bound. To keep storage bounded we truncate old log segments based on various criteria, which is essentially the only option for this type of data if one has bounded storage space. However, data streams that have the semantics of updating keyed entities allow a more desirable strategy, namely deleting previous values of each key. This is preferable to truncating old segments because the log still contains the final value of all keys and hence can act as a complete "backup", allowing consumers to fully bootstrap themselves.

Note that the semantics are very loose here: de-duplication is being done only to save space and to prevent readers from reading too many versions of the same record. We can exploit this looseness to make the deduplication more efficient.

Here is the proposal for implementing this retention strategy. We would add a per-topic setting specifying the retention strategy (size, time, or key). Size and time would be handled as they are today. First, a simplified version of deduplication:

Divide the log into two parts: a head consisting of segments that have not yet been processed and a tail consisting of segments that have previously been deduplicated.

We will use a Map<byte[], Long> S containing the keys in the head of the log and the offset of their latest update. First I will describe the use of the mapping S, then a way to implement it efficiently. We scan the head of the log from oldest to newest, populating S with the keys we see; when this scan is complete, S contains the latest offset of each key in the head of the log.

Now we recopy all the log segments and their indexes. For each entry we first check S to see if it contains the key; if so, we do not recopy the message unless its offset is >= the offset stored in S.
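
A minimal sketch of this two-pass approach, using an in-memory map for S (the MessageEntry and SegmentWriter abstractions are assumptions for illustration):

 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;

 class SimpleCompactor {
   // Minimal abstractions over reading and rewriting log segments, for illustration only.
   interface MessageEntry { byte[] key(); long offset(); }
   interface SegmentWriter { void append(MessageEntry m); }

   // Pass 1: scan the head from oldest to newest, recording the latest offset for each key.
   Map<ByteBuffer, Long> buildOffsetMap(Iterable<MessageEntry> head) {
     Map<ByteBuffer, Long> latest = new HashMap<>();
     for (MessageEntry m : head) {
       latest.put(ByteBuffer.wrap(m.key()), m.offset()); // later entries overwrite earlier ones
     }
     return latest;
   }

   // Pass 2: recopy a segment, keeping a message only if no newer value exists for its key.
   void recopy(Iterable<MessageEntry> segment, Map<ByteBuffer, Long> latest, SegmentWriter out) {
     for (MessageEntry m : segment) {
       Long newest = latest.get(ByteBuffer.wrap(m.key()));
       if (newest == null || m.offset() >= newest) {
         out.append(m);
       }
     }
   }
 }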

We can make the implementation of S efficient by exploiting the fact that we need not remove all duplicates (just most of them). To do this we implement S as two parallel arrays, a byte[size*16] and a long[size]. The byte array stores the md5 of each key: the first 4 bytes of the md5, taken modulo size, give an integer position p, and the full md5 is stored beginning at byte p*16. In the event of a hash collision a key will be missed in deduplication; however, this is okay, and by changing the seed for the hashing on each compaction we can ensure that the key will likely be collected in a subsequent compaction.
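
A sketch of this structure follows; the salting of the hash and the modulo step are spelled out explicitly, and a collision simply means one duplicate survives until a later pass:

 import java.security.MessageDigest;
 import java.util.Arrays;

 class OffsetMap {
   private final byte[] hashes;   // capacity * 16 bytes of md5 digests
   private final long[] offsets;  // latest offset for the key stored in the matching slot
   private final int capacity;
   private final byte[] salt;     // changed on each compaction so collided keys land elsewhere next time

   OffsetMap(int capacity, byte[] salt) {
     this.capacity = capacity;
     this.hashes = new byte[capacity * 16];
     this.offsets = new long[capacity];
     this.salt = salt;
   }

   private byte[] md5(byte[] key) throws Exception {
     MessageDigest md = MessageDigest.getInstance("MD5");
     md.update(salt);
     return md.digest(key);
   }

   // Use the first 4 bytes of the digest, modulo capacity, as the slot index.
   private int slot(byte[] digest) {
     int p = ((digest[0] & 0xff) << 24) | ((digest[1] & 0xff) << 16)
           | ((digest[2] & 0xff) << 8) | (digest[3] & 0xff);
     return (p & 0x7fffffff) % capacity;
   }

   void put(byte[] key, long offset) throws Exception {
     byte[] digest = md5(key);
     int p = slot(digest);
     System.arraycopy(digest, 0, hashes, p * 16, 16); // may overwrite a colliding key; that is acceptable
     offsets[p] = offset;
   }

   // Returns the latest recorded offset for the key, or -1 if it was not recorded (or was evicted).
   long get(byte[] key) throws Exception {
     byte[] digest = md5(key);
     int p = slot(digest);
     byte[] stored = Arrays.copyOfRange(hashes, p * 16, p * 16 + 16);
     return Arrays.equals(stored, digest) ? offsets[p] : -1L;
   }
 }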

The compactor would statically allocate these large arrays at some configurable size and loop through the partitions, compacting a given partition whenever the head of its log reaches a certain volume of uncompacted data.
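
A minimal sketch of that loop, assuming a hypothetical Partition abstraction that exposes the volume of uncompacted head data (and reusing the OffsetMap sketched above):

 import java.util.List;

 class CompactionScheduler {
   interface Partition {
     long uncompactedHeadBytes();      // bytes accumulated in the head since the last compaction
     void compact(OffsetMap map);      // run the two-pass compaction using the shared map
   }

   private final OffsetMap map;        // statically allocated once, reused across partitions
   private final long headBytesThreshold;

   CompactionScheduler(OffsetMap map, long headBytesThreshold) {
     this.map = map;
     this.headBytesThreshold = headBytesThreshold;
   }

   void runOnce(List<Partition> partitions) {
     for (Partition p : partitions) {
       if (p.uncompactedHeadBytes() >= headBytesThreshold) {
         p.compact(map); // the map would be reset between partitions
       }
     }
   }
 }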

Generational Compaction

Note that this approach is likely good enough for many use cases where the update rate is not too high; however, it could be improved significantly. The key realization is that the distribution of updates is unlikely to be purely uniform. Assuming non-uniform update rates, and assuming the probability of update is somewhat stable, if we ran this compaction strategy for many iterations the messages early in the log would be less likely to ever be collected. The intuition comes from the fact that a message early in the log has survived many compactions and has never been replaced by a later message.

This implies that later segments are likely to have a greater yield of garbage than older segments and should be collected more frequently. This heuristic is what is exploited in generational garbage collectors, as well as in the generational compaction in Cassandra and LevelDB. The problem here is simpler, though, because we retain data in log order. As a result I believe it may be possible to derive an algorithm that collects statistics on the yield gained by cleaning segments (i.e. the amount of garbage found) in order to choose an approximately optimal cleaning schedule for each segment.

There is no reason this couldn't be done in a later phase.

Implications for Replication

I believe this is largely orthogonal to replication, but would like to have that sanity checked. Retention would run independently on each server with no attempt to coordinate timing. This would mean that logs with the keyed compaction strategy are not guaranteed to be byte-for-byte identical (technically this is true for other retention strategies too due to timing differences in when deletion happens, but here it is much more pronounced). The guarantee for a keyed log would be that messages are delivered in the order they are published (for each partition) and that a consumer eventually sees the latest message for each key (messages made obsolete might or might not be delivered depending on timing). In practice, since compaction would only happen with a granularity of at least one segment, a moderately up-to-date consumer would see all messages sent.

Auditing

The current auditing approach would not be appropriate. Instead for these topics, auditing should measure the delivered offset of each key. I believe this could be implemented as a separate phase since auditing is not yet committed anywhere.

Impact On Existing Use Cases

This is a fairly invasive change from the point of view of compatibility and would be a fairly substantial change to the log implementation. However I think it is mostly a positive change: the switch to logical message offsets is aesthetically nice and would simplify the consumer implementation.

My primary concerns would be (1) not introducing any new bugs, and (2) getting acceptable performance out of the index implementation.

Implementation Phases

  1. Physical changes to message and log to retain key and offset. This should ideally go out with 0.8 since it will be a compatibility change.
  2. Change to logical offsets.
  3. Implement simple "brute force" log compaction for keyed messages
  4. Nice-to-have
    1. Implement a generational compaction scheme to lessen the I/O in compaction. This is a nice to have and could be done much later.
    2. Key-based auditing

2 Comments

  1. I am not sure I see the reason to perform a binary search; physical offsets in Kafka are fixed size, so a fixed-size index should suffice (searches should thus be O(1) rather than O(log n)).

    We implemented something similar at Tagged for indexed access to messages in a very simple way, using the existing I/O facilities provided by Kafka:

    1) For each message put into Topic A, the physical offset of that message in Topic A was written into Topic A'.
    2) It is thus possible to read any arbitrary logical offset m that exists in Topic A: since the entries in Topic A' are fixed size, simple math produces the physical offset in Topic A', the value at which contains the physical offset in Topic A for logical offset m.

    1. I think I see the reasoning for the storage of arbitrary offsets and searching rather than direct indexing - it allows for a "sparse" log format that doesn't force storage of all logical offsets in a single offsets file.

      Would it be possible to use sparse files at the OS level?

      It seems Kafka is turning into a database... (wink)