
Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-4514

Released: 2.1.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

In September 2016, Facebook announced a new compression implementation named ZStandard, which is designed to scale with modern data processing environments. With its great performance in both speed and compression rate, Hadoop and HBase will support ZStandard in the near future.

I propose adding support for ZStandard compression to Kafka, along with new configuration options and a binary log format update.

Before we go further, it would be better to look at benchmark results for ZStandard. I compared the compressed size and compression time of three 1kB messages (3,102 bytes in total), using the draft implementation of the ZStandard compression codec and all currently available CompressionCodecs. You can see the benchmark code in this commit. All elapsed times are the average of 1,000 trials.

 

| Codec | Level | Size (bytes) | Time | Description |
|---|---|---|---|---|
| Gzip | - | 396 | 153 | |
| Snappy | - | 1,063 | 37 | |
| LZ4 | - | 387 | 57 | |
| Zstandard | 1 | 374 | 56 | Speed-first setting. |
| | 2 | 374 | 58 | |
| | 3 | 379 | 83 | Facebook's recommended default setting. |
| | 4 | 379 | 226 | |
| | 5 | 373 | 102 | |
| | 6 | 373 | 252 | |
| | 7 | 373 | 667 | |
| | 8 | 373 | 707 | |
| | 9 | 373 | 830 | |
| | 10 | 373 | 1,029 | |
| | 11 | 373 | 1,973 | |
| | 12 | 373 | 1,985 | |
| | 13 | 373 | 2,352 | |
| | 14 | 373 | 2,324 | |
| | 15 | 374 | 1,668 | |
| | 16 | 374 | 4,996 | |
| | 17 | 371 | 2,418 | |
| | 18 | 371 | 7,434 | |
| | 19 | 368 | 9,997 | |
| | 20 | 368 | 24,701 | |
| | 21 | 368 | 90,044 | |
| | 22 | 368 | 282,768 | Size-first setting. |

 

As you can see above, ZStandard outperforms all existing algorithms in both compression ratio and speed, especially with the speed-first setting (level 1).


Besides, lots of popular big data processing frameworks already support ZStandard:

  • Hadoop (3.0.0) - HADOOP-13578
  • HBase (2.0.0) - HBASE-16710
  • Spark (2.3.0) - SPARK-19112

ZStandard also works well with Apache Kafka. Benchmarks with the draft version (with ZStandard 1.3.3, Java binding 1.3.3-4) showed significant performance improvements. The following benchmark is based on Shopify's production environment (thanks to @bobrik):

(Figure: compression ratio over time; the drop around 22:00 is zstd level 1, then at 23:30 zstd level 6.)

As you can see, ZStandard outperforms the others with a compression ratio of 4.28x; Snappy achieves just 2.5x, and Gzip is not even close in terms of either ratio or speed.

It is worth noting that this outcome is based on ZStandard 1.3.3. According to Facebook, ZStandard 1.3.4 improves throughput by 20-30%, depending on the compression level and underlying I/O performance.

(Figure: comparison between ZStandard 1.3.3 and ZStandard 1.3.4.)

(Figure: comparison with the other compression codecs supported by Kafka.)

As of September 2018, the draft implementation uses Java binding for ZStandard 1.3.5.


Public Interfaces

This feature requires modification of both the configuration options and the binary log format.

Configuration

A new option, 'zstd', will be added to the compression.type property, which is used in configuring the Producer, Topic, and Broker.
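For illustration, here is a minimal sketch of a producer opting into the new codec once this KIP is implemented (the topic name and broker address are placeholders):

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ZstdProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("compression.type", "zstd"); // the new option proposed by this KIP

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Record batches sent by this producer are compressed with ZStandard.
            producer.send(new ProducerRecord<>("my-topic", "key", "value"));
        }
    }
}
```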

Binary Log Format

The id of the new codec will be 4.

It also introduces a new error code, UNSUPPORTED_COMPRESSION_TYPE (74). For details on this error code, see the 'Compatibility, Deprecation, and Migration Plan' section.

Bit 2 of the 1-byte "attributes" identifier in Message will be used to denote ZStandard compression. Currently, the first 3 bits (bit 0 ~ bit 2) of the 1-byte attributes identifier are reserved for the compression codec. Since only 4 compression codecs (NoCompression, GZipCompression, SnappyCompression, and LZ4Compression) are currently supported, bit 2 has not been used until now. In other words, the adoption of ZStandard introduces a new bit flag in the binary log format.
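As a rough sketch of the bit layout described above (the class and constant names below are illustrative, not Kafka's actual identifiers):

```java
// Illustrative sketch of the attributes-byte layout; names are not Kafka's actual identifiers.
public final class AttributesSketch {
    // Bits 0-2 of the attributes byte are reserved for the compression codec id.
    static final int COMPRESSION_CODEC_MASK = 0x07;

    static final int NONE = 0;   // 0b000
    static final int GZIP = 1;   // 0b001
    static final int SNAPPY = 2; // 0b010
    static final int LZ4 = 3;    // 0b011
    static final int ZSTD = 4;   // 0b100 - the first codec id that sets bit 2

    // Extract the codec id from an attributes byte.
    static int codecId(byte attributes) {
        return attributes & COMPRESSION_CODEC_MASK;
    }

    // Set the codec id while preserving the remaining attribute bits.
    static byte withCodecId(byte attributes, int codecId) {
        return (byte) ((attributes & ~COMPRESSION_CODEC_MASK) | codecId);
    }
}
```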

Proposed Changes

  1. Add a new dependency on the Java binding of ZStandard compression (zstd-jni).
  2. Add a new value to the CompressionType enum and define ZStdCompressionCodec in the kafka.message package (a minimal sketch follows this list).
  3. Add a new error type, 'UNSUPPORTED_COMPRESSION_TYPE'.
  4. Implement the compression logic along with the compatibility logic described below.
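As a minimal sketch of step 2, assuming the zstd-jni binding is chosen (see 'Related issues' below); the class is simplified and is not the actual implementation:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import com.github.luben.zstd.ZstdInputStream;
import com.github.luben.zstd.ZstdOutputStream;

// Simplified sketch of a ZStandard codec wrapper built on zstd-jni;
// not the actual Kafka implementation.
public final class ZStdCompressionCodec {
    public static final int ID = 4; // codec id stored in the attributes bits

    // Wrap a raw output stream with zstd compression (level 3 used here as an example).
    public static OutputStream wrapForOutput(OutputStream out) throws IOException {
        return new ZstdOutputStream(out, 3);
    }

    // Wrap a raw input stream with zstd decompression.
    public static InputStream wrapForInput(InputStream in) throws IOException {
        return new ZstdInputStream(in);
    }
}
```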

You can check the proof-of-concept implementation of this feature in this pull request.

Compatibility, Deprecation, and Migration Plan


We need to establish a backward-compatibility strategy for the case where an old client subscribes to a topic using ZStandard, explicitly or implicitly (i.e., the 'compression.type' configuration of the given topic is 'producer' and the producer compressed the records with ZStandard). After discussion, we decided to support zstd for new clients only (i.e., those that use the v2 format) and return an UNSUPPORTED_COMPRESSION_TYPE error to old clients.

Here is the detailed strategy:

  1. Zstd will only be allowed with the magic = 2 format. That is:
    • Instantiating MemoryRecords with zstd and magic < 2 is disallowed.
    • Down-conversion of zstd-compressed records will not be supported. So if the requested partition uses the 'producer' compression codec and the client requests magic < 2, the broker will down-convert the batches up to the first zstd-compressed batch and return a dummy oversized record in its place. When the client attempts to fetch from that point on, it will receive an UNSUPPORTED_COMPRESSION_TYPE error.
  2. Bump the produce and fetch request versions. This signals old clients that they should upgrade.
  3. Zstd will only be allowed for the bumped produce API. That is, for clients on older versions (i.e., below KAFKA_2_1_IV0), we return UNSUPPORTED_COMPRESSION_TYPE regardless of the message format.
  4. Zstd will only be allowed for the bumped fetch API. That is, if the requested partition uses zstd and the client version is below KAFKA_2_1_IV0, we return UNSUPPORTED_COMPRESSION_TYPE regardless of the message format. (A sketch of this check follows the list.)
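For illustration, the broker-side gating for the produce path could look like the following sketch (the names are simplified, and the exact request version corresponding to KAFKA_2_1_IV0 is an assumption here, not taken from the source):

```java
// Simplified sketch of the broker-side produce-path check; not Kafka's actual code.
enum CompressionType { NONE, GZIP, SNAPPY, LZ4, ZSTD }

final class ProduceRequestValidator {
    // Assumption: the produce request version introduced alongside KAFKA_2_1_IV0.
    static final short MIN_ZSTD_PRODUCE_VERSION = 7;

    static void validate(short requestVersion, CompressionType type) {
        if (type == CompressionType.ZSTD && requestVersion < MIN_ZSTD_PRODUCE_VERSION) {
            // Surfaces to the client as UNSUPPORTED_COMPRESSION_TYPE (74).
            throw new IllegalStateException(
                "UNSUPPORTED_COMPRESSION_TYPE: produce request version "
                    + requestVersion + " may not use zstd");
        }
    }
}
```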

The following section explains why we chose this strategy.

Rejected Alternatives

A. Support ZStandard for old clients which can understand v0 and v1 messages only.

This strategy necessarily requires the down-conversion of v2 messages compressed with ZStandard into v0 or v1 messages, which means considerable performance degradation. So we rejected this strategy.

B. Bump the API version and support only v2-capable clients

With this approach, we can signal to old clients that they are outdated and should be upgraded. However, there are still several options for the error code.

B.1. INVALID_REQUEST (42)

This option gives the client too little information; the user can be confused about why a client that worked correctly in the past suddenly encounters a problem. So we rejected this option.

B.2. CORRUPT_MESSAGE (2)

This option gives inaccurate information; the user may be surprised and mistakenly conclude that the log files are broken in some way. So we rejected this option.

B.3. UNSUPPORTED_FOR_MESSAGE_FORMAT (43)

The advantage of this approach is that we don't need to define a new error code; we can simply reuse this one. The disadvantage is that it is also a little vague; this error code was defined as part of KIP-98 and is now returned for transaction errors.

Since adding a new error type is not a big problem and a clear error message always helps, we decided to reject this option as well.

Related issues

This update introduces some related issues on Kafka, like the following.

Whether to use an existing library or not

...

The draft implementation adopted the first approach, following Kafka's existing Snappy support. (In contrast, Hadoop follows the latter approach.) You can see the JNI binding library used here. However, I thought it would be much better to discuss the alternatives, since I am a newbie to Kafka.

Whether to support the dictionary feature or not

...

License

We can use zstd and its Java binding, zstd-jni, without any problem, but we need to include their licenses in the project - BSD and BSD 2-Clause license, respectively. Neither is listed among the prohibited licenses.

All we need to do is attach the licenses for these dependencies. A recent update to Apache Spark shows how to approach this problem. They did: