
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, we use Serializer#serialize(String, Headers, T) in KafkaProducer#doSend(ProducerRecord, Callback) to serialize the key and value. First, we call Serializer#serialize(String, Headers, T) to convert T into byte[], then use Utils#wrapNullable(byte[]) to convert the byte[] into a ByteBuffer, and finally write the ByteBuffer into MemoryRecordsBuilder through DefaultRecord#writeTo(DataOutputStream, int, long, ByteBuffer, ByteBuffer, Header[]).
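To make the intermediate step concrete, here is a simplified, self-contained Java sketch of the current path for a ByteBuffer value. CurrentPathSketch and its methods are hypothetical stand-ins, not Kafka code: serialize always copies the readable bytes into a new byte[] (for a direct buffer there is no accessible backing array, so a copy is unavoidable on this path), and wrapNullable is assumed to behave like Utils#wrapNullable(byte[]).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Simplified, hypothetical model of today's producer path: T -> byte[] -> ByteBuffer.
// It is not Kafka code; it only isolates where the intermediate byte[] comes from.
class CurrentPathSketch {

    // Step 1, simplified: turn a ByteBuffer value into a byte[]. This sketch always copies;
    // a direct buffer has no accessible backing array, so a copy cannot be avoided here.
    static byte[] serialize(ByteBuffer data) {
        if (data == null) {
            return null;
        }
        ByteBuffer view = data.duplicate(); // read through a view so the caller's position is kept
        byte[] copy = new byte[view.remaining()];
        view.get(copy); // the extra allocation and copy the proposal wants to avoid
        return copy;
    }

    // Step 2: assumed to match Utils#wrapNullable(byte[]); wrapping itself does not copy.
    static ByteBuffer wrapNullable(byte[] array) {
        return array == null ? null : ByteBuffer.wrap(array);
    }

    public static void main(String[] args) {
        ByteBuffer direct = ByteBuffer.allocateDirect(5);
        direct.put("hello".getBytes(StandardCharsets.UTF_8));
        direct.flip();

        ByteBuffer wrapped = wrapNullable(serialize(direct));
        // The result is backed by a fresh heap array, not by the caller's direct memory.
        System.out.println(wrapped.isDirect() + " " + wrapped.remaining()); // false 5
    }
}
```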

Why don't we add a serializeToByteBuffer(String, Headers, T) method to Serializer, and then call Serializer#serializeToByteBuffer(String, Headers, T) in KafkaProducer#doSend(ProducerRecord, Callback) instead? If T is a ByteBuffer, or a type backed by a ByteBuffer, this can avoid allocating and copying an intermediate byte[] for each key and value, reducing memory allocation and copying on the send path.
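The case where T is based on ByteBuffer can be sketched as follows. Payload and PayloadSerializerSketch are hypothetical names used only for illustration, and serializeToByteBuffer is written as a static method so the example compiles without Kafka on the classpath. Returning a duplicate() shares the underlying memory with the value, so no byte[] is allocated and nothing is copied, while the Payload keeps its own position.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical value type whose state already lives in a ByteBuffer.
final class Payload {
    private final ByteBuffer body;

    Payload(ByteBuffer body) {
        this.body = body;
    }

    ByteBuffer body() {
        return body;
    }
}

// Hypothetical serializer logic written against the proposed serializeToByteBuffer idea.
class PayloadSerializerSketch {

    // duplicate() shares the bytes but has its own position and limit: no allocation of a
    // byte[] and no copy, and reading the result does not move the Payload's buffer.
    static ByteBuffer serializeToByteBuffer(String topic, Payload data) {
        return data == null ? null : data.body().duplicate();
    }

    public static void main(String[] args) {
        ByteBuffer direct = ByteBuffer.allocateDirect(5);
        direct.put("hello".getBytes(StandardCharsets.UTF_8));
        direct.flip();

        ByteBuffer out = serializeToByteBuffer("topic", new Payload(direct));
        // Still direct memory with the same 5 readable bytes: no intermediate byte[].
        System.out.println(out.isDirect() + " " + out.remaining()); // true 5
    }
}
```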

In the longer term, I plan to replace byte[] with ByteBuffer in Serializer altogether.


Public Interfaces

We propose adding the default methods Serializer#serializeToByteBuffer(String, T) and Serializer#serializeToByteBuffer(String, Headers, T), and the default method Partitioner#partition(String, Object, ByteBuffer, Object, ByteBuffer, Cluster). ByteBufferSerializer overrides both serializeToByteBuffer methods:

Serializer (new default methods):

    default ByteBuffer serializeToByteBuffer(String topic, T data) {
        return Utils.wrapNullable(serialize(topic, data));
    }

    default ByteBuffer serializeToByteBuffer(String topic, Headers headers, T data) {
        return Utils.wrapNullable(serialize(topic, headers, data));
    }

ByteBufferSerializer (overrides):

    @Override
    public ByteBuffer serializeToByteBuffer(String topic, ByteBuffer data) {
        if (data == null) {
            return null;
        }

        // ByteBuffer#wrap(byte[]) returns a buffer whose position is already 0, so flip() is
        // only needed when the position has been advanced, e.g. by put(...) calls.
        if (data.position() > 0) {
            data.flip();
        }
        return data;
    }

    @Override
    public ByteBuffer serializeToByteBuffer(String topic, Headers headers, ByteBuffer data) {
        return serializeToByteBuffer(topic, data);
    }

Partitioner (new default method):

    default int partition(String topic, Object key, ByteBuffer keyBytes, Object value, ByteBuffer valueBytes, Cluster cluster) {
        return partition(topic, key, Utils.toNullableArray(keyBytes), value, Utils.toNullableArray(valueBytes), cluster);
    }
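The following sketch shows how the proposed default method and the ByteBufferSerializer override fit together. SketchSerializer and SketchByteBufferSerializer are hypothetical, trimmed-down stand-ins (no Headers, configure or close); the flip() rule is taken from the table above, while routing serialize through serializeToByteBuffer is an assumption made here so that both methods change the buffer's position and limit in the same way.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical, trimmed-down stand-in for Serializer (no Headers, configure or close).
interface SketchSerializer<T> {
    byte[] serialize(String topic, T data);

    // Proposed default: existing serializers get a ByteBuffer view of their byte[] result.
    default ByteBuffer serializeToByteBuffer(String topic, T data) {
        byte[] bytes = serialize(topic, data);
        return bytes == null ? null : ByteBuffer.wrap(bytes);
    }
}

// Hypothetical stand-in for ByteBufferSerializer with the proposed override.
class SketchByteBufferSerializer implements SketchSerializer<ByteBuffer> {

    @Override
    public ByteBuffer serializeToByteBuffer(String topic, ByteBuffer data) {
        if (data == null) {
            return null;
        }
        // ByteBuffer.wrap(byte[]) yields position 0 and is already readable; a buffer
        // filled with put(...) has position > 0 and is flipped before it is returned.
        if (data.position() > 0) {
            data.flip();
        }
        return data; // the caller's buffer itself: no allocation, no copy
    }

    // Assumption for this sketch: serialize applies the same flip rule, then copies, so
    // both methods leave the caller's buffer with the same position and limit.
    @Override
    public byte[] serialize(String topic, ByteBuffer data) {
        ByteBuffer readable = serializeToByteBuffer(topic, data);
        if (readable == null) {
            return null;
        }
        byte[] copy = new byte[readable.remaining()];
        readable.duplicate().get(copy);
        return copy;
    }
}

class SerializerSketchDemo {
    public static void main(String[] args) {
        SketchByteBufferSerializer serializer = new SketchByteBufferSerializer();

        // A wrapped array starts at position 0 and is returned unchanged.
        ByteBuffer wrapped = ByteBuffer.wrap("abc".getBytes(StandardCharsets.UTF_8));
        ByteBuffer first = serializer.serializeToByteBuffer("topic", wrapped);
        System.out.println((first == wrapped) + " " + first.remaining()); // true 3

        // A buffer filled with put(...) sits at position 5 and is flipped to [0, 5).
        ByteBuffer filled = ByteBuffer.allocate(16);
        filled.put("hello".getBytes(StandardCharsets.UTF_8));
        ByteBuffer second = serializer.serializeToByteBuffer("topic", filled);
        System.out.println((second == filled) + " " + second.remaining()); // true 5
    }
}
```

With the position() > 0 rule, a buffer that the caller has already flipped and then partially read would be flipped a second time, exposing only the bytes already consumed, so the sketch assumes callers pass either a freshly wrapped buffer or one they have just filled.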

...

Compatibility, Deprecation, and Migration Plan

  • Compatibility

    • This proposal has no compatibility issues: it only adds the default methods Serializer#serializeToByteBuffer(String, T), Serializer#serializeToByteBuffer(String, Headers, T) and Partitioner#partition(String, Object, ByteBuffer, Object, ByteBuffer, Cluster), so existing Serializer and Partitioner implementations keep working without changes.

    • ByteBufferSerializer#serialize(String, ByteBuffer) and ByteBufferSerializer#serializeToByteBuffer(String, Headers, ByteBuffer) affect the position and limit of the given ByteBuffer in the same way.
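The compatibility argument rests on Java default methods: an implementation written only against the existing byte[]-based methods picks up the new ByteBuffer-based methods automatically. A minimal sketch, assuming hypothetical, trimmed-down interfaces CompatSerializer and CompatPartitioner (the partitioner takes a partition count instead of a Cluster and drops the value arguments), with a local toNullableArray helper standing in for the toNullableArray call in the Partitioner default method:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical, trimmed-down stand-in for Serializer (no Headers, configure or close).
interface CompatSerializer<T> {
    byte[] serialize(String topic, T data);

    // Proposed default method: wraps whatever the existing byte[] method returns.
    default ByteBuffer serializeToByteBuffer(String topic, T data) {
        byte[] bytes = serialize(topic, data);
        return bytes == null ? null : ByteBuffer.wrap(bytes);
    }
}

// Hypothetical, trimmed-down stand-in for Partitioner (partition count instead of Cluster, no value).
interface CompatPartitioner {
    // The byte[]-based method that existing partitioners already implement.
    int partition(String topic, Object key, byte[] keyBytes, int numPartitions);

    // Proposed ByteBuffer overload: converts back to byte[] and delegates.
    default int partition(String topic, Object key, ByteBuffer keyBytes, int numPartitions) {
        return partition(topic, key, toNullableArray(keyBytes), numPartitions);
    }

    // Local helper standing in for toNullableArray; copies the readable bytes without
    // moving the buffer's position.
    static byte[] toNullableArray(ByteBuffer buffer) {
        if (buffer == null) {
            return null;
        }
        byte[] array = new byte[buffer.remaining()];
        buffer.duplicate().get(array);
        return array;
    }
}

// An "old" serializer that only implements the byte[] method.
class LegacyStringSerializer implements CompatSerializer<String> {
    @Override
    public byte[] serialize(String topic, String data) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }
}

// An "old" partitioner that only implements the byte[] method (toy rule: first byte mod n).
class LegacyFirstBytePartitioner implements CompatPartitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, int numPartitions) {
        if (keyBytes == null || keyBytes.length == 0) {
            return 0;
        }
        return (keyBytes[0] & 0xff) % numPartitions;
    }
}

class CompatDemo {
    public static void main(String[] args) {
        // Both calls go through the new default methods; neither legacy class was changed.
        ByteBuffer key = new LegacyStringSerializer().serializeToByteBuffer("topic", "k1");
        int partition = new LegacyFirstBytePartitioner().partition("topic", "k1", key, 3);
        System.out.println(key.remaining() + " " + partition); // 2 2
    }
}
```

In this sketch, a call that passes a literal null as the key bytes would be ambiguous between the byte[] and ByteBuffer overloads and would need a cast such as (byte[]) null; the same applies to any interface that overloads a method on byte[] and ByteBuffer.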

Rejected Alternatives

There is no alternative.