Status

Current state: "Under Discussion"

Discussion thread: https://lists.apache.org/thread/l2zkd375818gkg5753xqhcqf4boqhbqm

JIRA: KAFKA-4852

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, `ByteBufferSerializer#serialize(String, ByteBuffer)` has a compatibility problem: if the data occupies only part of the ByteBuffer, the method returns every byte in the ByteBuffer instead of only the valid bytes:

...

```
array lengths differ, expected: <5> but was: <7>
Expected :5
Actual   :7
org.opentest4j.AssertionFailedError: array lengths differ, expected: <5> but was: <7>
    ...
    at org.apache.kafka.common.serialization.SerializationTest.testByteBufferSerializer(SerializationTest.java:397)
    ...
    at java.util.ArrayList.forEach(ArrayList.java:1259)
    ...
    at java.util.ArrayList.forEach(ArrayList.java:1259)
    ...
    at worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
    at worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)
```
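
The same 5-versus-7 length mismatch can be reproduced with `java.nio` alone. The sketch below is not the Kafka implementation: `ValidBytesSketch`, `backingBytes`, and `writtenBytes` are hypothetical names, and it assumes that "valid bytes" means the bytes written into the buffer so far (indices 0 up to the current position). The capacity of 7 is chosen only to mirror the error message.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration; not part of Kafka.
final class ValidBytesSketch {

    // Returns the whole backing array of a heap buffer,
    // regardless of how many bytes were actually written.
    public static byte[] backingBytes(ByteBuffer buffer) {
        return buffer.array();
    }

    // Returns only the bytes written so far, i.e. indices [0, position).
    // Works on a duplicate so the caller's position and limit stay untouched.
    public static byte[] writtenBytes(ByteBuffer buffer) {
        ByteBuffer view = buffer.duplicate();
        view.flip(); // limit = old position, position = 0
        byte[] out = new byte[view.remaining()];
        view.get(out);
        return out;
    }

    public static void main(String[] args) {
        byte[] hello = "Hello".getBytes(StandardCharsets.UTF_8);
        // Capacity 7, but only 5 bytes are written.
        ByteBuffer buffer = ByteBuffer.allocate(7).put(hello);
        System.out.println(backingBytes(buffer).length); // 7
        System.out.println(writtenBytes(buffer).length); // 5
    }
}
```

Reading through a duplicate is a design choice for this sketch only: it keeps the caller's buffer state unchanged, which may or may not match what the proposed change does.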


Public Interfaces

There are no new interfaces and no existing interfaces that will be removed or changed.

Proposed Changes

Change the implementation of `ByteBufferSerializer#serialize(String, ByteBuffer)`:

...

  • If the old behavior is correct, then the new behavior is also correct.

Test Plan

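```java
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;

import java.nio.ByteBuffer;
import org.apache.kafka.common.serialization.ByteBufferSerializer;
import org.junit.jupiter.api.Test;

    // Assumes JUnit 5 and a String field `topic` in the enclosing test class.
    @Test
    public void testByteBufferSerializer() {
        final byte[] bytes = "Hello".getBytes(UTF_8);
        // Heap buffers: spare capacity, exact capacity, and a wrapped array.
        final ByteBuffer heapBuffer0 = ByteBuffer.allocate(bytes.length + 1).put(bytes);
        final ByteBuffer heapBuffer1 = ByteBuffer.allocate(bytes.length).put(bytes);
        final ByteBuffer heapBuffer2 = ByteBuffer.wrap(bytes);
        // Direct buffers: spare capacity and exact capacity.
        final ByteBuffer directBuffer0 = ByteBuffer.allocateDirect(bytes.length + 1).put(bytes);
        final ByteBuffer directBuffer1 = ByteBuffer.allocateDirect(bytes.length).put(bytes);
        try (final ByteBufferSerializer serializer = new ByteBufferSerializer()) {
            // Every variant must serialize to exactly the bytes of "Hello".
            assertArrayEquals(bytes, serializer.serialize(topic, heapBuffer0));
            assertArrayEquals(bytes, serializer.serialize(topic, heapBuffer1));
            assertArrayEquals(bytes, serializer.serialize(topic, heapBuffer2));
            assertArrayEquals(bytes, serializer.serialize(topic, directBuffer0));
            assertArrayEquals(bytes, serializer.serialize(topic, directBuffer1));
        }
    }
```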

...