
Status

Current state: "Under Discussion"

Discussion thread: https://lists.apache.org/thread/5fmgl51x7h3gsx9mn7xq0n28j700rys0

PR: https://github.com/apache/kafka/pull/12685

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, KafkaProducer#doSend(ProducerRecord, Callback) serializes the key and value in two steps. It first calls Serializer#serialize(String, Headers, T) to convert T into a byte[], then calls Utils#wrapNullable(byte[]) to wrap that byte[] in a ByteBuffer, which is finally appended to a DefaultRecord. We propose adding a default method Serializer#serializeToByteBuffer(String, Headers, T) and calling it from KafkaProducer#doSend(ProducerRecord, Callback) instead. When T is a ByteBuffer, or is backed by one, the serializer can then return a ByteBuffer directly, avoiding the allocation of an intermediate byte[] and the copy into it.

Public Interfaces

We propose adding the default methods Serializer#serializeToByteBuffer(String, T), Serializer#serializeToByteBuffer(String, Headers, T), and Partitioner#partition(String, Object, ByteBuffer, Object, ByteBuffer, Cluster), and overriding both serializeToByteBuffer methods in ByteBufferSerializer:

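Serializer (interface):

```java
import static org.apache.kafka.common.utils.Utils.wrapNullable;

public interface Serializer<T> extends Closeable {
    // ... existing methods unchanged ...

    // Default: serialize to byte[] as today, then wrap it (null stays null).
    default ByteBuffer serializeToByteBuffer(String topic, T data) {
        return wrapNullable(serialize(topic, data));
    }

    default ByteBuffer serializeToByteBuffer(String topic, Headers headers, T data) {
        return wrapNullable(serialize(topic, headers, data));
    }
}
```

ByteBufferSerializer (class):

```java
import java.nio.ByteBuffer;
import org.apache.kafka.common.header.Headers;

public class ByteBufferSerializer implements Serializer<ByteBuffer> {
    // ... existing serialize(String, ByteBuffer) unchanged ...

    @Override
    public ByteBuffer serializeToByteBuffer(String topic, ByteBuffer data) {
        if (data == null) {
            return null; // keep null values (e.g. tombstones) as null
        }
        // position > 0 means the caller wrote into the buffer without flipping it.
        if (data.position() > 0) {
            data.flip();
        }
        return data; // returned as is: no intermediate byte[] and no copy
    }

    @Override
    public ByteBuffer serializeToByteBuffer(String topic, Headers headers, ByteBuffer data) {
        return serializeToByteBuffer(topic, data);
    }
}
```

Partitioner (interface):

```java
import static org.apache.kafka.common.utils.Utils.toNullableArray;

// In Partitioner: copy the buffers to byte[] (null stays null) and delegate to the existing overload.
default int partition(String topic, Object key, ByteBuffer keyBytes, Object value, ByteBuffer valueBytes, Cluster cluster) {
    return partition(topic, key, toNullableArray(keyBytes), value, toNullableArray(valueBytes), cluster);
}
```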
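To illustrate what the ByteBufferSerializer override saves, here is a minimal, dependency-free sketch. ZeroCopySketch and its methods are hypothetical names, not Kafka API. serializeToByteBuffer mirrors the override proposed above (with Headers omitted), and copyThenWrap is a simplified stand-in for today's serialize-then-wrap path, not Kafka's actual ByteBufferSerializer#serialize.

```java
import java.nio.ByteBuffer;

// Hypothetical, dependency-free mirror of the proposed ByteBufferSerializer override.
final class ZeroCopySketch {
    // Proposed path: hand the caller's buffer straight through.
    static ByteBuffer serializeToByteBuffer(ByteBuffer data) {
        if (data == null) {
            return null; // keep null (e.g. tombstones) as null
        }
        if (data.position() > 0) {
            data.flip(); // caller wrote bytes but did not flip: send [0, position)
        }
        return data; // same instance: no byte[] allocation, no copy
    }

    // Simplified two-step path for comparison: copy into a byte[], then wrap it.
    static ByteBuffer copyThenWrap(ByteBuffer data) {
        if (data == null) {
            return null;
        }
        byte[] bytes = new byte[data.remaining()];
        data.duplicate().get(bytes); // copy without moving the caller's position
        return ByteBuffer.wrap(bytes); // wrapping itself does not copy
    }
}
```

For a direct buffer into which the caller has written three bytes, serializeToByteBuffer returns the very same instance with position 0 and limit 3, while copyThenWrap returns a new heap buffer. One consequence of the position > 0 heuristic worth noting: a buffer that was already flipped and then partially read also has position > 0, so it would be flipped again and the bytes before the old position would be sent instead.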

Proposed Changes

The proposal makes the following changes:

  • Serializer
    • Add the default methods Serializer#serializeToByteBuffer(String, T) and Serializer#serializeToByteBuffer(String, Headers, T);
    • Override serializeToByteBuffer(String, T) and serializeToByteBuffer(String, Headers, T) in ByteBufferSerializer;
    • Use Serializer#serializeToByteBuffer(String, Headers, T) to serialize the key and value in KafkaProducer#doSend(ProducerRecord, Callback).
  • Partitioner
    • Add the default method Partitioner#partition(String, Object, ByteBuffer, Object, ByteBuffer, Cluster);
    • Implement partition(String, Object, ByteBuffer, Object, ByteBuffer, Cluster) in RoundRobinPartitioner;
    • Use Partitioner#partition(String, Object, ByteBuffer, Object, ByteBuffer, Cluster) in KafkaProducer#partition(ProducerRecord, ByteBuffer, ByteBuffer, Cluster).
  • Utils
    • Add a method murmur2(ByteBuffer data).
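The Utils#murmur2(ByteBuffer) change, and how a key-hashing partitioner could use it, can be sketched without Kafka on the classpath. Murmur2Sketch is a hypothetical class. Its murmur2 is a port of the MurmurHash2 variant in the existing Utils#murmur2(byte[]); it assumes the intended semantics are to hash the bytes in [position, limit) using absolute reads, so the caller's position is left untouched. partitionForKey shows how a partition could be chosen from a ByteBuffer key without first copying it into a byte[]. Compare its output against Utils#murmur2(byte[]) before relying on it.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of Utils#murmur2(ByteBuffer): the MurmurHash2 variant
// used by murmur2(byte[]), applied to the bytes in [position, limit).
final class Murmur2Sketch {
    static int murmur2(ByteBuffer data) {
        final int start = data.position(); // absolute get(int) below never moves position
        final int length = data.remaining();
        final int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;

        int h = seed ^ length;
        final int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            final int i4 = start + i * 4;
            int k = (data.get(i4) & 0xff)
                    + ((data.get(i4 + 1) & 0xff) << 8)
                    + ((data.get(i4 + 2) & 0xff) << 16)
                    + ((data.get(i4 + 3) & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }

        // Mix in the trailing 1-3 bytes (intentional fall-through).
        final int tail = start + (length & ~3);
        switch (length % 4) {
            case 3:
                h ^= (data.get(tail + 2) & 0xff) << 16;
            case 2:
                h ^= (data.get(tail + 1) & 0xff) << 8;
            case 1:
                h ^= data.get(tail) & 0xff;
                h *= m;
        }

        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Map the key hash to a partition: clear the sign bit, then take the modulus.
    static int partitionForKey(ByteBuffer keyBytes, int numPartitions) {
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }
}
```

Because the hash covers only [position, limit), a key that sits in the middle of a larger buffer hashes the same as a buffer holding just the key bytes, which is what lets the producer skip the toNullableArray copy for keyed records.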

Compatibility, Deprecation, and Migration Plan

  • This proposal has no compatibility issues: it only adds the default methods Serializer#serializeToByteBuffer(String, T), Serializer#serializeToByteBuffer(String, Headers, T), and Partitioner#partition(String, Object, ByteBuffer, Object, ByteBuffer, Cluster), so existing Serializer and Partitioner implementations keep working without changes.
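The compatibility argument rests on Java default methods: a class written before the proposal implements only the existing abstract methods, and the new overloads fall back to them. The sketch below illustrates this with hypothetical, stripped-down stand-ins (MirrorSerializer, MirrorPartitioner, LegacyUtf8Serializer, LegacyLengthPartitioner), not the real Kafka interfaces. Headers and Cluster are omitted, and the partitioner takes numPartitions directly as a simplifying assumption.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for Serializer after the proposal.
interface MirrorSerializer<T> {
    byte[] serialize(String topic, T data);

    // New default method: existing implementations inherit it unchanged.
    default ByteBuffer serializeToByteBuffer(String topic, T data) {
        byte[] bytes = serialize(topic, data);
        return bytes == null ? null : ByteBuffer.wrap(bytes); // same behavior as wrapNullable
    }
}

// Hypothetical stand-in for Partitioner after the proposal (Cluster omitted).
interface MirrorPartitioner {
    int partition(String topic, Object key, byte[] keyBytes, int numPartitions);

    // New default overload: copy the buffer into a byte[] and delegate.
    default int partition(String topic, Object key, ByteBuffer keyBytes, int numPartitions) {
        return partition(topic, key, toNullableArray(keyBytes), numPartitions);
    }

    static byte[] toNullableArray(ByteBuffer buffer) {
        if (buffer == null) {
            return null;
        }
        byte[] out = new byte[buffer.remaining()];
        buffer.duplicate().get(out); // read without moving the caller's position
        return out;
    }
}

// A serializer written before the proposal: it only implements serialize().
final class LegacyUtf8Serializer implements MirrorSerializer<String> {
    @Override
    public byte[] serialize(String topic, String data) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }
}

// A partitioner written before the proposal: it only implements the byte[] overload.
final class LegacyLengthPartitioner implements MirrorPartitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, int numPartitions) {
        return keyBytes == null ? 0 : keyBytes.length % numPartitions;
    }
}
```

Calling serializeToByteBuffer on LegacyUtf8Serializer, or the ByteBuffer overload on LegacyLengthPartitioner, compiles and runs without either class being modified; the only cost is the copy that the default methods still perform.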

Rejected Alternatives

There is no alternative.
