
Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-4514

Released: 2.1.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

(Above: comparison with other compression codecs supported by Kafka.)

As of September 2018, the draft implementation uses the Java binding for ZStandard 1.3.5.

Accompanying Issues

However, supporting ZStandard is not just a matter of adding a new compression codec; it introduces several related issues. We need to address those issues first:

Backward Compatibility

Since the producer chooses the compression codec by default, there are two potential problems:

A. An old consumer that does not support ZStandard receives ZStandard-compressed data from the brokers.
B. Old brokers that don't support ZStandard receive ZStandard-compressed data from the producer.

To address the problems above, we have the following options:

a. Bump the produce and fetch protocol versions in ApiKeys.

Advantages:

  • Can guide the users to upgrade their client.
  • Can support advanced features.
  • Broker transcoding: currently, the broker throws UNKNOWN_SERVER_ERROR for an unknown compression codec; with this feature, the broker could transcode for old clients instead.
  • Per-topic configuration: we could force clients to use only predefined compression codecs by configuring the available codecs for each topic. This feature should be handled in a separate KIP, but this approach can serve as preparation for it.

Disadvantages:

  • The older brokers can't make use of ZStandard.
  • Falls only a little short of a bump to the message format version.

b. Leave unchanged - let the old clients fail.

Previously added codecs, Snappy (commit c51b940) and LZ4 (commit 547cced), followed this approach. With it, the problems listed above end with the following error message:

Code Block
java.lang.IllegalArgumentException: Unknown compression type id: 4
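The failure comes from the client's id-to-codec lookup, which only knows ids 0 through 3. A minimal stdlib-only sketch of that behavior (class and method names are illustrative, modeled loosely on Kafka's `CompressionType.forId`, not the actual code):

```java
public class CompressionLookup {
    // Codec ids as carried in the message "attributes" byte; old clients know only 0-3.
    static final String[] KNOWN_CODECS = {"none", "gzip", "snappy", "lz4"};

    // Mirrors the lookup an old client performs; id 4 (zstd) is unknown to it.
    static String forId(int id) {
        if (id < 0 || id >= KNOWN_CODECS.length)
            throw new IllegalArgumentException("Unknown compression type id: " + id);
        return KNOWN_CODECS[id];
    }

    public static void main(String[] args) {
        System.out.println(forId(1)); // gzip
        try {
            forId(4); // what an old client does with zstd-compressed data
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

As the sketch shows, the exception message carries only the raw id, which is why the error gives users so little to go on.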

Advantages:

  • Easy to implement: nothing special is needed.

Disadvantages:

  • The error message is a little terse; some users with old clients may be confused about how to cope with this error.

c. Improve the error messages

This approach is a compromise between a and b. We can include the supported API version for each compression codec in the error message by defining a mapping between CompressionCodec and ApiKeys:

Code Block
NoCompressionCodec => ApiKeys.OFFSET_FETCH        // 0.7.0
GZIPCompressionCodec => ApiKeys.OFFSET_FETCH      // 0.7.0
SnappyCompressionCodec => ApiKeys.OFFSET_FETCH    // 0.7.0
LZ4CompressionCodec => ApiKeys.OFFSET_FETCH       // 0.7.0
ZStdCompressionCodec => ApiKeys.DELETE_GROUPS     // 2.0.0
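With such a mapping in place, the broker could produce a friendlier message. A stdlib-only sketch of the idea (the class, method, and message text are illustrative, not Kafka's actual code; the version strings mirror the mapping above):

```java
import java.util.Map;

public class CodecVersionHint {
    // Hypothetical mapping from codec name to the first version supporting it,
    // mirroring the CompressionCodec -> ApiKeys table above.
    static final Map<String, String> MIN_VERSION = Map.of(
        "none", "0.7.0", "gzip", "0.7.0", "snappy", "0.7.0",
        "lz4", "0.7.0", "zstd", "2.0.0");

    // Builds an error message that tells the user which client version they need.
    static String unsupportedMessage(String codec) {
        return "Unsupported compression type '" + codec
            + "'; it requires client version " + MIN_VERSION.get(codec) + " or later";
    }

    public static void main(String[] args) {
        System.out.println(unsupportedMessage("zstd"));
    }
}
```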

Advantages:

  • Not much work to do.
  • Can guide the users to upgrade their client.
  • Leaves room for advanced features in the future, like per-topic configuration.

Disadvantages:

  • The error message may still be too short.

Support Dictionary

Another issue worth bringing into play is the dictionary feature. ZStandard offers a training mode, which yields a dictionary for compression and decompression. It dramatically improves the compression ratio of small and repetitive inputs (e.g., semi-structured JSON), which fits Kafka's case perfectly. (For a real-world benchmark, see here.) Although the details of how to adapt this feature to Kafka (example) should be discussed in a separate KIP, we need to leave room for it.

License

We can use zstd and its Java binding, zstd-jni, without any problem, but we need to include their licenses: BSD and 2-clause BSD, respectively. Neither is on the list of prohibited licenses.

What we need is to attach the licenses for the dependencies. A recent update on Apache Spark shows how to approach this problem. They added:

  • 'LICENSE' file: License of the project itself (i.e., Apache License) and the list of source dependencies and their licenses.
  • 'LICENSE-binary' file: The list of binary dependencies and their licenses.
  • 'license' directory: Contains the license files of source dependencies.
  • 'license-binary' directory: Contains the license files of binary dependencies.

Following this approach would be good, but we can take a slightly different approach, since Kafka doesn't have as many dependencies as Spark. (As of August 2018, the only dependency whose license we need to include is jersey.)


Binary Log Format

Bit 2 of the 1-byte "attributes" identifier in Message will be used to denote ZStandard compression. Currently, the first 3 bits (bit 0 ~ bit 2) of the attributes identifier are reserved for the compression codec. Since only 4 compression codecs (NoCompression, GZipCompression, SnappyCompression, and LZ4Compression, ids 0 to 3) are currently supported, bit 2 has never been set until now. In other words, the adoption of ZStandard (id 4) introduces a new bit flag in the binary log format.
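The masking involved can be sketched with plain Java (the mask constant matches the 3-bit codec field described above; the class and method names are illustrative):

```java
public class Attributes {
    static final int COMPRESSION_CODEC_MASK = 0x07; // bits 0-2 hold the codec id

    // Extracts the compression codec id from a message's attributes byte.
    static int codecId(byte attributes) {
        return attributes & COMPRESSION_CODEC_MASK;
    }

    public static void main(String[] args) {
        byte zstdAttributes = 0b0000_0100; // codec id 4 = zstd, the first id to set bit 2
        System.out.println(codecId(zstdAttributes)); // 4
        byte lz4Attributes = 0b0000_0011;  // codec id 3 = lz4, bit 2 clear
        System.out.println(codecId(lz4Attributes)); // 3
    }
}
```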

Public Interfaces

This feature introduces a new available option, 'zstd', for the compression.type property, which is used in configuring the Producer, Topic, and Broker. Its id will be 4.
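In configuration terms, opting in could look like the following sketch (the property name and value come from this KIP; the file placement is illustrative):

```
# producer.properties -- the same property name applies at the
# topic and broker levels as well
compression.type=zstd
```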

It also introduces a new error code, UNSUPPORTED_COMPRESSION_TYPE (74). For details on this error code, see 'Compatibility, Deprecation, and Migration Plan' section.

Proposed Changes

  1. Add a new dependency on the Java bindings of ZStandard compression.
  2. Add a new value to the CompressionType enum and define ZStdCompressionCodec in the kafka.message package.
  3. Add a new error type, 'UNSUPPORTED_COMPRESSION_TYPE'.
  4. Implement the compression logic along with the compatibility logic described below.

You can check the proof-of-concept implementation of this feature in this Pull Request.

Compatibility, Deprecation, and Migration Plan

We need to establish a backward-compatibility strategy for the case where an old client subscribes to a topic using ZStandard, explicitly or implicitly (i.e., the 'compression.type' configuration of the topic is 'producer' and the producer compressed the records with ZStandard). After discussion, we decided to support zstd for new clients only (i.e., those that use the v2 message format) and to return an UNSUPPORTED_COMPRESSION_TYPE error to old clients.

Here is the detailed strategy:

  1. Zstd will only be allowed with the magic = 2 format. That is,
    • Instantiating MemoryRecords with magic < 2 is disallowed.
    • Down-conversion of zstd-compressed records will not be supported. So if the requested partition uses the 'producer' compression codec and the client requests magic < 2, the broker will down-convert the batches up to the first zstd-compressed batch and return a dummy oversized record in its place. When the client attempts to fetch from that point on, it will receive an UNSUPPORTED_COMPRESSION_TYPE error.
  2. Bump the produce and fetch request versions. This signals to old clients that they should update.
  3. Zstd will only be allowed for the bumped produce API. That is, for older clients (below KAFKA_2_1_IV0), we return UNSUPPORTED_COMPRESSION_TYPE regardless of the message format.
  4. Zstd will only be allowed for the bumped fetch API. That is, if the requested partition uses zstd and the client version is below KAFKA_2_1_IV0, we return UNSUPPORTED_COMPRESSION_TYPE regardless of the message format.
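Taken together, these rules amount to a broker-side gate on codec id, message format, and request version. A stdlib-only sketch of the decision (the version constant and all names are illustrative stand-ins, not the actual broker code):

```java
public class ZstdGate {
    static final int ZSTD_ID = 4;   // codec id introduced by this KIP
    static final byte MAGIC_V2 = 2; // v2 message format

    // Hypothetical stand-in for the bumped produce/fetch version (KAFKA_2_1_IV0).
    static final short MIN_ZSTD_REQUEST_VERSION = 7;

    // Returns an error name, or null if the request may proceed.
    static String check(int codecId, byte magic, short requestVersion) {
        if (codecId != ZSTD_ID)
            return null; // other codecs are unaffected by these rules
        if (magic < MAGIC_V2 || requestVersion < MIN_ZSTD_REQUEST_VERSION)
            return "UNSUPPORTED_COMPRESSION_TYPE";
        return null;
    }

    public static void main(String[] args) {
        System.out.println(check(4, (byte) 2, (short) 7)); // new client: allowed
        System.out.println(check(4, (byte) 2, (short) 6)); // old client: rejected
    }
}
```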

The following section explains why we chose this strategy.

Rejected Alternatives

A. Support ZStandard for old clients that can understand v0/v1 messages only.

This strategy necessarily requires down-conversion of v2 messages compressed with ZStandard into v0 or v1 messages, which means considerable performance degradation. So we rejected this strategy.

B. Bump the API version and support only v2-available clients

With this approach, we can signal to old clients that they are outdated and should be upgraded. However, there are still several options for the error code.

B.1. INVALID_REQUEST (42)

This option gives the client very little information; users may be confused about why a client that worked correctly in the past suddenly encounters a problem. So we rejected this option.

B.2. CORRUPT_MESSAGE (2)

This option gives inaccurate information; users may be surprised and misled into thinking that the log files are broken in some way. So we rejected this option as well.

B.3. UNSUPPORTED_FOR_MESSAGE_FORMAT (43)

The advantage of this approach is that we don't need to define a new error code; we can simply reuse an existing one. The disadvantage is that it is also a little vague: this error code was defined as part of the work for KIP-98 and is currently returned for transaction errors.

Since adding a new error type is not a big problem and a clear error message always helps, we decided to reject this option too.

Related issues

This update introduces some related issues on Kafka, like the following.

Whether to use an existing library or not

...

The draft implementation adopted the first approach, following Kafka's existing Snappy support. (In contrast, Hadoop follows the latter approach.) You can see the JNI binding library used here. However, I thought it would be much better to discuss the alternatives, since I am a newcomer to Kafka.

Whether to support the dictionary feature or not

...
