Status
Current state: Adopted
Discussion thread: https://sematext.com/opensee/m/Kafka/uyzND1VU1Ou1y0Lbh?subj=+DISCUSS+KIP+466+Add+support+for+List+lt+T+gt+serialization+and+deserialization
JIRA:
...
I believe there are many use cases where a List Serde could be useful, for example:
...
For instance, values grouped by key could be aggregated into a list so that subsequent operations can be performed on the collection.
Public Interfaces
- New class org.apache.kafka.common.serialization.ListSerializer<Inner>, which implements the Serializer<List<Inner>> interface
- New class org.apache.kafka.common.serialization.ListDeserializer<Inner>, which implements the Deserializer<List<Inner>> interface
- New static nested class ListSerde<Inner> extends WrapperSerde<List<Inner>> in org.apache.kafka.common.serialization.Serdes, which creates a new serde based on the ListSerializer and ListDeserializer classes
...
- Serdes
- New method public static <L extends List, Inner> Serde<List<Inner>> ListSerde(Class<L> listClass, Serde<Inner> innerSerde) in org.apache.kafka.common.serialization.Serdes
...
- class
...
new Serdes.ListSerde<String>(Serdes.String(), Comparator.comparing(String::length));
(can possibly be simplified by declaring import static org.apache.kafka.common.serialization.Serdes.ListSerde)
Proposed Changes
This KIP proposes adding new ListSerializer and ListDeserializer classes, as well as support for them in a ListSerde nested class inside the Serdes class. This will allow using Serde<List<Inner>> directly from Consumers, Producers and Streams.
Serde<List<Inner>> serialization and deserialization will be done by repeatedly calling the serializer/deserializer of the passed Inner serde for each entry. For example, if you want to create a List of Strings serde, then the serializer/deserializer of Serdes.StringSerde will be used to serialize/deserialize each entry in List<String>.
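As a rough illustration of this delegation, the sketch below calls an inner serializer once per list entry and collects the per-entry bytes. InnerSerializer and serializeEach are hypothetical names: InnerSerializer stands in for Kafka's Serializer interface so the example compiles without kafka-clients. The sketch assumes non-null entries and leaves the framing of the per-entry bytes (sizes, null markers) to the serialization strategy.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class ListDelegationSketch {
    // Hypothetical stand-in for Kafka's Serializer<T>, reduced to one method so the
    // sketch compiles without kafka-clients on the classpath
    interface InnerSerializer<T> {
        byte[] serialize(T data);
    }

    // Calls the inner serializer once per entry, preserving list order.
    // Assumes non-null entries; how sizes and nulls are framed into the final
    // payload depends on the serialization strategy.
    static <T> List<byte[]> serializeEach(List<T> list, InnerSerializer<T> inner) {
        List<byte[]> parts = new ArrayList<>(list.size());
        for (T entry : list) {
            parts.add(inner.serialize(entry));
        }
        return parts;
    }

    public static void main(String[] args) {
        // Plays the role of the String serde's serializer for a List<String>
        InnerSerializer<String> utf8 = s -> s.getBytes(StandardCharsets.UTF_8);
        List<byte[]> parts = serializeEach(List.of("a", "bc"), utf8);
        for (byte[] part : parts) {
            System.out.println(part.length + " bytes");
        }
    }
}
```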
Proposed Configurations
The list serde is an unusual type of serde because two things must be considered: the implementation of the List interface (e.g. ArrayList, LinkedList, etc.) and the type of its enclosed elements.
First, we need to specify that we are going to use a list serde:
default.key/value.serde = org.apache.kafka.common.serialization.Serdes$ListSerde
Then, we need to introduce two brand-new configurations, each with a key and a value variant, so I'm proposing these four extra properties:
CommonClientConfigs.class: DEFAULT_LIST_KEY/VALUE_SERDE_TYPE_CLASS = "default.list.key/value.serde.type"
Ex. default.list.key/value.serde.type = java.util.ArrayList
CommonClientConfigs.class: DEFAULT_LIST_KEY/VALUE_SERDE_INNER_CLASS = "default.list.key/value.serde.inner"
Ex. default.list.key/value.serde.inner = org.apache.kafka.common.serialization.Serdes$IntegerSerde
P.S. The default.list.key/value.* properties are ignored unless default.key/value.serde is set to org.apache.kafka.common.serialization.Serdes$ListSerde.
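For illustration, the key-side settings above could be assembled with plain java.util.Properties as follows. ListSerdeConfigSketch and listKeySerdeProps are hypothetical names, and the literal strings are used instead of the CommonClientConfigs constants so the sketch runs without Kafka; only the property names and example values come from this proposal. The value side would mirror it with "value" in place of "key".

```java
import java.util.Properties;

final class ListSerdeConfigSketch {
    // Key-side settings from the proposal; the value side is symmetric
    // ("value" in place of "key" in each property name)
    static Properties listKeySerdeProps() {
        Properties props = new Properties();
        // Select the list serde as the default key serde
        props.setProperty("default.key.serde", "org.apache.kafka.common.serialization.Serdes$ListSerde");
        // Concrete implementation of the List interface
        props.setProperty("default.list.key.serde.type", "java.util.ArrayList");
        // Serde applied to each element of the list
        props.setProperty("default.list.key.serde.inner", "org.apache.kafka.common.serialization.Serdes$IntegerSerde");
        return props;
    }

    public static void main(String[] args) {
        listKeySerdeProps().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```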
Serialization Strategy
For performance purposes, the following serialization strategies were put in place:
enum SerializationStrategy {
CONSTANT_SIZE,
VARIABLE_SIZE;
}
Depending on the type of the inner serde (the list's element type), serialization is performed in one of the following ways:
- SerializationStrategy.CONSTANT_SIZE is used if the inner serde has one of the following serializers (ShortSerializer.class, IntegerSerializer.class, FloatSerializer.class, LongSerializer.class, DoubleSerializer.class, UUIDSerializer.class). The final payload then does not encode each element's size, since the sizes of these types are fixed (2 bytes, 4 bytes, 8 bytes, etc.)
- SerializationStrategy.VARIABLE_SIZE is used if the inner serde doesn't have one of the serializers listed above. The size of each element is then encoded in the final payload (see below)
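A minimal sketch of the strategy choice described above, assuming the decision is made by checking the inner serializer against the listed fixed-size serializers. To stay free of a Kafka dependency it matches on simple class names, which is a simplification; a real implementation would more likely compare Class objects. StrategySelectionSketch and choose are hypothetical names.

```java
import java.util.Set;

final class StrategySelectionSketch {
    enum SerializationStrategy { CONSTANT_SIZE, VARIABLE_SIZE }

    // Simple names of the fixed-size serializers listed in the proposal.
    // Matching on names keeps the sketch self-contained (a simplification).
    private static final Set<String> FIXED_SIZE_SERIALIZERS = Set.of(
        "ShortSerializer", "IntegerSerializer", "FloatSerializer",
        "LongSerializer", "DoubleSerializer", "UUIDSerializer");

    // Fixed-size element types skip per-entry size headers; everything else gets them
    static SerializationStrategy choose(String innerSerializerSimpleName) {
        return FIXED_SIZE_SERIALIZERS.contains(innerSerializerSimpleName)
            ? SerializationStrategy.CONSTANT_SIZE
            : SerializationStrategy.VARIABLE_SIZE;
    }

    public static void main(String[] args) {
        System.out.println(choose("IntegerSerializer")); // CONSTANT_SIZE
        System.out.println(choose("StringSerializer"));  // VARIABLE_SIZE
    }
}
```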
Additionally, there are two different ways of serializing NULL values within the payload:
- For SerializationStrategy.CONSTANT_SIZE, the list serializer will generate a null index list that contains indexes of all null entries within the payload
- For SerializationStrategy.VARIABLE_SIZE, the list serializer instead will write Serdes.ListSerde.NULL_ENTRY_VALUE (-1 by default) for the size of a null entry
CONSTANT_SIZE VARIABLE_SIZE
+----------------------------+ +----------------------------+
| SerializationStrategy | | SerializationStrategy |
| Flag | | Flag |
|----------------------------| |----------------------------|
| NullIndexList.size() | | PayloadList.size() |
|----------------------------| |----------------------------|
| Null index 1 | | Size of entry 1 |
|----------------------------| |----------------------------|
| Null index 2 | | |
|----------------------------| | Entry 1 |
| ... | | |
|----------------------------| |----------------------------|
| PayloadList.size() | | Size of entry 2 |
|----------------------------| |----------------------------|
| | | |
| Entry 1 | | Entry 2 |
| | | |
|----------------------------| |----------------------------|
| | | |
| Entry 2 | | |
| | | |
|----------------------------| | |
| | | |
| | | ... |
| | | |
| | | |
| ... | | |
| | | |
| | | |
| | | |
| | | |
| | | |
+----------------------------+ +----------------------------+
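The two layouts in the diagram can be sketched as a round trip for a List<Integer> (CONSTANT_SIZE) and a List<String> (VARIABLE_SIZE). This is a hedged illustration, not the Kafka implementation: it assumes the strategy flag is the enum ordinal stored in a single byte and that every count, index, size and integer value is a 4-byte big-endian int. ListLayoutSketch and its methods are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class ListLayoutSketch {
    // Assumed encoding: the strategy flag is the enum ordinal in one byte, and every
    // count, index, size and Integer value is a 4-byte big-endian int (ByteBuffer default)
    static final byte CONSTANT_SIZE = 0;
    static final byte VARIABLE_SIZE = 1;
    static final int NULL_ENTRY_VALUE = -1;

    // CONSTANT_SIZE: flag | null count | null indexes | list size | non-null values
    static byte[] writeConstantSize(List<Integer> list) {
        List<Integer> nullIndexes = new ArrayList<>();
        for (int i = 0; i < list.size(); i++) {
            if (list.get(i) == null) nullIndexes.add(i);
        }
        int nonNull = list.size() - nullIndexes.size();
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + 4 * nullIndexes.size() + 4 + 4 * nonNull);
        buf.put(CONSTANT_SIZE);
        buf.putInt(nullIndexes.size());
        for (int index : nullIndexes) buf.putInt(index);
        buf.putInt(list.size());
        for (Integer entry : list) {
            if (entry != null) buf.putInt(entry);
        }
        return buf.array();
    }

    static List<Integer> readConstantSize(byte[] payload) {
        ByteBuffer buf = ByteBuffer.wrap(payload);
        if (buf.get() != CONSTANT_SIZE) throw new IllegalArgumentException("not CONSTANT_SIZE");
        int nullCount = buf.getInt();
        Set<Integer> nullIndexes = new HashSet<>();
        for (int i = 0; i < nullCount; i++) nullIndexes.add(buf.getInt());
        int size = buf.getInt();
        List<Integer> result = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            // Entries have a fixed width, so only the null index list is needed to place nulls
            result.add(nullIndexes.contains(i) ? null : buf.getInt());
        }
        return result;
    }

    // VARIABLE_SIZE: flag | list size | (entry size | entry bytes)..., where a null
    // entry is written as NULL_ENTRY_VALUE with no bytes following it
    static byte[] writeVariableSize(List<String> list) {
        List<byte[]> encoded = new ArrayList<>();
        int total = 1 + 4;
        for (String entry : list) {
            byte[] bytes = entry == null ? null : entry.getBytes(StandardCharsets.UTF_8);
            encoded.add(bytes);
            total += 4 + (bytes == null ? 0 : bytes.length);
        }
        ByteBuffer buf = ByteBuffer.allocate(total);
        buf.put(VARIABLE_SIZE);
        buf.putInt(list.size());
        for (byte[] bytes : encoded) {
            buf.putInt(bytes == null ? NULL_ENTRY_VALUE : bytes.length);
            if (bytes != null) buf.put(bytes);
        }
        return buf.array();
    }

    static List<String> readVariableSize(byte[] payload) {
        ByteBuffer buf = ByteBuffer.wrap(payload);
        if (buf.get() != VARIABLE_SIZE) throw new IllegalArgumentException("not VARIABLE_SIZE");
        int size = buf.getInt();
        List<String> result = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            int length = buf.getInt();
            if (length == NULL_ENTRY_VALUE) {
                result.add(null);
                continue;
            }
            byte[] bytes = new byte[length];
            buf.get(bytes);
            result.add(new String(bytes, StandardCharsets.UTF_8));
        }
        return result;
    }

    public static void main(String[] args) {
        byte[] constant = writeConstantSize(Arrays.asList(1, null, 3));
        System.out.println(constant.length + " bytes -> " + readConstantSize(constant)); // 21 bytes -> [1, null, 3]
        byte[] variable = writeVariableSize(Arrays.asList("a", null, "bc"));
        System.out.println(variable.length + " bytes -> " + readVariableSize(variable)); // 20 bytes -> [a, null, bc]
    }
}
```

Because each CONSTANT_SIZE entry has a fixed width, the reader needs only the null index list to place nulls, whereas VARIABLE_SIZE spends an extra size header on every entry, including null ones.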
Compatibility, Deprecation, and Migration Plan
...