Current state: Accepted
Discussion thread: Link
JIRA:
I believe there are many use cases where a List serde would be useful, for example:
https://stackoverflow.com/questions/41427174/aggregate-java-objects-in-a-list-with-kafka-streams-dsl-windows, https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api
A typical case is aggregating grouped (by key) values into a list so that subsequent operations can be run on the collection.
This KIP proposes adding new ListSerializer and ListDeserializer classes, as well as a new ListSerde nested class inside the Serdes class. This will allow using a List<L> serde directly from Consumers, Producers and Streams, where <L extends List<T>, T>.
List<L> serialization and deserialization will be done by repeatedly calling, for each entry, the serializer/deserializer provided by the serde of the generic type T. For example, to create a serde for a list of Strings, the serializer/deserializer of Serdes.StringSerde will be used to serialize/deserialize each entry in List<String>.
A list serde is an unusual type of serde because two things must be considered: the implementation of the List interface (e.g. ArrayList, LinkedList) and the type of its enclosed elements.
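The per-entry delegation described above can be sketched with plain JDK types. This is only an illustration under stated assumptions: ListCodecSketch, serialize and deserialize are hypothetical names, the element serde is modelled as a pair of java.util.function.Function objects rather than Kafka's Serializer/Deserializer, and the length-prefixed framing is chosen for the sketch and is not necessarily the format proposed by this KIP. It shows why the deserializer needs both a list implementation and an element deserializer, while the serializer needs only an element serializer.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical stand-in for the proposed classes; not the Kafka implementation.
class ListCodecSketch {

    // Serializes a list by calling the element serializer once per entry.
    static <T> byte[] serialize(List<T> list, Function<T, byte[]> elementSerializer) {
        List<byte[]> chunks = new ArrayList<>();
        int total = Integer.BYTES; // room for the leading list size
        for (T element : list) {
            byte[] chunk = elementSerializer.apply(element);
            chunks.add(chunk);
            total += Integer.BYTES + chunk.length; // entry size + entry bytes
        }
        ByteBuffer buf = ByteBuffer.allocate(total);
        buf.putInt(list.size());
        for (byte[] chunk : chunks) {
            buf.putInt(chunk.length);
            buf.put(chunk);
        }
        return buf.array();
    }

    // Rebuilds the list: listFactory picks the List implementation,
    // elementDeserializer handles the enclosed element type.
    static <T, L extends List<T>> L deserialize(byte[] data,
                                                Function<byte[], T> elementDeserializer,
                                                Supplier<L> listFactory) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        int size = buf.getInt();
        L result = listFactory.get();
        for (int i = 0; i < size; i++) {
            byte[] chunk = new byte[buf.getInt()];
            buf.get(chunk);
            result.add(elementDeserializer.apply(chunk));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> input = List.of("a", "bc", "def");
        byte[] bytes = serialize(input, s -> s.getBytes(StandardCharsets.UTF_8));
        // The caller chooses LinkedList here; the serialized bytes do not record the list type.
        LinkedList<String> output = ListCodecSketch.<String, LinkedList<String>>deserialize(
                bytes, b -> new String(b, StandardCharsets.UTF_8), LinkedList::new);
        System.out.println(output);
    }
}
```

In this sketch the list implementation is chosen only at deserialization time, which mirrors the need for a separate list type setting on the deserializer side.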
First, we need to specify that we are going to use a list serde:
Then, we need to introduce two brand new configurations, one for the list type and one for the inner serde. I propose the following extra properties:
CommonClientConfigs.class (duplicated in StreamsConfig.class): DEFAULT_LIST_KEY/VALUE_SERDE_TYPE_CLASS = "default.list.key/value.serde.type"
Ex. default.list.key/value.serde.type = java.util.ArrayList
CommonClientConfigs.class (duplicated in StreamsConfig.class): DEFAULT_LIST_KEY/VALUE_SERDE_INNER_CLASS = "default.list.key/value.serde.inner"
Ex. default.list.key/value.serde.inner = org.apache.kafka.common.serialization.Serdes$IntegerSerde
ConsumerConfig.class: LIST_KEY_DESERIALIZER_TYPE_CLASS_CONFIG = "list.key.deserializer.type"
ConsumerConfig.class: LIST_KEY_DESERIALIZER_INNER_CLASS_CONFIG = "list.key.deserializer.inner"
ProducerConfig.class: LIST_KEY_SERIALIZER_INNER_CLASS_CONFIG = "list.key.serializer.inner"
P.S. We do not need a type class config for the serializer, since the concrete list class does not matter during serialization.
P.P.S. Properties default.list.key/value.* will be ignored unless default.key/value.serde is set to org.apache.kafka.common.serialization.Serdes$ListSerde
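As an illustration of how the default properties fit together, here is a minimal sketch that builds a configuration for record keys of type ArrayList<Integer>. The property names and values are the ones proposed in this KIP; the class name ListSerdeConfigSketch and the helper method are hypothetical, and plain string keys are used instead of the config constants so that the sketch does not depend on the Kafka client library.

```java
import java.util.Properties;

// Hypothetical helper; only the property names and values come from this KIP.
class ListSerdeConfigSketch {

    static Properties listOfIntegersKeyConfig() {
        Properties props = new Properties();
        // Without this setting, the default.list.key.* properties below are ignored.
        props.setProperty("default.key.serde",
                "org.apache.kafka.common.serialization.Serdes$ListSerde");
        // Which List implementation to instantiate when deserializing.
        props.setProperty("default.list.key.serde.type", "java.util.ArrayList");
        // Which serde handles each element of the list.
        props.setProperty("default.list.key.serde.inner",
                "org.apache.kafka.common.serialization.Serdes$IntegerSerde");
        return props;
    }

    public static void main(String[] args) {
        listOfIntegersKeyConfig().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

The value-side properties would follow the same pattern with "value" in place of "key".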
For performance reasons, the following serialization strategy was put in place. Depending on the type of the inner serde (the list's element type), serialization is performed in one of the following ways:
           Case 1                        Case 2
    +------------------+          +------------------+
Int |   Size of list   |      Int |   Size of list   |
    |------------------|          |------------------|
    |     Entry 1      |      Int | Size of entry 1  |
    |------------------|          |------------------|
    |     Entry 2      |          |     Entry 1      |
    |------------------|          |------------------|
    |                  |      Int | Size of entry 2  |
    |                  |          |------------------|
    |       ...        |          |     Entry 2      |
    |                  |          |------------------|
    |                  |          |       ...        |
    +------------------+          +------------------+
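To make the size difference between the two layouts concrete, the following sketch encodes the same List<Integer> both ways and compares the byte counts. It rests on assumptions not stated in the text: that Case 1 is meant for element types whose serialized size is fixed and known (so the per-entry size can be omitted), that Case 2 is the general layout for variable-length entries, and that every Int field is a 4-byte big-endian integer. ListLayoutSketch and its methods are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.util.List;

// Hypothetical illustration of the two layouts; not the Kafka implementation.
class ListLayoutSketch {

    // Case 1: list size, then the entries back to back (no per-entry size).
    static byte[] encodeCase1(List<Integer> list) {
        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + list.size() * Integer.BYTES);
        buf.putInt(list.size());
        for (int value : list) {
            buf.putInt(value);
        }
        return buf.array();
    }

    // Case 2: list size, then a (size of entry, entry) pair for every entry.
    static byte[] encodeCase2(List<Integer> list) {
        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + list.size() * 2 * Integer.BYTES);
        buf.putInt(list.size());
        for (int value : list) {
            buf.putInt(Integer.BYTES); // size of this entry
            buf.putInt(value);         // the entry itself
        }
        return buf.array();
    }

    public static void main(String[] args) {
        List<Integer> list = List.of(10, 20, 30);
        System.out.println(encodeCase1(list).length); // prints 16 (4 + 3 * 4)
        System.out.println(encodeCase2(list).length); // prints 28 (4 + 3 * (4 + 4))
    }
}
```

For n four-byte entries, Case 1 takes 4 + 4n bytes and Case 2 takes 4 + 8n bytes, so dropping the per-entry size roughly halves the payload for small fixed-size elements.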
Does not apply
Not known