Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Current state: "Under Discussion"

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread] 

JIRA: here [Change the link from KAFKA-1 to your own ticket]

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Note this is a joint work proposed by Xinli Shang Qichao Chu Zhifeng Chen @Yang Yang 

Motivation

Kafka is typically used in conjunction with Avro, JSON, or Protobuf etc to serialize/deserialize data record by record. In the producer client, records are buffered as a segment (record batch), and compression is optionally applied to the segment. When the number of records in each segment is larger, columnar storage like Apache Parquet becomes more efficient in terms of compression ratio, as compression typically performs better on columnar storage. This results in benefits for reducing traffic throughput to brokers/consumers and saving disk space on brokers.

In the use case of using Kafka for data lake ingestion, if we can produce the segment with Parquet, which is the native format in a data lake, the consumer application (e.g., Spark jobs for ingestion) can directly dump the segments as raw byte buffer into the data lake without unwrapping each record individually and then writing to the Parquet file one by one with expensive steps of encoding and compression again. This would provide additional savings for both data freshness of ingestion and resource consumption, as the ingestion job would be lighter weight and finishes significantly faster.

Parquet has a built-in column encryption feature that would enable Kafka to have field encryption for free,  offering several advantages: 1) It applies AES to the block of fields (i.e., Parquet page), minimizing overhead (5.7% for write and 3.7% for read). 2) It supports all data types of fields, including Boolean. 3) It is almost becoming industry standard now for big data column encryption mature and deployed at scale e.g. Uber, Apple, and AWS. 4) It operates at the data format layer, requiring Kafka only to enable it.

...

This KIP introduces the following additions to the public interfaces. No broker changes are needed. 

Client API changes

In the interface Producer<K,V>, we are going to add a new method as below

                  void setSchema(String topic, String schema);

In the interface Consumer<K, V>, we are going to add a new method as below 

No change is needed for Consumer<K, V> because KIP-712 already introduced 'fetch.raw.bytes' so that the ingestion consumer can fetch the byte buffer directly.

No broker changes are needed.                 byte[] pollBuffer(Duration timeout)

Proposed Changes

We propose adding Parquet as the encoder and optionally compressor in Kafka producer client. When this feature is enabled, Parquet is used to encode the batch records segment and optionally compress. Parquet has the encoding and compression in a columnar-oriented way. 

We divide consumers into two categories. Minimum change is needed for messaging consumer consumers for reading the Parquet segment, and a new consuming method is proposed to add for ingestion consumer. . The way to divide into the two categories and these terms are used solely for the purpose of this proposal.

Messaging Consumer

In this scenario, the application expects one or more records with each poll. When the Kafka consumer client encounters the Parquet format, it invokes the Parquet reader library to unwrap the segment into records. The required change is similar to the producer side, as discussed above.

...

For applications expecting a batch of records or even the entire segment to write to the sink (data lake), the Kafka consumer client can simply return byte buffer of the entire segment to the application, allowing it to directly dump it into the sink. The necessary change involves adding a new consumer API to return the segment byte buffer and bypass decompression. 


We refer to the first type of use case as a messaging consumer and the second one as an ingestion consumer. These terms are used solely for the purpose of this proposal. 

Current Data Format Transformation 

To set up the context for discussing the changes in the next section, let’s examine the current data formats in the producer, broker, and consumer, as well as the process of transformation outlined in the following diagram. We don’t anticipate changes to the broker, so we will skip discussing its format.

Image Modified

Producer

The producer writes the in-memory data structures to an encoder to serialize them to binary and then sends them to the Kafka client. 

...

In the following diagram, we describe the proposed data format changes in each state and the process of the transformation. In short, we propose replacing compression with Parquet. Parquet combines encoding and compression at the segment level. The ingestion consumer is simplified by solely dumping the Parquet segment into the data lake. 

Image Modified


Producer

The producer writes the in-memory data structures directly to the Kafka client and encodes and compresses all together. 

...

  1. The consumer gets the segment from the broker. 
  2. If the segment is in Parquet format, it uses the Parquet library to decode and optionally decompress.
  3. The record in the in-memory format is then sent to the application.

Ingestion Consumer

  1. The consumer gets the segment with Parquet format from the broker and sends it directly to the data lake.

...

Type:

string

Default:

none

Valid Values:

[none, gzip, snappy, lz4, zstd]

Importance:

highlow

Compatibility, Deprecation, and Migration Plan

...

  1. Regression test - The configuration ‘columnar.encoding’ is set to 'none’, run all the tests in Uber staging env which includes but is not limited to read/write with different scales. 
  2. Added feature - The configuration ‘columnar.encoding’ is set to 'parquet’.
    1. Verify the data is encoded as Parquet format 
    2. The producer, broker, and consumer all work as before functionality-wide. No exceptions are expected. 
    3. The newly added consumer API should be able to return the whole segment as Parquet format directlyconsumer fetches the data as byte buffer with 'fetch.raw.bytes' set and the data is with Parquet format

Performance tests 

Run tests for different topics that should have different data types and scale

...

  1. Both producer and consumer have the proposed changes 
    1. The feature is turned off in configuration, all all regression tests should work as before.
    2. When the producer turns on this feature, the consumer and replicator can consume as before. 
  2. Producer has the proposed changes, but the consumer doesn’t 
    1. The feature is turned off in configuration, all all regression tests should work as before.
    2. When the producer turns on this feature, the consumer and replicator throw an exception 
  3. Producer doesn’t have the proposed changes, but the consumer does 
    1. All the regression tests should pass 

...