Current state: "Under Discussion"
Discussion thread: https://lists.apache.org/thread/5fmgl51x7h3gsx9mn7xq0n28j700rys0
PR: https://github.com/apache/kafka/pull/12685
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Currently, KafkaProducer#doSend(ProducerRecord, Callback) serializes the key and value by calling Serializer#serialize(String, Headers, T), which converts T into a byte[]; it then calls Utils#wrapNullable(byte[]) to wrap that byte[] in a ByteBuffer, and finally appends the ByteBuffer to a DefaultRecord. Why don't we add a default method Serializer#serializeToByteBuffer(String, Headers, T) and use it in KafkaProducer#doSend(ProducerRecord, Callback)? If T is a ByteBuffer, or the serializer is backed by a ByteBuffer, this avoids a significant amount of memory allocation and copying.
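The difference between the two paths can be sketched as follows. This is a simplified illustration, not the actual Kafka classes: `currentPath` mimics what `serialize` plus `Utils#wrapNullable` do today (copy into a fresh `byte[]`, then wrap), while `proposedPath` shows the zero-copy fast path a ByteBuffer-aware serializer could take.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch contrasting the current byte[]-based path with the
// proposed ByteBuffer fast path. Not the real Kafka implementation.
public class SerializePathSketch {

    // Current path: the serializer materializes a new byte[] (a copy of the
    // buffer's contents), which is then wrapped in a ByteBuffer again.
    static ByteBuffer currentPath(ByteBuffer payload) {
        byte[] bytes = new byte[payload.remaining()];
        payload.duplicate().get(bytes); // copy #1: buffer -> array
        return ByteBuffer.wrap(bytes);  // new ByteBuffer wrapping the copy
    }

    // Proposed path: a ByteBuffer-aware serializer hands back the buffer
    // directly, skipping the intermediate allocation and copy.
    static ByteBuffer proposedPath(ByteBuffer payload) {
        return payload;
    }

    public static void main(String[] args) {
        ByteBuffer value = ByteBuffer.wrap("hello".getBytes());
        System.out.println(currentPath(value) == value);  // false: fresh allocation
        System.out.println(proposedPath(value) == value); // true: zero-copy
    }
}
```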
We propose adding the default methods Serializer#serializeToByteBuffer(String, T) and Serializer#serializeToByteBuffer(String, Headers, T), and the default method Partitioner#partition(String, Object, ByteBuffer, Object, ByteBuffer, Cluster):
Class/Interface | Method |
---|---|
Serializer | `default ByteBuffer serializeToByteBuffer(String topic, T data)` and `default ByteBuffer serializeToByteBuffer(String topic, Headers headers, T data)` |
ByteBufferSerializer | `@Override` implementations of both `serializeToByteBuffer` variants |
Partitioner | `default int partition(String topic, Object key, ByteBuffer keyBytes, Object value, ByteBuffer valueBytes, Cluster cluster)` |
This proposal introduces no compatibility issues: we only add the default methods Serializer#serializeToByteBuffer(String, T), Serializer#serializeToByteBuffer(String, Headers, T), and Partitioner#partition(String, Object, ByteBuffer, Object, ByteBuffer, Cluster), which are compatible with all existing Serializer and Partitioner implementations.
There are no rejected alternatives.