Current state: Accepted
Discussion thread: https://lists.apache.org/thread/tbhmkf44jhjf8lqmo7w2whynbgttf1o6
Voting thread: https://lists.apache.org/thread/z6v4qyhgydl1tj0s3ycn6v4hv408gx2t
PR: https://github.com/apache/kafka/pull/12545
JIRA:
Currently, CompletedFetch#parseRecord(TopicPartition, RecordBatch, Record) deserializes the key and value with Deserializer#deserialize(String topic, Headers headers, byte[] data): it first calls Utils.toArray(ByteBuffer) to convert the ByteBuffer into a byte[], and only then invokes the deserializer. That conversion costs an extra memory allocation and a memory copy for every record.
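To see where the cost comes from, here is a minimal stand-in for the `toArray` conversion described above (an illustration only, not the real `Utils` code): every call allocates a fresh byte[] and copies the buffer's contents into it.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Minimal stand-in for Utils.toArray(ByteBuffer): each call allocates a new
// byte[] and copies the buffer's remaining bytes into it (illustration only).
public class ToArrayCopyDemo {
    static byte[] toArray(ByteBuffer buffer) {
        byte[] dest = new byte[buffer.remaining()];
        buffer.duplicate().get(dest); // this copy is what the proposal avoids
        return dest;
    }

    public static void main(String[] args) {
        ByteBuffer payload = ByteBuffer.wrap("value".getBytes());
        byte[] first = toArray(payload);
        byte[] second = toArray(payload);
        // Two calls produce two distinct copies of the same bytes.
        System.out.println(first != second);              // true
        System.out.println(Arrays.equals(first, second)); // true
    }
}
```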
We propose adding a default method Deserializer#deserialize(String, Headers, ByteBuffer). Its default implementation would still call Utils.toNullableArray(ByteBuffer) and delegate to the existing byte[] method, so existing deserializers keep working unchanged. For the following deserializers, however, we can work on the ByteBuffer directly instead of a byte[], which avoids the extra memory allocation and memory copy:
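The delegation described above can be sketched as follows. This is a simplified, hypothetical shape (headers are omitted for brevity, and `toNullableArray` is a stand-in for the `Utils` helper), not the actual Kafka source:

```java
import java.nio.ByteBuffer;

// Sketch of the proposed default method: delegate to the existing byte[]
// overload so current Deserializer implementations keep working unchanged.
// Simplified/hypothetical shapes; headers omitted for brevity.
public class DefaultMethodSketch {
    // Stand-in for Utils.toNullableArray(ByteBuffer).
    static byte[] toNullableArray(ByteBuffer buffer) {
        if (buffer == null) return null;
        byte[] dest = new byte[buffer.remaining()];
        buffer.duplicate().get(dest);
        return dest;
    }

    interface Deserializer<T> {
        T deserialize(String topic, byte[] data); // existing byte[] path

        // Proposed default: copy once via toNullableArray, then delegate.
        default T deserialize(String topic, ByteBuffer data) {
            return deserialize(topic, toNullableArray(data));
        }
    }

    public static void main(String[] args) {
        Deserializer<String> stringDeserializer = (topic, data) ->
                data == null ? null : new String(data);
        ByteBuffer payload = ByteBuffer.wrap("hello".getBytes());
        // The ByteBuffer overload works without the lambda implementing it.
        System.out.println(stringDeserializer.deserialize("t", payload)); // hello
    }
}
```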
Class | Method |
---|---|
Deserializer | `default T deserialize(String topic, Headers headers, ByteBuffer data) { ... }` (new default method; delegates to the existing byte[] overload) |
ByteBufferDeserializer | `@Override` of `deserialize(String, Headers, ByteBuffer)` |
StringDeserializer | `@Override` of `deserialize(String, Headers, ByteBuffer)` |
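As a sketch of why these overrides help, a ByteBufferDeserializer can return the incoming buffer as-is and skip the copy entirely. Again, these are hypothetical simplified shapes (headers omitted), not the real Kafka classes:

```java
import java.nio.ByteBuffer;

// Sketch: with the ByteBuffer overload, ByteBufferDeserializer can hand the
// incoming buffer straight back with no allocation or copy.
// Hypothetical simplified interface (headers omitted).
public class ZeroCopyOverrideSketch {
    interface Deserializer<T> {
        T deserialize(String topic, byte[] data);

        default T deserialize(String topic, ByteBuffer data) {
            if (data == null) return null;
            byte[] dest = new byte[data.remaining()];
            data.duplicate().get(dest); // default path still copies
            return deserialize(topic, dest);
        }
    }

    static class ByteBufferDeserializer implements Deserializer<ByteBuffer> {
        @Override
        public ByteBuffer deserialize(String topic, byte[] data) {
            return data == null ? null : ByteBuffer.wrap(data);
        }

        // Zero-copy override: return the buffer itself.
        @Override
        public ByteBuffer deserialize(String topic, ByteBuffer data) {
            return data;
        }
    }

    public static void main(String[] args) {
        ByteBuffer payload = ByteBuffer.wrap("abc".getBytes());
        ByteBuffer out = new ByteBufferDeserializer().deserialize("t", payload);
        System.out.println(out == payload); // true: same buffer, no copy
    }
}
```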
Deserializer: add the default method deserialize(String, Headers, ByteBuffer).

CompletedFetch#parseRecord(TopicPartition, RecordBatch, Record): call Deserializer#deserialize(String, Headers, ByteBuffer) instead of Deserializer#deserialize(String, Headers, byte[]).
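The call-site change can be sketched like this. The deserializer interface here is a hypothetical simplification (the real code also passes the topic's headers); the point is only that copying becomes the deserializer's decision rather than being forced at the call site:

```java
import java.nio.ByteBuffer;

// Before/after sketch of the parseRecord call-site change.
// Hypothetical simplified interface; not the actual Kafka internals.
public class ParseRecordSketch {
    interface Deserializer<T> {
        T deserialize(String topic, byte[] data);

        default T deserialize(String topic, ByteBuffer data) {
            if (data == null) return null;
            byte[] dest = new byte[data.remaining()];
            data.duplicate().get(dest);
            return deserialize(topic, dest);
        }
    }

    public static void main(String[] args) {
        Deserializer<String> valueDeserializer = (topic, bytes) -> new String(bytes);
        ByteBuffer value = ByteBuffer.wrap("v1".getBytes());

        // Old path: the call site copies to byte[] before deserializing.
        byte[] copied = new byte[value.remaining()];
        value.duplicate().get(copied);
        String oldResult = valueDeserializer.deserialize("topic", copied);

        // New path: pass the ByteBuffer; any copy is up to the deserializer.
        String newResult = valueDeserializer.deserialize("topic", value);

        System.out.println(oldResult.equals(newResult)); // true
    }
}
```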
This proposal has no compatibility issues: we only add the default method deserialize(String, Headers, ByteBuffer), which is compatible with existing Deserializer implementations.
Another solution considered was pooled buffers (a PoolArena, as in Netty), but that approach is considerably more complicated.