Status
Current state: Under Discussion [One of "Under Discussion", "Accepted", "Rejected"]
Discussion thread: here
JIRA: KAFKA-6923
Planned Release: 2.1.0
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
When headers were introduced by KIP-82 - Add Record Headers the change created the ExtendedSerializer
and ExtendedDeserializer
classes in order to keep interface compatibility but still add T deserialize(String topic, Headers headers, byte[] data);
and byte[] serialize(String topic, Headers headers, T data);
methods that consume the headers for serialization/deserialization. The reason for doing so was that Kafka at that time needed be compatbile with Java 7. Since we're not compiling on Java 7 anymore (KAFKA-4423) we should consolidate the way we're using these in a backward compatible fashion.
Public Interfaces
Two new interface methods will be created, which are the extra deserialize/serialize methods taken from the Extended classes:
public interface Serializer<T> extends Closeable { void configure(Map<String, ?> configs, boolean isKey); default byte[] serialize(String topic, T data) { return new byte[0]; } default byte[] serialize(String topic, Headers headers, T data) { // This is the new method return serialize(topic, data); } @Override void close(); }
public interface Deserializer<T> extends Closeable { void configure(Map<String, ?> configs, boolean isKey); default T deserialize(String topic, byte[] data) { return null; } default T deserialize(String topic, Headers headers, byte[] data) { // This is the new method return deserialize(topic, data); } @Override void close(); }
Proposed Changes
- Introduce the method above in both interfaces
- deprecate
ExtendedSerializer/
ExtendedDeserializer
andWrapper
saying that new and existing implementations should useSerializer/Deserializer
- leave
ExtendedSerializer/ExtendedDeserializer
as is, so existing behavior won't change. - get rid of the internal usages of
ExtendedSerializer/ExtendedDeserializer
&Wrapper.ensureExtended()
Compatibility, Deprecation, And Migration Plan
All changes should be backward compatible. The defined serialize/deserialize methods in ExtendedSerializer/ExtendedDeserializer
will act as abstract overrides for the default methods which is valid. From the perspective of the implementors it shouldn't be a breaking change as the generated code in ExtendedSerializer/ExtendedDeserializer
will remain the same.
Test Plan
Review existing unit tests and system tests.
Rejected Alternatives
The code examples here are made with the Deserializer
class but they are completely valid for the Serializer
as well.
Propagate to more complicated
We could propagate the method calls to the deserialize(String,Headers,byte[])
method. The drawback here is that it wouldn't be backward compatible as the ExtendedDeserializer just did the opposite thing: it propagated the deserialize(String,byte[])
method from the deserialize(String,Headers,byte[])
method.
public interface Deserializer<T> extends Closeable { default T deserialize(String topic, byte[] data) { return deserialize(topic, null, data); } default T deserialize(String topic, Headers headers, byte[] data) { return null; } }
Headers as parameter
In this version we'd make sure that setHeaders
is called right before deserialize
(or the same goes for serialize
). The drawback of this is that we'd force implementations to maintain state.
public interface Deserializer<T> extends Closeable { void setHeaders(Headers headers); default T deserialize(String topic, byte[] data) { return null; } }
Circular-referencing methods
This implementation is pretty good if the user overrides one (or both) the methods, however in the case where the default implementation is used for both of them, we'll get into a circular method reference and eventually a stack overflow.
public interface Deserializer<T> extends Closeable { default T deserialize(String topic, byte[] data) { return deserialize(topic, null, data); } default T deserialize(String topic, Headers headers, byte[] data) { return deserialize(topic, data); } }