Table of Contents |
---|
Status
Current state: Under DiscussionAccepted
Discussion thread: thread1 thread2here
JIRA: KAFKA-4514
Released: 2.1.0
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
On September 2016, Facebook announced a new compression implementation named ZStandard, which is designed designed to scale with modern data processing environment. With its great performance in both of Speed speed and Compression compression rate, Hadoop and HBase will support ZStandard in a close future.
I propose for Kafka to add support of Zstandard compression, along with new configuration options and binary log format update.
Before we go further, it would be better to see the benchmark result of Zstandard. I compared the compressed size and compression time of 3 1kb-sized messages (3102 bytes in total), with the Draft-implementation of ZStandard Compression Codec and all currently available CompressionCodecs. You can see the benchmark code from this commit. All elapsed times are the average of 1000 trials.
Codec | Level | Size | Time | Description |
---|---|---|---|---|
Gzip | - | 396 | 153 | |
Snappy | - | 1,063 | 37 | |
LZ4 | - | 387 | 57 | |
Zstandard | 1 | 374 | 56 | Speed-first setting. |
2 | 374 | 58 | ||
3 | 379 | 83 | Facebook's recommended default setting. | |
4 | 379 | 226 | ||
5 | 373 | 102 | ||
6 | 373 | 252 | ||
7 | 373 | 667 | ||
8 | 373 | 707 | ||
9 | 373 | 830 | ||
10 | 373 | 1,029 | ||
11 | 373 | 1,973 | ||
12 | 373 | 1,985 | ||
13 | 373 | 2,352 | ||
14 | 373 | 2,324 | ||
15 | 374 | 1,668 | ||
16 | 374 | 4,996 | ||
17 | 371 | 2,418 | ||
18 | 371 | 7,434 | ||
19 | 368 | 9,997 | ||
20 | 368 | 24,701 | ||
21 | 368 | 90,044 | ||
22 | 368 | 282,768 | Size-first setting. |
As you can see above, ZStandard outplays all existing algorithms in both of compression rate and speed, especially working with the speed-first setting (level 1).
Public Interfaces
This feature requires modification on both of Configuration Options and Binary Log format.
Configuration
lots of popular big data processing frameworks are supporting ZStandard.
- Hadoop (3.0.0) -
Jira server ASF JIRA serverId 5aa69414-a9e9-3523-82ec-879b028fb15b key HADOOP-13578 - HBase (2.0.0) -
Jira server ASF JIRA serverId 5aa69414-a9e9-3523-82ec-879b028fb15b key HBASE-16710 - Spark (2.3.0) -
Jira server ASF JIRA serverId 5aa69414-a9e9-3523-82ec-879b028fb15b key SPARK-19112
ZStandard also works well with Apache Kafka. Benchmarks with the draft version (with ZStandard 1.3.3, Java Binding 1.3.3-4) showed significant performance improvement. The following benchmark is based on Shopify's production environment (Thanks to @bobrik)
(Above: Drop around 22:00 is zstd level 1, then at 23:30 zstd level 6.)
As You can see, ZStandard outperforms with a compression ratio of 4.28x; Snappy is just 2.5x and Gzip is not even close in terms of both of ratio and speed.
It is worth noting that this outcome is based on ZStandard 1.3. According to Facebook, ZStandard 1.3.4 improves throughput by 20-30%, depending on compression level and underlying I/O performance.
(Above: Comparison between ZStandard 1.3.3. vs. ZStandard 1.3.4.)
(Above: Comparison between other compression codecs, supported by Kafka.)
As of September 2018, the draft implementation uses Java binding for ZStandard 1.3.5.
Public Interfaces
This feature introduces a new available option 'zstd' A new available option 'zstd' will be added to the compression.type property, which is used in configuring Producer, Topic and Broker.
Binary Log Format
It's id will be 4.
It also introduces a new error code, UNSUPPORTED_COMPRESSION_TYPE (74). For details on this error code, see 'Compatibility, Deprecation, and Migration Plan' section.The bit 2 of 1-byte "attributes" identifier in Message will be used to denote ZStandard compression; Currently, the first 3 bits (bit 0 ~ bit 2) of 1-byte attributes identifier is reserved for compression codec. Since only 4 compression codecs (NoCompression, GZipCompression, SnappyCompression and LZ4Compression) are currently supported, bit 2 has not been used until now. In other words, the adoption of ZStandard will introduce a new bit flag in the binary log format.
Proposed Changes
- Add a new dependency on the Java bindings of ZStandard compression.
- Add a new value on CompressionType enum type and define ZStdCompressionCodec on kafka.message package.
- Add a new error type, 'UNSUPPORTED_COMPRESSION_TYPE'.
- Implement the compression logic along with compatibility logic described below.
You can check the concept-proof implementation of this feature on this Pull Request.
Compatibility, Deprecation, and Migration Plan
None.
Rejected Alternatives
We need to establish some backward-compatibility strategy for the case an old client subscribes a topic using ZStandard, explicitly or implicitly (i.e., 'compression.type' configuration of given topic is 'producer' and the producer compressed the records with ZStandard). After discussion, we decided to support zstd to the new clients only (i.e., uses v2 format) and return UNSUPPORTED_COMPRESSION_TYPE error for the old clients.
Here is the detailed strategy:
- Zstd will only be allowed with magic = 2 format. That is,
- Instantiating MemoryRecords with magic < 2 is disallowed.
- Down-conversion of zstd-compressed records will not be supported. So if the requested partition uses 'producer' compression codec and the client requests magic < 2, the broker will down-convert the batch until before using zstd and return with a dummy oversized record in place of the zstd-compressed batch. When the client attempts to fetch from then on, it will receive a UNSUPPORTED_COMPRESSION_TYPE error.
- Bump produce and fetch request versions. It will give the old clients a message to update their version.
- Zstd will only be allowed for the bumped produce API. That is, for older version clients (=below KAFKA_2_1_IV0), we return UNSUPPORTED_COMPRESSION_TYPE regardless of the message format.
Zstd will only be allowed for the bumped fetch API. That is, if the requested partition uses zstd and the client version is below KAFKA_2_1_IV0, we return UNSUPPORTED_COMPRESSION_TYPE regardless of the message format.
The following section explains why we chose this strategy.
Rejected Alternatives
A. Support ZStandard to the old clients which can understand v0, v1 messages only.
This strategy necessarily requires the down-conversion of v2 message compressed with Zstandard into v0 or v1 messages, which means a considerable performance degradation. So we rejected this strategy.
B. Bump the API version and support only v2-available clients
With this approach, we can message the old clients that they are old and should be upgraded. However, there are still several options for the Error code.
B.1. INVALID_REQUEST (42)
This option gives the client so little information; the user can be confused about why the client worked correctly in the past suddenly encounters a problem. So we rejected this strategy.
B.2. CORRUPT_MESSAGE (2)
This option gives inaccurate information; the user can be surprised and misunderstand that the log files are broken in some way. So we rejected this strategy.
B.3 UNSUPPORTED_FOR_MESSAGE_FORMAT (43)
The advantage of this approach is that we don't need to define a new error code; we can reuse it and that's all. The disadvantage of this approach is that it is also a little bit vague; This error code is defined as a work for KIP-98 and now returned in the transaction error.
Since adding a new error type is not a big problem and a clear error message always helps, we decided to reject this strategyNone yet.
Related issues
This update introduces some related issues on Kafkalike following.
Whether to use existing library or not
...
The draft implementation adopted the first approach, following its Snappy support. (In contrast, Hadoop follows the latter approach.) You can see the used JNI binding library at here. However, I thought it would be much better to discuss the alternatives, for I am a newbie to Kafka.
Whether to support dictionary feature or not
...
License
We can use zstd and its Java binding, zstd-jni without any problem, but we need to include their license to the project - BSD and BSD 2 Clause license, respectively. They are not listed in the list of prohibited licenses also.
What we need is attaching the licenses for the dependencies only. A recent update on Apache Spark shows how to approach this problem. They did:
- 'LICENSE' file: License of the project itself (i.e., Apache License) and the list of source dependencies and their licenses.
- 'LICENSE-binary' file: The list of binary dependencies and their licenses.
- 'license' directory: Contains the license files of source dependencies.
- 'license-binary' directory: Contains the license files of binary dependencies.