Table of Contents

Status

Current state: "Under Discussion"

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This proposal suggests adding compression level and compression buffer size options to the producer, topic, and broker config.

Basically, CPU (running time) and I/O (compressed size) are trade-offs in compression. Since the best choice is use-case dependent, most compression libraries provide a way to control the compression level, with a reasonable default that yields good performance in general. In addition, the compression ratio is also affected by the buffer size; some users may want to trade compression for less/more memory. However, Kafka does not provide a way to configure those options. Although it shows good performance with default settings, there are some cases which don't fit. For example:

...

the compression level - it uses the default level only.

This proposal suggests adding the compression level option to the producer, broker, and topic config. Running tests with a real-world dataset (see below), I found that this option improves the producer's messages/second rate by up to 156%

...

.

Public Interfaces

This feature introduces new options, 'compression.level' and 'compression.buffer.size', to the producer, topic, and broker configuration. For the public interface, there are two available alternatives:

Type A: Traditional Style

This approach introduces two config entities: compression.level and compression.buffer.size. The type of these options is an integer, with a default value of null.

Code Block
compression.type=gzip
compression.level=4				# NEW: Compression level to be used.
compression.buffer.size=8192	# NEW: The size of compression buffer to be used.

For available values per compression type, please refer to the 'Compression level' and 'Compression buffer size' subsections under 'Proposed Changes'.
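
To make the Type A style concrete, here is a minimal sketch of a producer configured with the proposed options. Note that 'compression.level' and 'compression.buffer.size' are the names proposed in this KIP and are not available in any released Kafka version; the bootstrap server, serializers, and topic name are placeholders.

Code Block
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TypeAConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("compression.type", "gzip");
        // Proposed in this KIP (not yet released):
        props.put("compression.level", "4");          // NEW: gzip level 4 instead of the default 6
        props.put("compression.buffer.size", "8192"); // NEW: 8 KB compression buffer

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", "key", "value"));
        }
    }
}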

Pros:

  1. Easy to adapt.

Cons:

  1. It adds two configuration entities, and may require even more entities as compression codecs are updated.
  2. Every time the user tries to change the compression type, they must update `compression.level` and `compression.buffer.size` accordingly. In other words, it is error-prone.

Type B: Key-Value Style (proposed by Becket Qin)

This approach introduces only one config entity, compression.config. It contains a comma-separated list of KEY:VALUE pairs, like listener.security.protocol.map or max.connections.per.ip.overrides.

Code Block
compression.type=gzip
compression.config=gzip.level:4,gzip.buffer.size:8192,snappy.buffer.size:32768,lz4.level:9,lz4.buffer.size:4,zstd.level:3	# NEW

The available KEYs are: gzip.level, gzip.buffer.size, snappy.buffer.size, lz4.level, lz4.buffer.size, zstd.level. For the available values for each key, please refer to the 'Compression level' and 'Compression buffer size' subsections under 'Proposed Changes'.
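
As an illustration of the key-value style, the following is a minimal, hypothetical parsing sketch for the proposed 'compression.config' value. The class and method names are made up for this example and do not correspond to actual Kafka code; it only shows how the comma-separated KEY:VALUE format above could be turned into per-codec settings.

Code Block
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper for this KIP: parses the proposed 'compression.config' value. */
public class CompressionConfigParser {

    /** Splits e.g. "gzip.level:4,zstd.level:3" into {gzip.level=4, zstd.level=3}. */
    public static Map<String, Integer> parse(String compressionConfig) {
        Map<String, Integer> settings = new HashMap<>();
        if (compressionConfig == null || compressionConfig.isEmpty())
            return settings;
        for (String entry : compressionConfig.split(",")) {
            String[] kv = entry.trim().split(":");
            if (kv.length != 2)
                throw new IllegalArgumentException("Malformed entry: " + entry);
            settings.put(kv[0].trim(), Integer.parseInt(kv[1].trim()));
        }
        return settings;
    }

    public static void main(String[] args) {
        Map<String, Integer> settings =
            parse("gzip.level:4,gzip.buffer.size:8192,lz4.level:9,zstd.level:3");
        // The codec in use ('compression.type') would pick up only the keys with its prefix,
        // e.g. a gzip producer would read gzip.level=4 and gzip.buffer.size=8192.
        System.out.println(settings);
    }
}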

Pros:

1. Adds only one configuration entity, and it is easy to follow compression codecs' updates: if a compression codec introduces a new configuration option, only a new KEY has to be supported rather than a whole new config entity.
2. Easy to switch compression type: Kafka broker picks up appropriate configuration following the compression type.

Cons:

1. A little more complicated for the initial setup.

Proposed Changes

The compression will be done with the specified level and the specified compression buffer size, as follows:

  • If the specified option is not supported by the codec ('compression.type'), it is ignored (e.g., compression level for snappy, or buffer size for zstd).
  • If the specified value is not available (or invalid), it raises an error.
  • If no value is specified, it falls back to the default one.

The valid range and default value of the compression level and buffer size are entirely up to the compression library, so they may change in the future. As of June 2019, their current values are as follows:

Compression level

...

Compression buffer size

...

The table below shows the valid range and default value of compression.level per compression.type. (Note: snappy is excluded since it does not support any compression level.)

Compression Codec | Availability | Valid Range                                              | Default
gzip              | Yes          | 1 (Deflater.BEST_SPEED) ~ 9 (Deflater.BEST_COMPRESSION) | 6
snappy            | No           | -                                                        | -
lz4               | Yes          | 1 ~ 17                                                   | 9
zstd              | Yes          | -131072 ~ 22                                             | 3
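
Since the gzip range above is expressed in terms of the JDK's Deflater constants, here is a small sketch of where a gzip compression level is applied in Java. This is an illustration only, not the actual Kafka implementation: GZIPOutputStream does not expose the level directly, so the subclass sets it on the underlying Deflater.

Code Block
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.Deflater;
import java.util.zip.GZIPOutputStream;

/** Illustration only: gzip with an explicit compression level (valid range 1..9, default 6). */
public class LeveledGzipOutputStream extends GZIPOutputStream {
    public LeveledGzipOutputStream(OutputStream out, int level) throws IOException {
        super(out);
        // GZIPOutputStream takes no level parameter; set it on the inherited Deflater.
        def.setLevel(level);
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = "some record value".getBytes("UTF-8");
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (LeveledGzipOutputStream gzip =
                 new LeveledGzipOutputStream(buffer, Deflater.BEST_SPEED)) { // level 1
            gzip.write(payload);
        }
        System.out.println("compressed size: " + buffer.size());
    }
}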

Proposed Changes

This option impacts the following processes:

  • Producer compresses the user-given messages.
  • Broker recompresses the user-given messages with the compression.type specified per broker or topic.
  • Broker recompresses the messages in the log cleanup process.

Compressing the records with the given compression type and level works as follows (a minimal sketch follows this list):

  • If 'compression.type' is none or snappy, 'compression.level' is ignored.
  • If 'compression.level' is not in the valid range, it raises an error.
  • If 'compression.level' is in the valid range, the producer compresses the records with the given level.
  • If 'compression.level' is not set, it falls back to the default level.
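
A minimal, hypothetical sketch of the rules above is shown below; the class name is made up, and the ranges and defaults are taken from the table in the previous section.

Code Block
/** Hypothetical sketch of the 'compression.level' resolution rules described in this KIP. */
public class CompressionLevelResolver {

    /**
     * Returns the level to use, or null when the codec does not support levels.
     * @param codec           value of 'compression.type'
     * @param configuredLevel value of 'compression.level', or null if unset
     */
    public static Integer resolve(String codec, Integer configuredLevel) {
        // none and snappy do not support a level: ignore whatever was configured.
        if (codec.equals("none") || codec.equals("snappy"))
            return null;
        // Unset: fall back to the codec's default level.
        if (configuredLevel == null) {
            switch (codec) {
                case "gzip": return 6;
                case "lz4":  return 9;
                case "zstd": return 3;
                default: throw new IllegalArgumentException("Unknown codec: " + codec);
            }
        }
        // Out of range: raise an error rather than silently clamping.
        boolean valid;
        switch (codec) {
            case "gzip": valid = configuredLevel >= 1 && configuredLevel <= 9; break;
            case "lz4":  valid = configuredLevel >= 1 && configuredLevel <= 17; break;
            case "zstd": valid = configuredLevel >= -131072 && configuredLevel <= 22; break;
            default: throw new IllegalArgumentException("Unknown codec: " + codec);
        }
        if (!valid)
            throw new IllegalArgumentException(
                "Invalid compression.level " + configuredLevel + " for codec " + codec);
        return configuredLevel;
    }
}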

Benchmark

Settings

To benchmark how the compression level affects producer performance, I ran a small benchmark with a real-world dataset, described below:

Producer

With the feature implemented on top of the latest trunk (commit ccec9b0), I ran kafka-producer-perf-test.sh on GraalVM Java 8 v21.1.0 with the following parameters:

  • Number of records: 100,000
  • batch.size: 1048576 (1mb)
  • linger.ms: 100

Data

A random sample of 4,096 real-world records from this dataset, which consists of 129,218 JSON files with an average size of 55.25 KB.

Environment

MS Azure Kubernetes Cluster (Seoul region), consisting of 16 nodes of Standard_DS2_v2 (2 vCPU, 7 GB RAM, expected network bandwidth of 1,500 Mbps).

Broker/Topic

Apache Kafka 2.7.0, GraalVM Java 8 (21.1.0), replication factor = 3.

Result

codec | level | produced messages / sec | latency (ms) | size (bytes)  | description
none  |       | 2,739.50                | 205.34       | 5,659,454,754 |
gzip  | 1     | 1,122.96                | 1,230.22     | 1,787,505,238 | min. level
gzip  | 6     | 717.71                  | 2,041.24     | 1,644,280,629 | default level
gzip  | 9     | 608.54                  | 2,413.66     | 1,643,517,758 | max. level
lz4   | 1     | 1,694.69                | 603.46       | 2,211,346,795 | min. level
lz4   | 9     | 1,199.93                | 878.85       | 2,184,022,257 | default level
lz4   | 17    | 495.34                  | 2,110.55     | 2,178,643,665 | max. level
zstd  | -5    | 7,653.45                | 156.88       | 1,997,500,892 | experimental level
zstd  | 1     | 6,317.52                | 68.55        | 1,521,783,958 |
zstd  | 3     | 4,760.54                | 286.79       | 1,494,620,615 | default level
zstd  | 12    | 2,988.95                | 863.89       | 1,458,150,768 |
zstd  | 18    | 85.20                   | 2,017.92     | 1,492,015,424 |

It shows the following:

  • Codec is the main factor that differentiates the compressed size; the compression level has little impact on it. The maximum improvement is gzip/1 vs. gzip/9 (8%), and the minimum is lz4/1 vs. lz4/17 (1.5%).
  • Except for zstd/-5, when the compression level gets lower, messages/sec increases and latency decreases. In particular, compressing with zstd/1 produces 32.7% more messages per second than zstd/3 (the current default), and gzip/1 produces 56.4% more than gzip/6 (the current default).
  • For every compression codec, compressing with the minimum level (i.e., a speed-first strategy) resulted in the best messages/second rate.

...

Compatibility, Deprecation, and Migration Plan

Since this update falls back to the default compression level and the current buffer size when they are not set, there is no backward compatibility problem.

Rejected Alternatives

Can we support

...

the compression buffer size option?

At the initial stage, I considered the compression buffer size option. However, during the benchmark, I could not find any impact on produce speed or compressed size, so it was rejected.

We can, but after some discussion, we decided that supporting both options at once is better, since both of them impact the compression. So we decided to expand the initial proposal, which handled the compression level only.

Can we support universal 'default compression level' value for producer config?

Impossible. Currently, most compression codecs allow adjusting the compression level with an int value, and this seems unlikely to change. However, not all of these codecs define a value that denotes the 'default compression level', or the value assigned to the default level differs; for example, gzip uses -1 for the default level, while zstd used 0. And since the latest release of zstd allows negative compression levels, the meaning of level 0 is also changing.

For these reasons, we can't provide a universal int value to denote the default compression level.
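
The gzip half of this can be checked directly against the JDK, which defines -1 as its 'use the default level' sentinel; the zstd behavior is as described above. A tiny illustration:

Code Block
import java.util.zip.Deflater;

public class DefaultLevelSentinels {
    public static void main(String[] args) {
        // The JDK's deflate/gzip implementation uses -1 to mean "use the default level".
        System.out.println(Deflater.DEFAULT_COMPRESSION);  // prints -1
        // zstd, by contrast, historically treated 0 as "use the default level",
        // and negative zstd levels are now valid, so no single int works for every codec.
    }
}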

Can we use external dictionary feature?

This feature requires an option to specify the dictionary for the supported codecs, e.g., snappy, lz4, and zstd. It is obviously beyond the scope of this KIP.