
Status

Current state: Under Discussion

Discussion thread: here

JIRA: KAFKA-4514

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In September 2016, Facebook announced a new compression algorithm named ZStandard, designed to scale with modern data processing environments. Thanks to its excellent performance in both speed and compression ratio, many popular big data processing frameworks now support ZStandard:

  • Hadoop (3.0.0)
  • HBase (2.0.0)
  • Spark (2.3.0)

ZStandard also works well with Apache Kafka. Benchmarks with the draft version (ZStandard 1.3.3, Java binding 1.3.3-4) showed a significant performance improvement. The following benchmark is based on Shopify's production environment (thanks to @bobrik):

(Above: the drop around 22:00 is zstd level 1; the further drop at 23:30 is zstd level 6.)

As you can see, ZStandard outperforms the other codecs with a compression ratio of 4.28x; Snappy achieves only 2.5x, and Gzip is not even close in terms of either ratio or speed.

It is worth noting that this outcome is based on ZStandard 1.3.3. According to Facebook, ZStandard 1.3.4 improves throughput by 20-30%, depending on the compression level and underlying I/O performance.

(Above: comparison between ZStandard 1.3.3 and ZStandard 1.3.4.)

(Above: comparison with the other compression codecs supported by Kafka.)

As of May 2018, the Java binding for ZStandard 1.3.4 is still in progress; it will be updated before merging if this proposal is approved.

Accompanying Issues

However, supporting ZStandard is not just a matter of adding a new compression codec; it introduces several related issues that we need to address first:

Backward Compatibility

Since the compression codec is chosen by the producer by default, there are two potential problems:

A. An old consumer that does not support ZStandard receives ZStandard-compressed data from the brokers.
B. An old broker that does not support ZStandard receives ZStandard-compressed data from the producer.

To address the problems above, we have the following options:

a. Bump the produce and fetch protocol versions in ApiKeys.

Advantages:

  • Can guide the users to upgrade their client.
  • Can support advanced features:
  • Broker transcoding: currently, the broker throws UNKNOWN_SERVER_ERROR for an unknown compression codec; with a protocol version bump, it could transcode for old clients instead.
  • Per-topic configuration: we could force clients to use only predefined compression codecs by configuring the available codecs for each topic. This feature should be handled in a separate KIP, but this approach would be a preparation for it.

Disadvantages:

  • Older brokers can't make use of ZStandard.
  • It stops short of a bump to the message format version.

b. Leave unchanged - let the old clients fail.

The previously added codecs, Snappy (commit c51b940) and LZ4 (commit 547cced), followed this approach. With this approach, the problems listed above end with the following error message:

java.lang.IllegalArgumentException: Unknown compression type id: 4
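
For context, here is a minimal sketch of the lookup an old client performs when it meets an unknown codec id; it is modeled on the Java client's CompressionType.forId but simplified for illustration, and is not the actual source. A ZStandard batch (id 4) falls through to the exception above:

// Simplified sketch of the old client's codec lookup; an old client only
// knows codec ids 0-3, so ZStandard's id 4 raises IllegalArgumentException.
public enum CompressionType {
    NONE(0), GZIP(1), SNAPPY(2), LZ4(3);

    public final int id;

    CompressionType(int id) {
        this.id = id;
    }

    public static CompressionType forId(int id) {
        switch (id) {
            case 0: return NONE;
            case 1: return GZIP;
            case 2: return SNAPPY;
            case 3: return LZ4;
            default:
                // A ZStandard batch (id 4) from a newer producer ends up here.
                throw new IllegalArgumentException("Unknown compression type id: " + id);
        }
    }
}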

Advantages:

  • Easy to implement: we need to do nothing special.

Disadvantages:

  • The error message is rather terse. Some users with old clients may be confused about how to cope with this error.

c. Improve the error messages

This approach is a compromise between a and b. We can include the supported API version for each compression codec in the error message by defining a mapping between CompressionCodec and ApiKeys:

NoCompressionCodec => ApiKeys.OFFSET_FETCH        // 0.7.0
GZIPCompressionCodec => ApiKeys.OFFSET_FETCH      // 0.7.0
SnappyCompressionCodec => ApiKeys.OFFSET_FETCH    // 0.7.0
LZ4CompressionCodec => ApiKeys.OFFSET_FETCH       // 0.7.0
ZStdCompressionCodec => ApiKeys.DELETE_GROUPS     // 2.0.0
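
As an illustration of option c, the mapping above could back a more helpful error message. The following Java sketch is hypothetical; the class and helper names are illustrative, not actual Kafka code:

// Hypothetical sketch: look up the first Kafka version supporting a codec id
// and append it to the error message. Names here are illustrative only.
import java.util.HashMap;
import java.util.Map;

public final class CompressionCompat {
    private static final Map<Integer, String> MIN_VERSION = new HashMap<>();
    static {
        MIN_VERSION.put(0, "0.7.0"); // none
        MIN_VERSION.put(1, "0.7.0"); // gzip
        MIN_VERSION.put(2, "0.7.0"); // snappy
        MIN_VERSION.put(3, "0.7.0"); // lz4
        MIN_VERSION.put(4, "2.0.0"); // zstd
    }

    public static IllegalArgumentException unknownCodec(int id) {
        String minVersion = MIN_VERSION.get(id);
        String hint = minVersion == null
                ? ""
                : " (supported since " + minVersion + "; please upgrade your client)";
        return new IllegalArgumentException("Unknown compression type id: " + id + hint);
    }
}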

Advantages:

  • Not much work to do.
  • Can guide the users to upgrade their client.
  • Leaves some room for advanced features in the future, like per-topic configuration.

Disadvantages:

  • The error message may still be too terse.

Support Dictionary

Another issue worth bringing into play is the dictionary feature. ZStandard offers a training mode, which yields a dictionary for compression and decompression. It dramatically improves the compression ratio of small and repetitive inputs (e.g., semi-structured JSON), which fits Kafka's case perfectly. (For a real-world benchmark, see here.) Although the details of how to adapt this feature to Kafka (example) should be discussed in a separate KIP, we need to leave room for it.
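
As a rough illustration of the training mode, the following sketch uses the zstd-jni binding's dictionary classes (ZstdDictTrainer, ZstdDictCompress); the exact names and signatures may vary between binding versions, so treat this as a sketch rather than a reference:

// Train a dictionary from small, repetitive samples, then compress a single
// record with and without it to compare sizes.
import com.github.luben.zstd.Zstd;
import com.github.luben.zstd.ZstdDictCompress;
import com.github.luben.zstd.ZstdDictTrainer;
import java.nio.charset.StandardCharsets;

public class DictionaryDemo {
    public static void main(String[] args) {
        // Trainer with a 4 MB sample buffer producing a 16 KB dictionary.
        ZstdDictTrainer trainer = new ZstdDictTrainer(4 * 1024 * 1024, 16 * 1024);
        for (int i = 0; i < 10_000; i++) {
            String sample = "{\"userId\":" + i + ",\"event\":\"click\",\"page\":\"/products/" + (i % 100) + "\"}";
            trainer.addSample(sample.getBytes(StandardCharsets.UTF_8));
        }
        byte[] dictionary = trainer.trainSamples(); // may throw if samples are insufficient

        byte[] record = "{\"userId\":42,\"event\":\"click\",\"page\":\"/products/42\"}"
                .getBytes(StandardCharsets.UTF_8);
        byte[] noDict = Zstd.compress(record, 3);
        byte[] withDict = Zstd.compress(record, new ZstdDictCompress(dictionary, 3));
        System.out.println("no dict: " + noDict.length + " bytes, with dict: " + withDict.length + " bytes");
    }
}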

Public Interfaces

This feature requires modifications to both the configuration options and the binary log format.

Configuration

A new option, 'zstd', will be added to the compression.type property, which is used in configuring the producer, topic, and broker.
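
For example, assuming this proposal is adopted as described, enabling ZStandard from a Java producer would look like the following (topic- and broker-level compression.type would accept the same value):

// Producer configuration using the proposed 'zstd' value.
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ZstdProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("compression.type", "zstd"); // the new option proposed here

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", "key", "value"));
        }
    }
}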

Binary Log Format

Bit 2 of the 1-byte "attributes" field in Message will be used to denote ZStandard compression. Currently, the first 3 bits (bits 0-2) of the attributes byte are reserved for the compression codec. Since only 4 compression codecs (NoCompression, GZipCompression, SnappyCompression, and LZ4Compression) are currently supported, bit 2 has not been used until now. In other words, the adoption of ZStandard introduces a new bit flag into the binary log format.
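
For illustration, here is a minimal sketch of how the codec id occupies the low three bits of the attributes byte; the mask value mirrors the log format, while the helper names are mine:

// The codec id lives in bits 0-2 of the attributes byte. Ids 0-3 never set
// bit 2; ZStandard's proposed id 4 (binary 100) is the first value that does.
public class AttributesDemo {
    private static final int COMPRESSION_CODEC_MASK = 0x07; // bits 0-2

    static int codecId(byte attributes) {
        return attributes & COMPRESSION_CODEC_MASK;
    }

    static byte withCodec(byte attributes, int codecId) {
        return (byte) ((attributes & ~COMPRESSION_CODEC_MASK) | codecId);
    }

    public static void main(String[] args) {
        byte attrs = withCodec((byte) 0, 4); // 4 = proposed ZStandard id
        System.out.println(codecId(attrs));  // prints 4; bit 2 is now in use
    }
}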

Proposed Changes

  1. Add a new dependency on the Java binding of ZStandard compression.
  2. Add a new value to the CompressionType enum and define ZStdCompressionCodec in the kafka.message package (see the sketch after this list).
  3. Add an appropriate routine for the backward compatibility problem discussed above.
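
A minimal sketch of change 2 on the Java client side follows; it is simplified (the real enum also wires up the compression streams), and the Scala-side ZStdCompressionCodec in kafka.message would carry the same codec id:

// Simplified sketch: the new ZSTD entry alongside the existing codecs.
public enum CompressionType {
    NONE(0, "none"), GZIP(1, "gzip"), SNAPPY(2, "snappy"),
    LZ4(3, "lz4"), ZSTD(4, "zstd"); // new: ZStandard with codec id 4

    public final int id;
    public final String name;

    CompressionType(int id, String name) {
        this.id = id;
        this.name = name;
    }
}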

You can check the proof-of-concept implementation of this feature in this pull request.

Compatibility, Deprecation, and Migration Plan

How to resolve the backward compatibility problem is entirely up to the community's decision.

Rejected Alternatives

None yet.

Related issues

This update introduces some related issues for Kafka.

Whether to use an existing library or not

There are two ways of adapting ZStandard to Kafka, each of which has its pros and cons.

  1. Use an existing binding.
    • Pros
      • Fast to implement.
      • The build does not require ZStandard to be pre-installed in the environment.
    • Cons
      • Somebody has to keep an eye on updates to both the binding library and ZStandard itself and, if needed, update the binding library for use in Kafka.
  2. Add a JNI binding directly.
    • Pros
      • We can concentrate on updates to ZStandard only.
    • Cons
      • ZStandard has to be pre-installed before building Kafka.
      • A little more cumbersome to work with.

The draft implementation adopted the first approach, following the existing Snappy support; a short sketch of what this looks like follows below. (In contrast, Hadoop follows the latter approach.) You can see the JNI binding library used here. However, I thought it would be much better to discuss the alternatives, since I am a newbie to Kafka.
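
As an illustration of the first approach, the following sketch round-trips a payload through zstd-jni's stream classes, analogous to how Kafka wraps SnappyOutputStream; the wiring into Kafka's record builder is omitted here:

// Compress and decompress a byte payload with zstd-jni's stream classes.
import com.github.luben.zstd.ZstdInputStream;
import com.github.luben.zstd.ZstdOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

public class ZstdStreamDemo {
    public static void main(String[] args) throws IOException {
        byte[] payload = "some record batch bytes".getBytes(StandardCharsets.UTF_8);

        // Producer side: wrap the output stream, as Kafka does for Snappy.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (OutputStream out = new ZstdOutputStream(buffer)) {
            out.write(payload);
        }

        // Broker/consumer side: wrap the input stream to read records back.
        byte[] roundTrip = new byte[payload.length];
        try (InputStream in = new ZstdInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
            int read = 0;
            while (read < roundTrip.length) {
                int n = in.read(roundTrip, read, roundTrip.length - read);
                if (n < 0) break;
                read += n;
            }
        }
        System.out.println(new String(roundTrip, StandardCharsets.UTF_8));
    }
}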

Whether to support dictionary feature or not

ZStandard supports a dictionary feature, which boosts efficiency by sharing a learned dictionary between the compressing and decompressing sides. Since Kafka log messages tend to contain repeated patterns, supporting this feature could improve efficiency one step further. However, this feature requires a new configuration option pointing to the location of the dictionary, as illustrated below.
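
A purely hypothetical illustration of such an option follows; neither the key name compression.zstd.dictionary.location nor the wiring exists yet, and both would be defined in the separate KIP mentioned above:

// Hypothetical sketch: load a pre-trained dictionary from a path given by a
// new (not yet existing) configuration key. Key name is illustrative only.
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

public class DictConfigSketch {
    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.put("compression.type", "zstd");
        props.put("compression.zstd.dictionary.location", "/etc/kafka/zstd.dict"); // hypothetical

        byte[] dictionary = Files.readAllBytes(
                Paths.get(props.getProperty("compression.zstd.dictionary.location")));
        System.out.println("loaded dictionary: " + dictionary.length + " bytes");
    }
}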
