Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Table of Contents

Motivation

Here are a few main driving motivations for this proposal:

  1. Kafka should be agnostic to the message format: Joe Stein has proposed supporting general schema-based topics at Kafka, and at LinkedIn this idea has been applied for a while. We use Avro as our centralized schema management which controls the data format contract between clients, and in addition we have built our auditing system also based on the some pre-defined fields of the schemas (KAFKA-260). However, this approach forces Kafka to be aware of our Avro message formats for its auditing purposes beyond just piping bytes, which adds extra maintenance overhead (Todd has some more explanation in the discussion thread).
     
  2. Kafka should try to avoid any unnecessary de-compression / re-compression in its pipeline: today broker needs to de-/re-compress messages just for assigning their offsets upon receiving compressed messages (KAFKA-527); and MM need to always do decompression at the consumer side and then re-compress the messages at the producer side in case there are keyed messages (KAFKA-1001). This can lead to high CPU / memory usage and also risk of data loss due to too-large-message after re-compression in the middle of the pipeline. Both of these de- / re-compression process should be able to avoid in most cases.
     
  3. Kafka need to support control messages (KAFKA-1639): we want to add "control message" into Kafka, which is NOT real data but only used by the broker / clients for core Kafka functionality usage such as transactional messaging, etc.

 

We have been also discussing about several some other Kafka problems:

  1. Log cleaning dependence on the log rolling policy (KAFKA-979): today we clean up log data at the granularity of log segments, and only considering non-active log segments. As a result, if a log does not roll for a long time, the specified log cleaning policy may not be honored. This can cause unexpected amount of data duplicates when the consumer offsets are reset to "smallest".

  2. Log segment timestamp not persist during partition migration / broker restart (KAFKA-881 / KAFKA-1379): related to the previous issue (KAFKA-979), today the time-based log rolling mechanism depends on the creation time of the log segments, which will be changed when partition migrates or broker restarts, violating the log rolling policy. Requests about adding timestamps into the messages as well as the index files have also been proposed (KAFKA-1403).
     
    Mirror Maker de-compress / re-compress issue (KAFKA-1001): MM need to always do decompression at the consumer side and then re-compress the messages at the producer side in case there are keyed messages; this can lead to high CPU / memory usage and also risk of data loss due to too-large-message after re-compression.
    Broker de-compress / re-compress issue (KAFKA-527): broker needs to de-/re-compress messages just for assigning their offsets upon receiving compressed messages, leading to high CPU / memory usage.

  3. Auditing dependence on message value (KAFKA-260): some existing auditing tools (e.g. LinkedIn's auditing solution) requires looking into the message value content (i.e. timestamp, service / server names), causing the audit consumer to always de-compress / de-serialize the data; and what is even more important is that the Kafka team has to be aware of the message schemas, owning schema registry service, etc.
    Unclean election can yield data inconsistency (KAFKA-977): during unclean leader election (i.e. a non-ISR replica is elected as new leader), the log messages can diverge. And the fact that brokers would always truncate its data to high watermark while making themselves followers introduces a "window of weakness" that ISR is effectively just 1 and hence largely increase unclean election possibilities if consecutive broker failure happens (KAFKA-1211).
    Support control message (KAFKA-1639): we want to add "control message" into Kafka, which is real data but used by the broker / consumer for transactional messaging / Samza, etc.
     


These issues may be independently resolvable with separate solutions, but they actually come from the same root cause: some per-message metadata are either lacking (such as timestamps for log cleaning, wrapped offsets for avoiding de-/re-compression on broker / mirror maker, control messages) or being written as part of the message content, which requires Kafka to open the message content (including de-serialize / de-compress) thought it should not care about other values. Therefore, by enriching our current message metadata it is possible for us to kill them all in one stone. This page is made to inspire discussions about feasibility of this "one stone" approach.

...

Code Block
MessageAndOffset => MessageSize Offset Message
  MessageSize => int32
  Offset => int64


  Message => Crc MagicByte Attributes KafkaTagLength [KafkaTag] AppsTagLengthAppTagLength [AppTag] KeyLength Key ValueLength Value
    Crc => int32
    MagicByte => int8
    Attributes => int8
    SentTime => int32
    ReceiptTime => int32

    KafkaTagLength = > int32
    KafkaTag => 
      KafkaTagId => int8
      TagValue => [different types]

    AppTagLength = > int32
    AppTag => 
      AppTagKeyLength => int32
      AppTagKey => bytes
      AppTagValueLength => int32
      AppTagValue => bytes

    KeyLength => int32
    Key => bytes
    ValueLength => int32
    Value => bytes
 

...

  • MagicByte value is set to "1".

  • For compressed message, the offset field stores the starting offset (i.e. the offset of the first wrapped message).
    • The inner compressed messages' offset field will store the relative offset against the starting offset (i.e., 0, 1, 2 ...)
    • The offset of the inner message will then be the starting offset + relative offset.
    • With log compaction turned on, the relative offsets will be non consecutive.
    • When the compressed message is replicated to other clusters, the relative offsets need to be ignored and the offset of the inner message is then calculated as the starting offset + i (if it is the i-th wrapped message)

  • We will use the lowest 4 bits of the Attribute byte to indicate the type of the message:
    • normal - uncompressed
    • normal - gzip compressed
    • normal - snappy compressed 
    • ... 
    • control - leader epoch (from leader)
    • control - start transaction (from transaction coordinator)
    • control - commit transaction (from transaction coordinator)
    • control - abort transaction (from transaction coordinator)
    • ...

  • Kafka tags are identified by pre-defined IDs, and hence can be stored in a compact way. For example:
    • We can reserve the first 32 IDs for general messages, such as
      • kafka_timestamp => int64, this is the Unix timestamp set by the broker upon receipt, and hence can change while the message is going through the pipeline via MM. Used by log cleaning, etc.
    • Then the next 32 IDs for compressed messages, such as
      • num_messages => int32, this is for the compressed messages
    • etc..

    KafkaTagsLength specifies the total bytes of the tags field, which can be used iterate through the tags or skip the whole collection directly. Broker reading an unknown tag id will simply ignore the tag, and if there is a necessary tag that is not present it will use some default value / log exceptions. By doing this Kafka tag protocol change would not require a strict broker / client upgrade.

  • App tags are stored as a list collection of key-value pairs encoded as strings, and hence can be arbitrarily extended. It is used by higher-level applications (MM, auditor, clients, etc) read app-level metadata information without opening up the message content (key and value, which are possibly serialized / compressed). AppTagsLengh is also used as KafkaTagsLength to traverse / skip the tag lists. Examples of app tags are:

  • With the new format each message's metadata size increased by 8 bytes in the best case (Kafka/AppTagsLength = -1 indicate empty tags).

...