Current state: Accepted (vote thread)
Discussion thread: here
JIRA: KAFKA-6923
Planned Release: 2.1.0
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
When headers were introduced by KIP-82 - Add Record Headers, the change created the ExtendedSerializer and ExtendedDeserializer classes in order to keep interface compatibility while still adding the T deserialize(String topic, Headers headers, byte[] data) and byte[] serialize(String topic, Headers headers, T data) methods, which consume the headers during serialization/deserialization. This was done because Kafka at that time needed to be compatible with Java 7, which does not support default methods in interfaces. Since we're not compiling on Java 7 anymore (KAFKA-4423), we should consolidate the way we use these in a backward compatible fashion.
Two new interface methods will be created, which are the extra deserialize/serialize methods taken from the Extended classes:
    public interface Serializer<T> extends Closeable {
        void configure(Map<String, ?> configs, boolean isKey);

        byte[] serialize(String topic, T data);

        default byte[] serialize(String topic, Headers headers, T data) { // This is the new method
            return serialize(topic, data);
        }

        @Override
        void close();
    }

    public interface Deserializer<T> extends Closeable {
        void configure(Map<String, ?> configs, boolean isKey);

        T deserialize(String topic, byte[] data);

        default T deserialize(String topic, Headers headers, byte[] data) { // This is the new method
            return deserialize(topic, data);
        }

        @Override
        void close();
    }
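To illustrate how the default method behaves from an implementor's point of view, here is a minimal, self-contained sketch. It does not use the real Kafka classes: `Headers` is a simplified stand-in, `Deserializer` is a trimmed copy of the proposed interface (no `configure`/`close`), and `PlainStringDeserializer`, `CharsetAwareDeserializer` and the `charset` header key are hypothetical names introduced only for this example.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for org.apache.kafka.common.header.Headers (NOT the real API).
class Headers {
    private final Map<String, String> values = new HashMap<>();

    Headers add(String key, String value) {
        values.put(key, value);
        return this;
    }

    String get(String key) {
        return values.get(key);
    }
}

// Trimmed copy of the proposed interface; configure/close are omitted for brevity.
interface Deserializer<T> {
    T deserialize(String topic, byte[] data);

    default T deserialize(String topic, Headers headers, byte[] data) {
        return deserialize(topic, data);
    }
}

// Hypothetical existing implementation: only knows the headerless method.
class PlainStringDeserializer implements Deserializer<String> {
    @Override
    public String deserialize(String topic, byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }
}

// Hypothetical new implementation: additionally overrides the header-aware method.
class CharsetAwareDeserializer implements Deserializer<String> {
    @Override
    public String deserialize(String topic, byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    @Override
    public String deserialize(String topic, Headers headers, byte[] data) {
        // "charset" is an assumed header key used only in this sketch.
        String charset = headers == null ? null : headers.get("charset");
        return charset == null
                ? deserialize(topic, data)
                : new String(data, Charset.forName(charset));
    }
}

class DefaultMethodDemo {
    public static void main(String[] args) {
        byte[] ascii = "hello".getBytes(StandardCharsets.UTF_8);
        Headers headers = new Headers().add("charset", "UTF-8");

        // The caller always uses the header-aware overload; the default method
        // forwards to the headerless one for implementations that ignore headers.
        System.out.println(new PlainStringDeserializer().deserialize("topic", headers, ascii));
        System.out.println(new CharsetAwareDeserializer().deserialize("topic", headers, ascii));
    }
}
```

In this sketch the calling code never needs to check which kind of implementation it holds, which is the role the Wrapper class played before.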
- Deprecate ExtendedSerializer/ExtendedDeserializer and Wrapper, saying that new and existing implementations should use Serializer/Deserializer.
- Keep ExtendedSerializer/ExtendedDeserializer as is, so existing behavior won't change.
- ExtendedSerializer/ExtendedDeserializer & Wrapper.ensureExtended()
All changes should be backward compatible. The serialize/deserialize methods declared in ExtendedSerializer/ExtendedDeserializer will act as abstract overrides of the new default methods, which is valid Java. From the implementors' perspective this shouldn't be a breaking change, as the generated code in ExtendedSerializer/ExtendedDeserializer will remain the same.
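The "abstract override" point can be checked with a small sketch. The types below are trimmed stand-ins, not the real Kafka interfaces, and `LegacyExtendedDeserializer` is a hypothetical implementation written against the old extended interface.

```java
// Empty stand-in for org.apache.kafka.common.header.Headers (not the real API).
class Headers {}

// Trimmed copy of the proposed base interface.
interface Deserializer<T> {
    T deserialize(String topic, byte[] data);

    default T deserialize(String topic, Headers headers, byte[] data) {
        return deserialize(topic, data);
    }
}

// Same shape as the existing extended interface: re-declaring the method
// without a body makes it abstract again for everything below this interface.
interface ExtendedDeserializer<T> extends Deserializer<T> {
    @Override
    T deserialize(String topic, Headers headers, byte[] data);
}

// Hypothetical implementation written against the extended interface.
// Leaving out the three-argument method here would be a compile error.
class LegacyExtendedDeserializer implements ExtendedDeserializer<String> {
    @Override
    public String deserialize(String topic, byte[] data) {
        return "headerless";
    }

    @Override
    public String deserialize(String topic, Headers headers, byte[] data) {
        return "with-headers";
    }
}

class AbstractOverrideDemo {
    public static void main(String[] args) {
        // Seen through the base interface, the implementation's own
        // header-aware method is used, not the default.
        Deserializer<String> d = new LegacyExtendedDeserializer();
        System.out.println(d.deserialize("topic", new Headers(), new byte[0])); // with-headers
    }
}
```

Under these assumptions, code that only knows the base interface still reaches the header-aware method of an extended implementation.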
Review existing unit tests and system tests.
Manual verification of binary compatibility: existing ExtendedSerializer implementations should work without recompiling the application.
The code examples here use the Deserializer class, but they are equally valid for the Serializer.
There are two options for providing a default implementation: return a dummy value or throw an exception, as shown below. This means we'd have an interface in which no method is enforced. Furthermore, we would somehow have to tell users which method they should implement. To achieve this we could deprecate the two-parameter ("headerless") method, but this would litter the code base with warnings, as we're still using this method in a lot of places (like the serializers and deserializers).
    public interface Serializer<T> extends Closeable {
        void configure(Map<String, ?> configs, boolean isKey);

        @Deprecated
        default byte[] serialize(String topic, T data) {
            return new byte[0];
            // throw new UnsupportedOperationException("Method not implemented");
        }

        default byte[] serialize(String topic, Headers headers, T data) { // This is the new method
            return serialize(topic, data);
        }

        @Override
        void close();
    }

    public interface Deserializer<T> extends Closeable {
        void configure(Map<String, ?> configs, boolean isKey);

        @Deprecated
        default T deserialize(String topic, byte[] data) {
            return null;
            // throw new UnsupportedOperationException("Method not implemented");
        }

        default T deserialize(String topic, Headers headers, byte[] data) { // This is the new method
            return deserialize(topic, data);
        }

        @Override
        void close();
    }
We could instead delegate the method calls to the deserialize(String,Headers,byte[]) method. The drawback is that this wouldn't be backward compatible, as the ExtendedDeserializer did just the opposite: it called the deserialize(String,byte[]) method from the deserialize(String,Headers,byte[]) method.
    public interface Deserializer<T> extends Closeable {
        default T deserialize(String topic, byte[] data) {
            return deserialize(topic, null, data);
        }

        default T deserialize(String topic, Headers headers, byte[] data) {
            return null;
        }
    }
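One concrete way the reversed delegation can bite is sketched below, assuming the client calls the header-aware overload. The types are trimmed stand-ins rather than the real Kafka interfaces, and `LegacyStringDeserializer` is a hypothetical pre-existing implementation that overrides only the headerless method.

```java
import java.nio.charset.StandardCharsets;

// Empty stand-in for org.apache.kafka.common.header.Headers (not the real API).
class Headers {}

// The rejected variant, trimmed to the two deserialize overloads.
interface Deserializer<T> {
    default T deserialize(String topic, byte[] data) {
        return deserialize(topic, null, data);
    }

    default T deserialize(String topic, Headers headers, byte[] data) {
        return null;
    }
}

// Hypothetical existing implementation: overrides only the headerless overload.
class LegacyStringDeserializer implements Deserializer<String> {
    @Override
    public String deserialize(String topic, byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }
}

class ReverseDelegationDemo {
    public static void main(String[] args) {
        Deserializer<String> d = new LegacyStringDeserializer();
        byte[] payload = "value".getBytes(StandardCharsets.UTF_8);

        System.out.println(d.deserialize("topic", payload));                // value
        // The header-aware default never reaches the user's code.
        System.out.println(d.deserialize("topic", new Headers(), payload)); // null
    }
}
```

In this sketch the existing implementation keeps compiling but silently yields null on the header-aware path, which is harder to notice than a compile error.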
In this version we'd make sure that setHeaders is called right before deserialize (and likewise before serialize). The drawback is that we'd force implementations to maintain state.
    public interface Deserializer<T> extends Closeable {
        void setHeaders(Headers headers);

        default T deserialize(String topic, byte[] data) {
            return null;
        }
    }
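A rough sketch of what "maintaining state" would mean in practice is given below. The `Headers` class with a single `source` field and the `StatefulDeserializer` class are hypothetical and exist only for this illustration; the interface is the rejected variant trimmed to the relevant methods.

```java
import java.nio.charset.StandardCharsets;

// Stand-in for record headers carrying one assumed attribute (not the real API).
class Headers {
    final String source;

    Headers(String source) {
        this.source = source;
    }
}

// The rejected setHeaders variant, trimmed to the relevant methods.
interface Deserializer<T> {
    void setHeaders(Headers headers);

    default T deserialize(String topic, byte[] data) {
        return null;
    }
}

// Hypothetical implementation: it has to remember the headers between calls.
class StatefulDeserializer implements Deserializer<String> {
    private Headers current; // mutable state shared by all later calls

    @Override
    public void setHeaders(Headers headers) {
        this.current = headers;
    }

    @Override
    public String deserialize(String topic, byte[] data) {
        String source = current == null ? "unknown" : current.source;
        return source + ":" + new String(data, StandardCharsets.UTF_8);
    }
}

class StatefulDemo {
    public static void main(String[] args) {
        StatefulDeserializer d = new StatefulDeserializer();

        d.setHeaders(new Headers("svc-a"));
        System.out.println(d.deserialize("topic", "r1".getBytes(StandardCharsets.UTF_8))); // svc-a:r1

        // If a caller forgets setHeaders, the previous record's headers leak through.
        System.out.println(d.deserialize("topic", "r2".getBytes(StandardCharsets.UTF_8))); // svc-a:r2
    }
}
```

The same field would also need synchronization if one instance were ever shared between threads, which a stateless method signature avoids.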
This implementation works well if the user overrides one (or both) of the methods. However, if the default implementation is used for both of them, the two methods call each other in a cycle and the call eventually fails with a stack overflow.
    public interface Deserializer<T> extends Closeable {
        default T deserialize(String topic, byte[] data) {
            return deserialize(topic, null, data);
        }

        default T deserialize(String topic, Headers headers, byte[] data) {
            return deserialize(topic, data);
        }
    }
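The circular call can be reproduced with a small sketch. The types are trimmed stand-ins, not the real Kafka interfaces; `EmptyDeserializer`, `HeaderOnlyDeserializer` and the `overflows` helper are hypothetical names used only here.

```java
import java.nio.charset.StandardCharsets;

// Empty stand-in for org.apache.kafka.common.header.Headers (not the real API).
class Headers {}

// The mutually delegating variant, trimmed to the two overloads.
interface Deserializer<T> {
    default T deserialize(String topic, byte[] data) {
        return deserialize(topic, null, data);
    }

    default T deserialize(String topic, Headers headers, byte[] data) {
        return deserialize(topic, data);
    }
}

// Overrides neither method: the two defaults keep calling each other.
class EmptyDeserializer implements Deserializer<String> {}

// Overrides one method: the cycle is broken.
class HeaderOnlyDeserializer implements Deserializer<String> {
    @Override
    public String deserialize(String topic, Headers headers, byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }
}

class MutualDefaultsDemo {
    // Hypothetical helper: reports whether a call ends in a StackOverflowError.
    static boolean overflows(Deserializer<?> deserializer) {
        try {
            deserializer.deserialize("topic", new byte[0]);
            return false;
        } catch (StackOverflowError e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(overflows(new EmptyDeserializer()));      // true
        System.out.println(overflows(new HeaderOnlyDeserializer())); // false
    }
}
```

Note that the compiler accepts `EmptyDeserializer` without complaint, so under this variant the problem would only surface at runtime.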