Motivation

Here are a few main driving motivations for this proposal:

  1. Kafka should avoid any unnecessary de-compression / re-compression in its pipeline: today the broker needs to de-/re-compress messages just to assign their offsets upon receiving compressed messages (KAFKA-527), and MM always needs to decompress at the consumer side and then re-compress the messages at the producer side in case there are keyed messages (KAFKA-1001). This can lead to high CPU / memory usage and also to a risk of data loss when a message becomes too large after re-compression in the middle of the pipeline. Both of these de-/re-compression steps should be avoidable in most cases.
     
  2. Kafka should support auditing while remaining agnostic to the message format: Joe Stein has proposed supporting general schema-based topics in Kafka, and at LinkedIn this idea has been applied for a while. We use Avro for our centralized schema management, which controls the data format contract between clients, and in addition we have built our auditing system on some pre-defined fields of the schemas (KAFKA-260). However, this approach forces Kafka to be aware of our Avro message formats for auditing purposes, beyond just piping bytes, which adds extra maintenance overhead (Todd has some more explanation in the discussion thread).
     
  3. Kafka needs to support control messages (KAFKA-1639): we want to add "control messages" to Kafka, which are NOT real data but are only used by the broker / clients for core Kafka functionality such as transactional messaging, etc.

 

 

Update: the scope of this proposal is now narrowed to the Kafka core properties, with the focus on compression / log compaction only. We leave other issues, such as auditing, that may involve application properties for future discussion.

 

Motivation

We have also been discussing several other Kafka problems:

  1. Log cleaning dependence on the log rolling policy (KAFKA-979): today we clean up log data at the granularity of log segments, and only consider non-active log segments. As a result, if a log does not roll for a long time, the specified log cleaning policy may not be honored. This can cause an unexpected amount of duplicate data when the consumer offsets are reset to "smallest".

  2. Log segment timestamps are not persisted during partition migration / broker restart (KAFKA-881 / KAFKA-1379): related to the previous issue (KAFKA-979), today the time-based log rolling mechanism depends on the creation time of the log segments, which changes when a partition migrates or a broker restarts, violating the log rolling policy. Adding timestamps to the messages as well as to the index files has also been requested (KAFKA-1403).
  3. Unclean election can yield data inconsistency (KAFKA-977): during unclean leader election (i.e. a non-ISR replica is elected as the new leader), the log messages can diverge. In addition, the fact that brokers always truncate their data to the high watermark when becoming followers introduces a "window of weakness" in which the ISR is effectively just 1, which largely increases the chance of unclean election if consecutive broker failures happen (KAFKA-1211).

  4. Mirror Maker de-compress / re-compress issue (KAFKA-1001): MM always needs to decompress at the consumer side and then re-compress the messages at the producer side in case there are keyed messages; this can lead to high CPU / memory usage and also to a risk of data loss when a message becomes too large after re-compression.

  5. Broker de-compress / re-compress issue (KAFKA-527): the broker needs to de-/re-compress messages just to assign their offsets upon receiving compressed messages, leading to high CPU / memory usage.
     

These issues may be independently resolvable with separate solutions, but they actually come from the same root cause: some per-message metadata is either lacking (such as timestamps for log cleaning, wrapped offsets for avoiding de-/re-compression on the broker / mirror maker, control messages) or written as part of the message content, which requires Kafka to open the message content (including de-serializing / de-compressing it) even though it should not care about the other values. Therefore, by enriching our current message metadata it is possible for us to kill them all with one stone. This page is meant to inspire discussion about the feasibility of this "one stone" approach.

...

Code Block
MessageAndOffset => Offset MessageSize Message
  Offset => int64
  MessageSize => int32
 
  Message => Crc MagicByte Attributes KeyLength Key ValueLength Value
    Crc => int32
    MagicByte => int8
    Attributes => int8
    KeyLength => int32
    Key => bytes
    ValueLength => int32
    Value => bytes
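
As a rough illustration of the current layout, the sketch below encodes and decodes one entry with Python's struct module. It assumes big-endian fields in the order Offset, MessageSize, Message, a length of -1 for a null key or value, and a CRC computed over every byte after the Crc field; the function names are hypothetical and not part of any Kafka client.

```python
import struct
import zlib


def _pack_bytes(data):
    """Length-prefixed bytes; a length of -1 stands for a null field."""
    if data is None:
        return struct.pack(">i", -1)
    return struct.pack(">i", len(data)) + data


def encode_message(key, value, magic=0, attributes=0):
    """Message => Crc MagicByte Attributes KeyLength Key ValueLength Value."""
    body = struct.pack(">bb", magic, attributes) + _pack_bytes(key) + _pack_bytes(value)
    # Assumption: the CRC covers everything that follows the Crc field.
    crc = zlib.crc32(body) & 0xFFFFFFFF
    return struct.pack(">I", crc) + body


def encode_message_and_offset(offset, message):
    """MessageAndOffset => Offset MessageSize Message."""
    return struct.pack(">qi", offset, len(message)) + message


def _read_bytes(buf, pos):
    """Read one length-prefixed field; return (bytes or None, next position)."""
    (length,) = struct.unpack_from(">i", buf, pos)
    pos += 4
    if length < 0:
        return None, pos
    return bytes(buf[pos:pos + length]), pos + length


def decode_message_and_offset(buf, pos=0):
    """Return (offset, key, value, next_pos) for the entry starting at pos."""
    offset, size = struct.unpack_from(">qi", buf, pos)
    start = pos + 12  # 8 bytes of offset + 4 bytes of message size
    crc, _magic, _attributes = struct.unpack_from(">Ibb", buf, start)
    if zlib.crc32(buf[start + 4:start + size]) & 0xFFFFFFFF != crc:
        raise ValueError("CRC mismatch at offset %d" % offset)
    key, p = _read_bytes(buf, start + 6)
    value, _ = _read_bytes(buf, p)
    return offset, key, value, start + size
```

Note that the offset sits outside the CRC-protected message bytes, so a broker can rewrite the offset of an uncompressed message without touching the message itself; for a compressed message the inner offsets are inside the compressed payload, which is the cause of the re-compression described in KAFKA-527.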

...

Proposed New Message Format

Here is a list of extensions we want to add to the message metadata:

  1. We would like to augment the message types to normal messages (compressed and uncompressed) and control messages (leader epoch, transactional, etc). We would also like to add the "enqueue" timestamp, which is set by the broker upon receiving the message, as a first-class field of the message header.
  2. We would like to add some "Kafka properties" (system tags) to the message metadata that are core to Kafka and that only brokers care about. Examples include:
  • Timestamps upon reception (for any messages).
  • Number of wrapped messages (for compressed messages).
  • Wrapped message set relative offsets honor-ship (for compressed messages).
  • Indicator whether the wrapped message set contains keys (for compressed messages).
  • ...

 

Here is the proposed new message format:

Code Block
MessageAndOffset => Offset MessageSize Message
  Offset => int64
  MessageSize => int32


  Message => Crc MagicByte Attributes Timestamp KafkaTagLength [KafkaTag] AppTagLength [AppTag] KeyLength Key ValueLength Value
    Crc => int32
    MagicByte => int8
    Attributes => int8
    Timestamp => int32

    KafkaTagLength => int32
    KafkaTag => 
      KafkaTagId => int8
      TagValue => [different types]

    AppTagLength => int32
    AppTag => 
      AppTagKeyLength => int32
      AppTagKey => bytes
      AppTagValueLength => int32
      AppTagValue => bytes

    KeyLength => int32
    Key => bytes
    ValueLength => int32
    Value => bytes
 

...

  • MagicByte value is set to "1".

  • For a compressed message, the offset field stores the starting offset (i.e. the offset of the first wrapped message).
    • The inner compressed messages' offset fields store the relative offset against the starting offset (i.e., 0, 1, 2 ...).
    • The offset of an inner message is then the starting offset + relative offset.
    • With log compaction turned on, the relative offsets can be non-consecutive.
    • When the compressed message is replicated to other clusters, the relative offsets need to be ignored, and the offset of the inner message is then calculated as the starting offset + i (if it is the i-th wrapped message).

  • We will use the lowest 4 bits of the Attributes byte to indicate the type of the message:
    • normal - uncompressed
    • normal - gzip compressed
    • normal - snappy compressed 
    • ...
    • ... (below are future possible types)
    • ... 
    • control - leader epoch (from leader)
    • control - start transaction (from transaction coordinator)
    • control - commit transaction (from transaction coordinator)
    • control - abort transaction (from transaction coordinator)
    • ...

  • Kafka tags are identified by pre-defined IDs, and hence can be stored in a compact way. For example:
    • We can reserve the first 32 IDs for general messages, such as
      kafka_0: timestamp => int64, this is the Unix timestamp set by the broker upon receipt, and hence can change while the message is going through the pipeline via MM. Used by log cleaning, etc.
    • Then the next 32 IDs for compressed messages, such as
      32: num_messages => int32, number of wrapped messages in the message set; this is only used for the compressed messages.
    • etc...

  • KafkaTagsLength specifies the total size in bytes of the tags field, which can be used to iterate through the tags or to skip the whole collection directly.

  • A broker reading an unknown tag id will simply ignore the tag, and if a necessary tag is not present it will use some default value / log an exception. This way, a change to the Kafka tag protocol would not require a strict broker / client upgrade.

  • App tags are stored as a collection of key-value pairs encoded as strings, and hence can be arbitrarily extended. They are used by higher-level applications (MM, auditor, clients, etc) to read app-level metadata without opening up the message content (key and value, which are possibly serialized / compressed). AppTagsLength is used, like KafkaTagsLength, to traverse / skip the tag list.
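
The message-type bullet above can be sketched as a mask over the Attributes byte. The numeric codes below are a hypothetical assignment that simply follows the order of the list (uncompressed, gzip, snappy); the codes for control messages are not fixed by this page, so they are left unassigned here.

```python
MESSAGE_TYPE_MASK = 0x0F  # lowest 4 bits of the Attributes byte

# Hypothetical code assignment, following the order of the list above.
MESSAGE_TYPES = {
    0: "normal - uncompressed",
    1: "normal - gzip compressed",
    2: "normal - snappy compressed",
}


def message_type(attributes):
    """Return (type code, description) for an Attributes byte."""
    code = attributes & MESSAGE_TYPE_MASK
    return code, MESSAGE_TYPES.get(code, "unassigned")


def is_compressed(attributes):
    """True for the compressed normal types under the assignment above."""
    return (attributes & MESSAGE_TYPE_MASK) in (1, 2)
```

Only the low 4 bits take part in the lookup, so the upper 4 bits stay free for other flags, such as the relative offset honor-ship flag mentioned elsewhere on this page.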

 

With the new format, each message's metadata size increases by 8 bytes in the best case (

...

KafkaTagsLength = -1 indicates empty tags).
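
To make the traverse / skip behaviour of the app tags concrete, here is a minimal sketch of the AppTag layout from the grammar above. It assumes that AppTagLength counts the total bytes of the tag entries (as described for KafkaTagsLength), that -1 marks an empty collection, and that keys and values are UTF-8 strings; the function names are hypothetical.

```python
import struct


def encode_app_tags(tags):
    """Encode a dict of str -> str as AppTagLength followed by the AppTag entries."""
    if not tags:
        return struct.pack(">i", -1)  # -1 marks an empty tag collection
    body = b""
    for key, value in tags.items():
        k, v = key.encode("utf-8"), value.encode("utf-8")
        body += struct.pack(">i", len(k)) + k + struct.pack(">i", len(v)) + v
    return struct.pack(">i", len(body)) + body


def skip_app_tags(buf, pos):
    """Jump over the whole tag collection without looking at any entry."""
    (length,) = struct.unpack_from(">i", buf, pos)
    return pos + 4 + max(length, 0)


def read_app_tags(buf, pos):
    """Return (tags dict, position of the first byte after the tags)."""
    (length,) = struct.unpack_from(">i", buf, pos)
    pos += 4
    end = pos + max(length, 0)
    tags = {}
    while pos < end:
        (klen,) = struct.unpack_from(">i", buf, pos)
        pos += 4
        key = buf[pos:pos + klen].decode("utf-8")
        pos += klen
        (vlen,) = struct.unpack_from(">i", buf, pos)
        pos += 4
        tags[key] = buf[pos:pos + vlen].decode("utf-8")
        pos += vlen
    return tags, end
```

A reader that does not care about app tags, such as a broker, would call only skip_app_tags, while an auditor would call read_app_tags and never touch the key or value that follow.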

 

Using the New Message Metadata

Here is a brief description of how we are going to use the new metadata to solve the above-mentioned issues.

 

...

One way of doing auditing is to have an auditing consumer that consumes all topics from a Kafka cluster and does the counting / processing necessary for auditing. The audit consumer then just needs to read the timestamp, producer service name and tier tag fields in order to put the messages into the right time bucket / service category. Alternatively, we can implement built-in auditors that are co-located with the brokers and do similar processing.

Broker Offset Reassignment (KAFKA-527)

...

When the log cleaner compacts log segments and merges multiple wrapped messages into one, it needs to update the raw messages' relative offset values (note that this will leave "holes" inside the new wrapped message).
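
The relative-offset update during compaction can be sketched as follows. This is a simplified in-memory model, not the log cleaner's actual code: a wrapped message is represented as a (starting offset, list of (relative offset, key, value)) pair, compaction keeps the latest value per key, and the merged set keeps the starting offset of the first input set.

```python
def merge_wrapped_sets(wrapped_sets):
    """Merge consecutive wrapped message sets, keeping the latest value per key.

    wrapped_sets: list of (starting_offset, [(relative_offset, key, value), ...])
    in log order. Returns one (starting_offset, inner_messages) pair.
    """
    latest = {}
    for start, inner in wrapped_sets:
        for rel, key, value in inner:
            # Later occurrences of a key overwrite earlier ones.
            latest[key] = (start + rel, value)

    new_start = wrapped_sets[0][0]
    survivors = sorted((abs_off, key, value) for key, (abs_off, value) in latest.items())
    # Re-express each surviving absolute offset relative to the new starting offset.
    return new_start, [(abs_off - new_start, key, value) for abs_off, key, value in survivors]
```

For two sets starting at offsets 100 and 102 in which key "a" appears in both, the merged set starts at 100 and has relative offsets 1, 2, 3: relative offset 0 is a "hole", and every surviving message still resolves to its original absolute offset.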

 

MirrorMaker Re-factoring (KAFKA-1001)

If none of the wrapped raw messages contains a key, the producer of the wrapper compressed message can set "-1" in the key length field to indicate this; otherwise it sets "-2", indicating "there are some messages wrapped that have keys".
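
A minimal sketch of this convention, assuming wrapped messages are modelled as (key, value) pairs with a None key for un-keyed messages; the constant and function names are hypothetical.

```python
NO_KEYS = -1    # none of the wrapped messages has a key
SOME_KEYS = -2  # at least one wrapped message has a key


def wrapper_key_length(wrapped):
    """Key length field value for the wrapper of the given (key, value) pairs."""
    return SOME_KEYS if any(key is not None for key, _ in wrapped) else NO_KEYS


def mirror_maker_must_decompress(key_length):
    """MM can forward the wrapper as-is only when no wrapped message is keyed."""
    return key_length != NO_KEYS
```

With this, MM only needs to look at the wrapper's key length field to decide whether the compressed payload has to be opened.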

If the message's raw message set size tag is included, the consumer could use it to pre-allocate memory if it does need to de-compress the message.

If none of the wrapped raw messages contains a key, the producer sets the non-keyed indicator of the compressed message to true; otherwise it sets it to false.

When MM's consumer gets the compressed message, if the non-keyed indicator is set it does not need to de-compress it; otherwise it needs to de-compress it.

When MM's consumer gets the compressed message, it can decide whether or not to de-compress the message before putting it into the data channel. If it does not de-compress the message, it resets the honor-ship flag of the relative offsets so that the wrapped messages will be treated as having continuous offsets.

The new producer's API needs to be augmented to send an already compressed message (and hence not add another layer of message metadata to it).

When a consumer decompresses a message set, it returns each message with its offset computed either as "message set starting offset" + "relative offset" if the honor-ship flag is set, or as "message set starting offset" + "index of the message in the set" otherwise.
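
The offset rule for decompressed message sets can be written down in a few lines. This is only a sketch of the rule as stated above; the function and parameter names are hypothetical.

```python
def resolve_offsets(starting_offset, relative_offsets, honor_relative):
    """Absolute offsets of the messages wrapped in one compressed message.

    relative_offsets: the relative offset stored with each inner message, in order.
    honor_relative: the honor-ship flag of the wrapper message.
    """
    if honor_relative:
        return [starting_offset + rel for rel in relative_offsets]
    # Flag not set: ignore the stored values and use the position in the set.
    return [starting_offset + i for i in range(len(relative_offsets))]
```

For a compacted set with starting offset 100 and stored relative offsets 1, 2, 3, the consumer sees offsets 101, 102, 103 when the flag is set, and 100, 101, 102 after the set has been mirrored with the flag reset.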

 

Log Compaction / Log Cleaning (KAFKA-881, KAFKA-979)

Add the timestamp field to the index file, whose entries will then look like <offset, timestamp, physical position>.

Log compaction and log cleaning can now be handled by the same background thread, which does the following upon waking up (remember that a topic-partition can either be compacted or cleaned, but not both):

  1. If compaction is used:
    1. If the log is "dirty" enough, do compaction, which may merge multiple consecutive message sets into one; this only requires remembering the starting offset of the first message set and changing the relative offset values.
    2. Since this can leave "holes" in the compressed message, the compaction logic should also alter the attribute bits to honor the relative message offsets.
    3. If the relative offset honor-ship flag is not set, do not use the relative offset but just use the index of the message in the set.
  2. If log cleaning is used:
    1. Loop over the segments from head to tail, checking the timestamp of the last message in the index file. If the timestamp is old enough, delete the whole segment.
    2. For the first segment whose last message's timestamp is not yet old enough, do a binary search on the index file's timestamps and do a head truncation on that segment file.

With this, the log cleaning mechanism is no longer dependent on the log rolling policy.
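
The log cleaning steps above can be sketched as a planning function over the index entries. This is an illustration under stated assumptions, not the broker's implementation: each segment is modelled as a (name, index) pair whose index is a list of <offset, timestamp, physical position> tuples with non-decreasing timestamps, and "old enough" means a timestamp strictly below now minus the retention time.

```python
import bisect


def plan_cleaning(segments, now, retention):
    """Decide which segments to delete and where to head-truncate.

    segments: list of (name, index) from head to tail; index is a list of
    (offset, timestamp, position) tuples with non-decreasing timestamps.
    Returns (names of segments to delete, (name, position) to truncate at or None).
    """
    cutoff = now - retention
    deleted = []
    for name, index in segments:
        if index and index[-1][1] < cutoff:
            # The last message is old enough, so the whole segment can go.
            deleted.append(name)
            continue
        # First segment that must be (partly) kept: binary search on timestamps.
        timestamps = [ts for _, ts, _ in index]
        i = bisect.bisect_left(timestamps, cutoff)
        truncate = (name, index[i][2]) if 0 < i < len(index) else None
        return deleted, truncate
    return deleted, None
```

Because the decision is driven by the per-message timestamps in the index rather than by segment creation time, a segment that has not rolled for a long time can still have its old head truncated.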

 

Control Messages / Transactions (KAFKA-1523, KAFKA-1639)

The control message will always be appended by follower fetchers, but can be filtered based on app logic (e.g. by the consumers).

 

Data Inconsistency from Unclean Election (KAFKA-977, KAFKA-1211)

Most of the details are discussed in KAFKA-1211, but the general design is:

...


...