Status

Current state: Accepted

Discussion thread: https://lists.apache.org/thread/tbhmkf44jhjf8lqmo7w2whynbgttf1o6

...

PR: https://github.com/apache/kafka/pull/12545

JIRA: KAFKA-14944

Motivation

Currently, CompletedFetch#parseRecord(TopicPartition, RecordBatch, Record) deserializes each record's key and value with Deserializer#deserialize(String topic, Headers headers, byte[] data). Because the record holds the key and value as ByteBuffers, it first calls Utils.toArray(ByteBuffer) to copy each one into a new byte[], so every record incurs an extra memory allocation and memory copy before deserialization even starts.

...

  • For the built-in StringDeserializer and ByteBufferDeserializer, deserialization will operate directly on the ByteBuffer through the new overloaded method.
  • User-defined Deserializers that override the new method's default implementation can likewise avoid the extra allocation and copy.

Public Interfaces

We propose adding a default method, Deserializer#deserialize(String, Headers, ByteBuffer).

Class / Method

Deserializer

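// Default keeps existing byte[]-based Deserializers working: copy the buffer
// (null stays null) and delegate to deserialize(String, Headers, byte[]).
default T deserialize(String topic, Headers headers, ByteBuffer data) {
    return deserialize(topic, headers, Utils.toNullableArray(data));
}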

ByteBufferDeserializer

@Override
public ByteBuffer deserialize(String topic, Headers headers, ByteBuffer data) {
    // Return the buffer itself: no allocation and no copy.
    return data;
}

StringDeserializer

@Override
public String deserialize(String topic, Headers headers, ByteBuffer data) {
    if (data == null) {
        return null;
    }

    try {
        if (data.hasArray()) {
            // Heap buffer: decode directly from the backing array without copying.
            // arrayOffset() covers sliced buffers; position() skips bytes already read.
            return new String(data.array(), data.position() + data.arrayOffset(), data.remaining(), encoding);
        } else {
            // Direct or read-only buffer: no accessible backing array, so copy first.
            return new String(Utils.toArray(data), encoding);
        }
    } catch (UnsupportedEncodingException e) {
        throw new SerializationException("Error when deserializing ByteBuffer to string due to unsupported encoding " + encoding);
    }
}
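The data.position() + data.arrayOffset() offset in the heap-buffer branch matters when the buffer is a slice of a larger array or has already been partially read. The standalone sketch below reproduces the same two-branch logic in a hypothetical Utf8Decode.decodeUtf8 helper. It is fixed to StandardCharsets.UTF_8 instead of the configurable encoding field, so no checked exception is involved. It is an illustration only, not Kafka's code.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical standalone version of the StringDeserializer logic above,
// fixed to UTF-8 so it needs no configuration or checked exception.
final class Utf8Decode {
    private Utf8Decode() {}

    static String decodeUtf8(ByteBuffer data) {
        if (data == null) {
            return null;
        }
        if (data.hasArray()) {
            // Heap buffer: decode straight from the backing array. arrayOffset()
            // accounts for slices, position() for bytes already consumed.
            return new String(data.array(), data.arrayOffset() + data.position(),
                    data.remaining(), StandardCharsets.UTF_8);
        }
        // Direct (or read-only) buffer: no accessible array, so copy first.
        // duplicate() has its own position, so the caller's position is unchanged.
        byte[] copy = new byte[data.remaining()];
        data.duplicate().get(copy);
        return new String(copy, StandardCharsets.UTF_8);
    }
}
```

For example, ByteBuffer.wrap(raw, 2, 5).slice() has arrayOffset() == 2 and position() == 0. Reading from data.array() starting at index 0 would therefore return the wrong bytes.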

Proposed Changes

  • Add a default method deserialize(String, Headers, ByteBuffer) to Deserializer.
  • In CompletedFetch#parseRecord(TopicPartition, RecordBatch, Record), invoke Deserializer#deserialize(String, Headers, ByteBuffer) instead of Deserializer#deserialize(String, Headers, byte[]).
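The call-site change can be sketched roughly as follows. KeyDeserializer and ParseSketch are hypothetical stand-ins, not Kafka's CompletedFetch code, and Headers and the value path are omitted. The "before" path always converts the record key to byte[] itself. The "after" path passes the ByteBuffer through, so a copy happens only when the deserializer relies on the default fallback.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in with both overloads (Headers omitted); not Kafka's interface.
interface KeyDeserializer<T> {
    T deserialize(String topic, byte[] data);

    // Proposal-style fallback: copy the readable bytes and delegate to byte[].
    default T deserialize(String topic, ByteBuffer data) {
        if (data == null) {
            return null;
        }
        byte[] copy = new byte[data.remaining()];
        data.duplicate().get(copy);
        return deserialize(topic, copy);
    }
}

// Hypothetical before/after sketch of the key handling in parseRecord.
final class ParseSketch {
    private ParseSketch() {}

    // Before: the caller always allocates and copies, whatever the deserializer is.
    static <K> K parseKeyBefore(String topic, ByteBuffer key, KeyDeserializer<K> d) {
        byte[] keyBytes = null;
        if (key != null) {
            keyBytes = new byte[key.remaining()];
            key.duplicate().get(keyBytes);
        }
        return d.deserialize(topic, keyBytes);
    }

    // After: the buffer is handed over; the deserializer decides whether to copy.
    static <K> K parseKeyAfter(String topic, ByteBuffer key, KeyDeserializer<K> d) {
        return d.deserialize(topic, key);
    }
}
```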

Compatibility, Deprecation, and Migration Plan

  • This proposal has no compatibility issues: it only adds the default method deserialize(String, Headers, ByteBuffer), so existing Deserializers continue to work unchanged.

  • Deserializers that must remain compatible with older versions of the kafka-clients library should still implement the byte[]-based deserialize methods, because those versions never call the ByteBuffer overload.
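The two compatibility points above can be illustrated with a simplified stand-in interface. SketchDeserializer is hypothetical, omits Headers, and only mirrors the proposed shape. An existing byte[]-only deserializer keeps compiling and accepts ByteBuffer input through the default method. An optimized one implements both overloads, so it still works with clients that only ever call the byte[] version.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Simplified stand-in for Deserializer after the proposal (Headers omitted).
// It mirrors the proposed shape only; it is not Kafka's actual interface.
interface SketchDeserializer<T> {
    T deserialize(String topic, byte[] data);

    // Proposal-style default: copy the readable bytes and delegate to byte[].
    default T deserialize(String topic, ByteBuffer data) {
        if (data == null) {
            return null;
        }
        byte[] copy = new byte[data.remaining()];
        data.duplicate().get(copy);
        return deserialize(topic, copy);
    }
}

// An existing byte[]-only deserializer: compiles unchanged and still accepts
// ByteBuffer input through the default method, at the cost of one copy.
final class LegacyUtf8Deserializer implements SketchDeserializer<String> {
    @Override
    public String deserialize(String topic, byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }
}

// An optimized deserializer for 4-byte ints: keeps the byte[] method for older
// clients and adds a copy-free ByteBuffer override. Assumes the buffer uses the
// default big-endian byte order.
final class OptimizedIntDeserializer implements SketchDeserializer<Integer> {
    @Override
    public Integer deserialize(String topic, byte[] data) {
        // wrap() creates a view over the array; the bytes are not copied.
        return data == null ? null : deserialize(topic, ByteBuffer.wrap(data));
    }

    @Override
    public Integer deserialize(String topic, ByteBuffer data) {
        if (data == null) {
            return null;
        }
        if (data.remaining() != 4) {
            throw new IllegalArgumentException("expected 4 bytes, got " + data.remaining());
        }
        // Absolute read at position(): no allocation, caller's position unchanged.
        return data.getInt(data.position());
    }
}
```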

Rejected Alternatives

Another solution I considered was a pooled buffer allocator along the lines of Netty's PoolArena, but it is considerably more complicated.