Status

Current state: Under Discussion

Discussion thread: Pending...

JIRA: Pending...

Motivation

I believe there are many use cases where a List Serde could be used. For example:

  • https://stackoverflow.com/questions/41427174/aggregate-java-objects-in-a-list-with-kafka-streams-dsl-windows
  • https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api

A typical case is aggregating the values grouped by key into a list, so that subsequent operations can be performed on the whole collection.

Public Interfaces


  • New class org.apache.kafka.common.serialization.ListSerializer which implements the Serializer<List<T>> interface
  • New class org.apache.kafka.common.serialization.ListDeserializer which implements the Deserializer<List<T>> interface
  • New nested class ListSerde in org.apache.kafka.common.serialization.Serdes which creates a new serde based on the ListSerializer and ListDeserializer classes

P.S. A static method corresponding to ListSerde (something like static public Serde<List<T>> List() {...}) cannot be added to the org.apache.kafka.common.serialization.Serdes class, because the element type needs to be defined beforehand. For that reason, a List Serde has to be created in the following fashion:

new Serdes.ListSerde<String>(Serdes.String(), Comparator.comparing(String::length));

(This can possibly be shortened by declaring import static org.apache.kafka.common.serialization.Serdes.ListSerde.)

Proposed Changes

This KIP proposes adding new ListSerializer and ListDeserializer classes, as well as support for them in the Serdes class. This will allow a List<T> Serde to be used directly from Consumers, Producers and Streams.

List<T> serialization and deserialization will be done by repeatedly calling, for each entry, the serializer/deserializer provided by the Serde of the generic type T that is passed in. For example, to create a serde for a List of Strings, you declare `new Serdes.ListSerde<>(Serdes.String(), Comparator.comparing(String::length))`; in this case the serializer/deserializer of the String Serde will be used to serialize/deserialize each entry in `List<String>`.
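The per-entry delegation described above can be sketched in plain Java without any Kafka dependency. This is only an illustration under stated assumptions: the class name `ListSerdeSketch` is hypothetical, `java.util.function.Function` stands in for the inner serializer/deserializer of T, and the byte layout (an entry count followed by length-prefixed entries) is an assumed format that this KIP does not specify. The Comparator argument from the example above is not modeled.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch, not the proposed Kafka classes.
public class ListSerdeSketch {

    // Assumed layout: [entry count][length][bytes][length][bytes]...
    public static <T> byte[] serialize(List<T> data, Function<T, byte[]> inner) {
        try (ByteArrayOutputStream buffer = new ByteArrayOutputStream();
             DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeInt(data.size());
            for (T entry : data) {
                // Delegate each entry to the inner (element) serializer.
                byte[] bytes = inner.apply(entry);
                out.writeInt(bytes.length);
                out.write(bytes);
            }
            out.flush();
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static <T> List<T> deserialize(byte[] data, Function<byte[], T> inner) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int size = in.readInt();
            List<T> result = new ArrayList<>(size);
            for (int i = 0; i < size; i++) {
                byte[] bytes = new byte[in.readInt()];
                in.readFully(bytes);
                // Delegate each entry to the inner (element) deserializer.
                result.add(inner.apply(bytes));
            }
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<String> original = Arrays.asList("a", "bc", "def");
        byte[] bytes = serialize(original, s -> s.getBytes(StandardCharsets.UTF_8));
        List<String> copy = deserialize(bytes, b -> new String(b, StandardCharsets.UTF_8));
        System.out.println(copy.equals(original)); // prints "true"
    }
}
```

Length-prefixing every entry keeps the sketch independent of the element type, at the cost of four extra bytes per entry; a real implementation could omit the prefix for fixed-size element types.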

Compatibility, Deprecation, and Migration Plan

Does not apply

Rejected Alternatives

Not known
