
Motivation

We have been discussing several Kafka problems:

  1. Log cleaning dependence on the log rolling policy (KAFKA-979): today we clean up log data at the granularity of log segments, and only consider non-active log segments. As a result, if a log does not roll for a long time, the specified log cleaning policy may not be honored. This can cause an unexpected amount of duplicate data when the consumer offsets are reset to "smallest".

  2. Log segment timestamps not persisted during partition migration / broker restart (KAFKA-881 / KAFKA-1379): related to the previous issue (KAFKA-979), today the time-based log rolling mechanism depends on the creation time of the log segments, which changes when a partition migrates or a broker restarts, violating the log rolling policy. Adding timestamps to the messages as well as the index files has also been proposed (KAFKA-1403).
     
  3. Mirror Maker de-compress / re-compress issue (KAFKA-1001): MM always needs to de-compress messages at the consumer side and then re-compress them at the producer side in case there are keyed messages; this can lead to high CPU / memory usage and also a risk of data loss when a message becomes too large after re-compression.

  4. Broker de-compress / re-compress issue (KAFKA-527): the broker needs to de-/re-compress messages just to assign their offsets upon receiving compressed messages, leading to high CPU / memory usage.

  5. Auditing dependence on message value (KAFKA-260): some existing auditing tools (e.g. LinkedIn's auditing solution) require looking into the message value content (i.e. timestamp, service / server names), causing the audit consumer to always de-compress / de-serialize the data; more importantly, the Kafka team has to be aware of the message schemas, own the schema registry service, etc.

  6. Unclean election can yield data inconsistency (KAFKA-977): during unclean leader election (i.e. a non-ISR replica is elected as the new leader), the log messages can diverge. In addition, the fact that brokers always truncate their data to the high watermark when becoming followers introduces a "window of weakness" in which the ISR is effectively just 1, which largely increases the chance of unclean election if consecutive broker failures happen (KAFKA-1211).

  7. Support control messages (KAFKA-1639): we want to add "control messages" to Kafka, which are real data but are used by the broker / consumer for transactional messaging / Samza, etc.
     

These issues may be independently resolvable with separate solutions, but they actually come from the same root cause: some per-message metadata is either lacking (such as timestamps for log cleaning, wrapped offsets for avoiding de-/re-compression on the broker / mirror maker, control messages) or written as part of the message content, which requires Kafka to open the message content (including de-serializing / de-compressing it) even though it should not care about those values. Therefore, by enriching our current message metadata it is possible for us to kill them all with one stone. This page is meant to inspire discussion about the feasibility of this "one stone" approach.


Current Message Format

Enriching message metadata would be a wire protocol change. Fortunately that is affordable since we already added versioning to the message protocol in 0.8. The current request protocol can be found here. In short, a message is formatted as follows:

MessageAndOffset => MessageSize Offset Message
  MessageSize => int32
  Offset => int64
 
  Message => Crc MagicByte Attributes KeyLength Key ValueLength Value
    Crc => int32
    MagicByte => int8
    Attributes => int8
    KeyLength => int32
    Key => bytes
    ValueLength => int32
    Value => bytes


The magic byte (int8) contains the version id of the message, currently set to 0.

The attribute byte (int8) holds metadata attributes about the message. The lowest 2 bits contain the compression codec used for the message. The other bits are currently set to 0.  

The key / value field can be omitted if the keylength / valuelength field is set to -1.

For a compressed message, the offset field stores the offset of the last wrapped message.
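
As an illustration of the layout above, here is a minimal Python sketch that encodes and decodes a single version-0 Message (without the MessageSize / Offset prefix). The function names are hypothetical. The sketch assumes big-endian fields and a CRC32 computed over all bytes that follow the Crc field, and it is not taken from the Kafka code base.

```python
import struct
import zlib


def encode_message_v0(key, value, attributes=0):
    """Encode one Message: Crc MagicByte Attributes KeyLength Key ValueLength Value."""
    def length_prefixed(field):
        # A length of -1 marks an omitted key or value.
        if field is None:
            return struct.pack(">i", -1)
        return struct.pack(">i", len(field)) + field

    magic = 0  # version id of the current format
    body = struct.pack(">bb", magic, attributes)
    body += length_prefixed(key) + length_prefixed(value)
    # Assumption: the CRC covers everything after the Crc field itself.
    crc = zlib.crc32(body) & 0xFFFFFFFF
    return struct.pack(">I", crc) + body


def decode_message_v0(buf):
    """Decode one Message and verify its CRC."""
    crc, magic, attributes = struct.unpack_from(">Ibb", buf, 0)
    if zlib.crc32(buf[4:]) & 0xFFFFFFFF != crc:
        raise ValueError("CRC mismatch")
    pos = 6  # 4 bytes Crc + 1 byte MagicByte + 1 byte Attributes
    fields = []
    for _ in range(2):  # key, then value
        (length,) = struct.unpack_from(">i", buf, pos)
        pos += 4
        if length < 0:
            fields.append(None)
        else:
            fields.append(buf[pos:pos + length])
            pos += length
    key, value = fields
    # The lowest 2 bits of Attributes hold the compression codec.
    return {"magic": magic, "codec": attributes & 0x03, "key": key, "value": value}
```

With a null key and a 5-byte value, the encoded message takes 4 + 1 + 1 + 4 + 4 + 5 = 19 bytes, which shows the fixed per-message overhead that the new format would add to.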


Proposed New Message Format

Here is a list of extensions we want to add to the message metadata:

  1. We would like to augment the message types to normal messages (compressed and uncompressed) and control messages (leader epoch, transactional, etc).
  2. We would like to add some "system tags" to the message metadata that are core to Kafka and that only Kafka brokers care about. Examples include:
    1. Timestamps upon reception (for any messages).
    2. ...
    3. Number of wrapped messages (for compressed messages)
    4. Relative message offsets honor-ship (for compressed messages, will talk about later)
    5. Metadata field (for control messages).
    6. ...
  3. We would also like to add some "application tags" to the message metadata that will not be read by Kafka (i.e. on the system level) but related to higher application usages (e.g., mirror maker, auditor, clients, etc). For example:
    1. Producer name (for auditor)
    2. Broker name (for auditor)
    3. Producer send time (for auditor)
    4. ...
    5. Boolean flag indicating if the compressed messages contain keys (for mirror maker).
    6. Raw message set byte size of the compressed message (for mirror maker)
    7. Metadata fields (for consumer clients).
    8. ...

 

Here is the proposed new message format:

MessageAndOffset => MessageSize Offset Message
  MessageSize => int32
  Offset => int64


  Message => Crc MagicByte Attributes KafkaTagLength [KafkaTag] AppTagLength [AppTag] KeyLength Key ValueLength Value
    Crc => int32
    MagicByte => int8
    Attributes => int8
    SentTime => int32
    ReceiptTime => int32

    KafkaTagLength => int32
    KafkaTag => 
      KafkaTagId => int8
      TagValue => [different types]

    AppTagLength => int32
    AppTag => 
      AppTagKeyLength => int32
      AppTagKey => bytes
      AppTagValueLength => int32
      AppTagValue => bytes

    KeyLength => int32
    Key => bytes
    ValueLength => int32
    Value => bytes
 

Here is a summary of the changes:

  • MagicByte value is set to "1".

  • For a compressed message, the offset field stores the starting offset (i.e. the offset of the first wrapped message).
    • The inner compressed messages' offset fields will store the relative offset against the starting offset (i.e., 0, 1, 2 ...).
    • The offset of an inner message will then be the starting offset + relative offset.
    • With log compaction turned on, the relative offsets will be non-consecutive.
    • When the compressed message is replicated to other clusters, the relative offsets need to be ignored and the offset of the inner message is then calculated as the starting offset + i (if it is the i-th wrapped message).

  • We will use the lowest 4 bits of the Attribute byte to indicate the type of the message:
    • normal - uncompressed
    • normal - gzip compressed
    • normal - snappy compressed 
    • ... 
    • control - leader epoch (from leader)
    • control - start transaction (from transaction coordinator)
    • control - commit transaction (from transaction coordinator)
    • control - abort transaction (from transaction coordinator)
    • ...

  • Kafka tags are identified by pre-defined IDs, and hence can be stored in a compact way. For example:
    • We can reserve the first 32 IDs for general messages, such as
      • kafka_timestamp => int64, this is the Unix timestamp set by the broker upon receipt, and hence can change while the message is going through the pipeline via MM. Used by log cleaning, etc.
    • Then the next 32 IDs for compressed messages, such as
      • num_messages => int32, this is for the compressed messages
    • etc..

    KafkaTagLength specifies the total number of bytes in the tags field, which can be used to iterate through the tags or skip the whole collection directly. A broker reading an unknown tag id will simply ignore the tag, and if a necessary tag is not present it will use some default value / log an exception. This way a Kafka tag protocol change would not require a strict broker / client upgrade.

  • App tags are stored as a list of key-value pairs encoded as strings, and hence can be arbitrarily extended. They are used by higher-level applications (MM, auditor, clients, etc) to read app-level metadata without opening up the message content (key and value, which are possibly serialized / compressed). AppTagLength is used in the same way as KafkaTagLength to traverse / skip the tag list. Examples of app tags are the producer name, broker name, and producer send time listed earlier.

  • With the new format, each message's metadata size increases by 8 bytes in the best case (Kafka/AppTagLength = -1 indicates empty tags).
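
To make the traverse / skip behavior of the app tag list concrete, here is a minimal Python sketch of the AppTagLength / AppTag layout from the proposed format. The function names and the tag keys in the test data are hypothetical. The sketch assumes big-endian int32 lengths and UTF-8 encoded strings, neither of which is fixed by this proposal.

```python
import struct


def encode_app_tags(tags):
    """Encode a dict of str -> str as AppTagLength followed by the AppTag pairs."""
    if not tags:
        return struct.pack(">i", -1)  # -1 marks an empty tag collection
    body = b""
    for key, value in tags.items():
        k, v = key.encode("utf-8"), value.encode("utf-8")
        body += struct.pack(">i", len(k)) + k
        body += struct.pack(">i", len(v)) + v
    return struct.pack(">i", len(body)) + body


def read_app_tags(buf, pos=0):
    """Traverse the tag list; return (tags, position just after the tags)."""
    (total,) = struct.unpack_from(">i", buf, pos)
    pos += 4
    if total < 0:
        return {}, pos
    end = pos + total
    tags = {}
    while pos < end:
        (klen,) = struct.unpack_from(">i", buf, pos)
        pos += 4
        key = buf[pos:pos + klen].decode("utf-8")
        pos += klen
        (vlen,) = struct.unpack_from(">i", buf, pos)
        pos += 4
        tags[key] = buf[pos:pos + vlen].decode("utf-8")
        pos += vlen
    return tags, end


def skip_app_tags(buf, pos=0):
    """Skip the whole collection using only AppTagLength."""
    (total,) = struct.unpack_from(">i", buf, pos)
    return pos + 4 + max(total, 0)
```

A reader that does not care about app tags only pays for one int32 read in `skip_app_tags`, which is the point of carrying the total length up front.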

 

Using the New Message Metadata

Here is a brief description of how we are going to use the new metadata to solve the above-mentioned issues.

 

Auditing Trail (KAFKA-260)

One way of doing audit is to have an auditing consumer that consumes all topics from a Kafka cluster and does the counting / processing necessary for auditing. The audit consumer then just needs to read the timestamp, producer service name, and tier tag fields in order to put the messages into the right time bucket / service category. Alternatively, we can implement built-in auditors that are co-located with the brokers and do similar processing.
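
A minimal Python sketch of the bucketing step might look as follows. It assumes the app tags of each message have already been read into a dict; the tag keys `producer` and `send_time` and the 10-minute bucket width are hypothetical choices, not part of this proposal.

```python
from collections import Counter

BUCKET_SECONDS = 600  # hypothetical 10-minute audit window


def audit_counts(tagged_messages):
    """Count messages per (producer, time bucket) using only tag values."""
    counts = Counter()
    for tags in tagged_messages:
        send_time = int(tags["send_time"])  # hypothetical tag, Unix seconds
        bucket = send_time // BUCKET_SECONDS * BUCKET_SECONDS
        counts[(tags["producer"], bucket)] += 1
    return counts
```

The message key and value are never touched here, so the auditor does not need to de-compress or de-serialize anything.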

Broker Offset Reassignment (KAFKA-527)

When the producer compresses messages, it writes the relative offset value in each raw message's offset field and leaves the wrapped message's offset blank.

When the broker receives a compressed message, it only needs to set the wrapped message's offset and hence does not need to de-/re-compress message sets.

When the log cleaner is compacting log segments and merges multiple wrapped messages into one, it needs to update the raw messages' relative offset values to mark "holes" inside the new wrapped message.
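
The offset arithmetic this relies on can be sketched in a few lines of Python. The function name and the `honor_relative` parameter are hypothetical; the parameter stands in for the attribute bit that says whether relative offsets should be honored.

```python
def absolute_offsets(starting_offset, relative_offsets, honor_relative=True):
    """Resolve the offsets of the inner messages of one wrapped message."""
    if honor_relative:
        # Normal case: starting offset + stored relative offset.
        # After compaction the relative offsets may contain holes.
        return [starting_offset + r for r in relative_offsets]
    # Replicated to another cluster: ignore the stored values and
    # number the inner messages consecutively by position.
    return [starting_offset + i for i in range(len(relative_offsets))]
```

For example, a compacted wrapper starting at offset 100 with relative offsets 0, 2, 5 resolves to 100, 102, 105 when the relative offsets are honored and to 100, 101, 102 when they are not.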

 

MirrorMaker Re-factoring (KAFKA-1001)

If none of the wrapped raw messages contains a key, the wrapper compressed message can set "-1" in the keylength field to indicate this; otherwise it sets "-2", indicating "there are some wrapped messages that have keys".

If the message's raw message set size tag is included, the consumer could use it to pre-allocate memory if it does need to de-compress the message.

When MM's consumers get the compressed message, they can decide whether or not to de-compress the message before putting it into the data channel. If they do not de-compress the message, they reset the honor-ship flag of the relative message offsets so that they will be treated as continuous offsets.

The new producer's API needs to be augmented to send an already compressed message (and hence not add another layer of message metadata to it); it also needs to alter the attributes of the wrapper message to dis-honor the relative message offsets.
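
As a rough sketch of the keylength convention, the following Python shows how a wrapper could be flagged and how MM might use the flag. The function names are hypothetical, and the decision rule (only keyed messages force de-compression) is an assumption drawn from the motivation section rather than a settled part of the design.

```python
NO_KEYS = -1    # no wrapped message has a key
SOME_KEYS = -2  # at least one wrapped message has a key


def wrapper_key_length(inner_keys):
    """Pick the keylength marker for a wrapper from its inner messages' keys."""
    return NO_KEYS if all(k is None for k in inner_keys) else SOME_KEYS


def must_decompress(key_length_field):
    """Assumed MM rule: pass the wrapper through only if it holds no keys."""
    return key_length_field != NO_KEYS
```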

 

Log Compaction / Log Cleaning (KAFKA-881, KAFKA-979)

Add the timestamp field into the index file, which will then look like <offset, time-stamp, physical position>.

The log compaction and log cleaning methods can now be incorporated in the same background thread, which will do the following upon waking up (remember that a topic-partition can be either compacted or cleaned, but not both):

  1. If compaction is used:
    1. If the log is "dirty" enough, do compaction, which may merge multiple consecutive message sets into one => it just needs to remember the starting offset of the first message set and change the relative offset values.
    2. Since this can leave "holes" in the compressed message, the compaction logic should also alter the attribute bits to honor the relative message offsets.
  2. Otherwise, try to do log cleaning:
    1. Loop over the segments from head to tail, checking the timestamp tag of the last message from the index file. If the timestamp is old enough, delete the whole segment.
    2. For the first segment whose last message's timestamp is still not old enough, do a binary search based on the index file's timestamps and do a head truncation on that segment file.

With this the log cleaning mechanism is no longer dependent on the log rolling policy.
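
The cleaning branch can be sketched in Python as below. Each segment is modeled as a non-empty list of index entries `(offset, timestamp, position)`, oldest segment first. The function name, the `now` / `retention` parameters, and the assumption that timestamps never decrease within a segment are all illustrative and not part of this proposal.

```python
import bisect


def clean_log(segments, now, retention):
    """Return the index entries that survive time-based cleaning."""
    cutoff = now - retention
    for i, segment in enumerate(segments):
        last_timestamp = segment[-1][1]
        if last_timestamp < cutoff:
            continue  # whole segment is old enough: delete it
        # First segment that still holds live data: binary search the
        # timestamps and truncate everything before the first live entry.
        timestamps = [ts for _, ts, _ in segment]
        first_live = bisect.bisect_left(timestamps, cutoff)
        return [segment[first_live:]] + segments[i + 1:]
    return []  # every segment was old enough
```

Because the decision is made per index entry rather than per segment file, a segment that has not rolled for a long time can still be trimmed from the head.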

 

Controlled Message / Transactions (KAFKA-1523, KAFKA-1639)

The control message will always be appended by follower fetchers, but can be filtered based on app logic (e.g. by the consumers).

 

Data Inconsistency from Unclean Election (KAFKA-977, KAFKA-1211)

Most of the details are discussed in KAFKA-1211, but the general design is:

  1. Both leader and followers maintain a <epoch, starting offset> map in their cache.
  2. Upon becoming the new leader, the broker will write an epoch message into its own log after it commits all the existing messages (but before it accepts the first new message), and update its cache.
  3. Upon becoming a follower, the broker asks the leader for its epoch / starting offset map, updates its own, and truncates its local log ONLY to the highest offset of the known epoch.
  4. Upon fetching an epoch message in the follower fetcher thread, the follower broker updates its own cache.
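
One possible reading of step 3 is sketched below in Python; the authoritative details are in KAFKA-1211 and may differ. The epoch maps are modeled as dicts from epoch to starting offset. The function names, the explicit log end offsets, and the choice to truncate to offset 0 when no epoch is shared are all assumptions made for illustration.

```python
def epoch_end(epochs, epoch, log_end):
    """End offset (exclusive) of an epoch: the start of the next one, or the log end."""
    later_starts = [start for e, start in epochs.items() if e > epoch]
    return min(later_starts) if later_starts else log_end


def truncation_offset(leader_epochs, leader_log_end,
                      follower_epochs, follower_log_end):
    """Offset the follower truncates to before fetching from the leader."""
    # Epochs that both sides know, starting at the same offset.
    common = [e for e, start in follower_epochs.items()
              if leader_epochs.get(e) == start]
    if not common:
        return 0  # assumption: nothing shared, so keep nothing
    last_common = max(common)
    # Keep only what both logs wrote during the last shared epoch.
    return min(epoch_end(leader_epochs, last_common, leader_log_end),
               epoch_end(follower_epochs, last_common, follower_log_end))
```

For instance, if the leader's map is {1: 0, 2: 100, 4: 180} and the follower's is {1: 0, 2: 100, 3: 150}, the last shared epoch is 2 and the follower truncates to offset 150 rather than all the way back to its high watermark.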

 

 
