Status

Current state: Adopted

Discussion thread: https://sematext.com/opensee/m/Kafka/uyzND1VU1Ou1y0Lbh?subj=+DISCUSS+KIP+466+Add+support+for+List+lt+T+gt+serialization+and+deserialization

JIRA: KAFKA-8326

...

I believe there are many use cases where a List Serde could be useful.

...

For instance, values grouped by key can be aggregated into a list so that subsequent operations can be performed on the collection.

Public Interfaces

  • New class org.apache.kafka.common.serialization.ListSerializer<Inner>, which implements the Serializer<List<Inner>> interface
  • New class org.apache.kafka.common.serialization.ListDeserializer<Inner>, which implements the Deserializer<List<Inner>> interface
  • New static nested class ListSerde<Inner> extends WrapperSerde<List<Inner>> in org.apache.kafka.common.serialization.Serdes, which creates a new serde based on the ListSerializer and ListDeserializer classes
  • New method public static <L extends List, Inner> Serde<List<Inner>> ListSerde(Class<L> listClass, Serde<Inner> innerSerde) in the org.apache.kafka.common.serialization.Serdes class

Proposed Changes

This KIP proposes adding new ListSerializer and ListDeserializer classes, as well as a ListSerde nested class inside the Serdes class that supports them. This will allow using Serde<List<Inner>> directly from Consumers, Producers and Streams.

Serde<List<Inner>> serialization and deserialization will be done by repeatedly calling, for each entry, the serializer/deserializer provided by the serde of the passed generic type Inner. For example, if you want to create a serde for a list of Strings, then the serializer/deserializer of Serdes.StringSerde will be used to serialize/deserialize each entry in List<String>.
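The per-entry delegation described above can be sketched in plain Java. This is a minimal illustration under stated assumptions, not the KIP's implementation: the class ElementWiseListCodec is hypothetical, the inner serde is modeled as a pair of plain functions instead of Kafka's Serializer/Deserializer, every entry is length-prefixed, and null entries are not handled.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for a list serde; the inner "serde" is two functions.
final class ElementWiseListCodec<T> {
    private final Function<T, byte[]> innerSerializer;
    private final Function<byte[], T> innerDeserializer;

    ElementWiseListCodec(Function<T, byte[]> innerSerializer,
                         Function<byte[], T> innerDeserializer) {
        this.innerSerializer = innerSerializer;
        this.innerDeserializer = innerDeserializer;
    }

    // Writes the list size, then each entry as (length, bytes) using the inner serializer.
    byte[] serialize(List<T> list) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(list.size());
            for (T entry : list) {
                byte[] encoded = innerSerializer.apply(entry);
                out.writeInt(encoded.length);
                out.write(encoded);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads the list size, then decodes each entry with the inner deserializer.
    List<T> deserialize(byte[] data) {
        ByteBuffer in = ByteBuffer.wrap(data);
        int size = in.getInt();
        List<T> list = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            byte[] encoded = new byte[in.getInt()];
            in.get(encoded);
            list.add(innerDeserializer.apply(encoded));
        }
        return list;
    }

    // Plays the role of a list serde built on a String inner serde (UTF-8 assumed).
    static ElementWiseListCodec<String> ofStrings() {
        return new ElementWiseListCodec<>(
            s -> s.getBytes(StandardCharsets.UTF_8),
            b -> new String(b, StandardCharsets.UTF_8));
    }
}
```

With this sketch, ElementWiseListCodec.ofStrings().serialize(list) followed by deserialize(...) round-trips a List<String>, calling the String encoder and decoder once per entry.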

...

A list serde is an unusual type of serde because we need to consider two things here: the implementation of the List interface (e.g. ArrayList, LinkedList, etc.) as well as the type of its enclosed elements.

First, as usual, we need to specify that we are going to use a list serde:

default.key/value.serde = org.apache.kafka.common.serialization.Serdes$ListSerde

Then, we need to introduce brand new configurations, and here I'm proposing these four extra properties:

CommonClientConfigs.class: DEFAULT_LIST_KEY/VALUE_SERDE_TYPE_CLASS = "default.list.key/value.serde.type"

Ex. default.list.key/value.serde.type = java.util.ArrayList

CommonClientConfigs.class: DEFAULT_LIST_KEY/VALUE_SERDE_INNER_CLASS = "default.list.key/value.serde.inner"

Ex. default.list.key/value.serde.inner = org.apache.kafka.common.serialization.Serdes$IntegerSerde

P.S. Properties default.list.key/value.* will be ignored as long as default.key/value.serde is not set to org.apache.kafka.common.serialization.Serdes$ListSerde.
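As a rough illustration of how these properties fit together, the sketch below builds the key-side configuration with java.util.Properties. The class ListSerdeConfigSketch and its methods are hypothetical, and the property names are the key variants obtained by expanding the key/value shorthand used above; the value-side names are assumed to follow the same pattern.

```java
import java.util.Properties;

// Hypothetical helper; it only assembles the properties discussed above.
final class ListSerdeConfigSketch {
    static final String LIST_SERDE =
        "org.apache.kafka.common.serialization.Serdes$ListSerde";

    // Key serde of type List<Integer>, backed by ArrayList.
    static Properties listKeySerdeProps() {
        Properties props = new Properties();
        props.setProperty("default.key.serde", LIST_SERDE);
        props.setProperty("default.list.key.serde.type", "java.util.ArrayList");
        props.setProperty("default.list.key.serde.inner",
            "org.apache.kafka.common.serialization.Serdes$IntegerSerde");
        return props;
    }

    // Mirrors the P.S. above: the default.list.key.* properties only matter
    // when default.key.serde is set to the list serde.
    static boolean listPropertiesApply(Properties props) {
        return LIST_SERDE.equals(props.getProperty("default.key.serde"));
    }
}
```

If default.key.serde points at any other serde, listPropertiesApply returns false, which corresponds to the list-specific properties being ignored.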

Serialization Strategy

For performance purposes, the following serialization strategies were put in place:

enum SerializationStrategy {
    CONSTANT_SIZE,
    VARIABLE_SIZE;
}

Depending on the type of the inner serde (the list's element type), the serialization will be performed in one of the following ways:

  1. For SerializationStrategy.CONSTANT_SIZE, if the inner serde has one of the following serializers (ShortSerializer.class, IntegerSerializer.class, FloatSerializer.class, LongSerializer.class, DoubleSerializer.class, UUIDSerializer.class), then the final payload will not contain each element's encoded size, since the sizes of the listed types are static (2 bytes, 4 bytes, 4 bytes, 8 bytes, etc.)
  2. For SerializationStrategy.VARIABLE_SIZE, if the inner serde doesn't have one of the serializers listed above, then the size of each element will be encoded in the final payload (see below)
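The choice between the two strategies can be sketched as a simple lookup. This is an assumption-laden illustration: the class StrategySelectionSketch is hypothetical, and it keys on the element's Java type rather than on the inner serializer class as the KIP describes. UUID is left out because its encoded size depends on how the serializer writes it.

```java
import java.util.Map;

// Hypothetical sketch; the KIP inspects the inner serializer class instead.
final class StrategySelectionSketch {
    enum SerializationStrategy { CONSTANT_SIZE, VARIABLE_SIZE }

    // Element types whose encoded size is fixed, with that size in bytes.
    static final Map<Class<?>, Integer> FIXED_SIZES = Map.of(
        Short.class, Short.BYTES,      // 2 bytes
        Integer.class, Integer.BYTES,  // 4 bytes
        Float.class, Float.BYTES,      // 4 bytes
        Long.class, Long.BYTES,        // 8 bytes
        Double.class, Double.BYTES);   // 8 bytes

    // Fixed-size element types need no per-entry size in the payload.
    static SerializationStrategy strategyFor(Class<?> elementType) {
        return FIXED_SIZES.containsKey(elementType)
            ? SerializationStrategy.CONSTANT_SIZE
            : SerializationStrategy.VARIABLE_SIZE;
    }
}
```

For example, strategyFor(Long.class) selects CONSTANT_SIZE, while strategyFor(String.class) falls back to VARIABLE_SIZE because a String has no fixed encoded length.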

Additionally, there are two different ways of serializing NULL values within the payload:

  1. For SerializationStrategy.CONSTANT_SIZE, the list serializer will generate a null index list that contains indexes of all null entries within the payload
  2. For SerializationStrategy.VARIABLE_SIZE, the list serializer instead will write Serdes.ListSerde.NULL_ENTRY_VALUE (-1 by default) for the size of a null entry
                                                                        
                Case 1                             Case 2            
       
 CONSTANT_SIZE                                    VARIABLE_SIZE                                   
   
      +------------------+               +------------------+        
         |                  |               |                  |        
     Int |   Size of list   |           
+----------------------------+                   +----------------------------+                             
|    SerializationStrategy   |                   |    SerializationStrategy   |                             
|            Flag            |                   |            Flag            |                             
|----------------------------|                   |----------------------------|                             
|    NullIndexList.size()    |                   |     PayloadList.size()     |                             
|----------------------------|                   |----------------------------|                             
|        Null index 1        |                   |      Size of entry 1       |                             
|----------------------------|                   |----------------------------|                             
|        Null index 2        |                   |                            |                             
|----------------------------|                   |          Entry 1           |                             
|            ...             |                   |                            |                             
|----------------------------|                   |----------------------------|                             
|     PayloadList.size()     |                   |      Size of entry 2       |                             
|----------------------------|                   |----------------------------|                             
|                            |                   |                            |                             
|          Entry 1           |                   |          Entry 1           |                             
|                            |                   |                            |                            -
|----------------------------|                   |----------------------------|                       Int |   Size of list
|   |        
         |        |          |         |      |                  |    |    
         |------------------|               |------------------|  
|         
 Entry 2       |    |              |     |          |                  |        
         |     Entry 1      
|           Int |  Size of entry 1 |        
  |       |            |      |                |      |            |        
         |--------
|----------|               |------------------|          
         |                            |                             
|                            |          
         |     Entry  2      |               |           Entry    1      |        
|          |                  |               |    |            ...  |        
   |      |------------------|               |------------------|        
|          |                  |               |    |              |        
         |                  |           Int 
|   Size of entry 2 |        
         |    |              |     |          |                  |         
         |           
|       |     ...          |------------------|   |     
         |     |             |               |                  |        
   
|      |                  |    |           |     Entry 2  |    |        
         |       ... |       |               |       
|           |        
         |                   |               |------------------|        
     |    |                  |       
|        |                  |  |      
         |     |             |               |                  |        
   
|      |                  |    |           |       ... |       |        
         |    |              |               
|                   |        
 |        |           |       |               |      |            |        
         
+----------------------------+                   +----------------------------+                             
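The two payload layouts and their null handling can be sketched as follows. This is a hedged illustration, not the actual wire format: the class PayloadLayoutSketch is hypothetical, the flag values 0 and 1 are made up, every header field is assumed to be a 4-byte int, and PayloadList.size() is assumed to count null entries too. Integer stands in for a constant-size element type and UTF-8 String for a variable-size one.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical encoder for the two layouts; field widths and flags are assumptions.
final class PayloadLayoutSketch {
    static final int CONSTANT_SIZE_FLAG = 0; // assumed flag value
    static final int VARIABLE_SIZE_FLAG = 1; // assumed flag value
    static final int NULL_ENTRY_VALUE = -1;  // size written for a null entry

    // CONSTANT_SIZE: flag, null index count, null indexes, list size, non-null entries.
    static byte[] encodeConstantSize(List<Integer> list) {
        List<Integer> nullIndexes = new ArrayList<>();
        for (int i = 0; i < list.size(); i++) {
            if (list.get(i) == null) nullIndexes.add(i);
        }
        int nonNull = list.size() - nullIndexes.size();
        ByteBuffer out = ByteBuffer.allocate(
            Integer.BYTES * (3 + nullIndexes.size() + nonNull));
        out.putInt(CONSTANT_SIZE_FLAG);
        out.putInt(nullIndexes.size());
        for (int index : nullIndexes) out.putInt(index);
        out.putInt(list.size());
        for (Integer entry : list) {
            if (entry != null) out.putInt(entry); // no per-entry size needed
        }
        return out.array();
    }

    // VARIABLE_SIZE: flag, list size, then (size, bytes) per entry; -1 marks null.
    static byte[] encodeVariableSize(List<String> list) {
        List<byte[]> encoded = new ArrayList<>();
        int total = 2 * Integer.BYTES;
        for (String entry : list) {
            byte[] bytes = entry == null ? null : entry.getBytes(StandardCharsets.UTF_8);
            encoded.add(bytes);
            total += Integer.BYTES + (bytes == null ? 0 : bytes.length);
        }
        ByteBuffer out = ByteBuffer.allocate(total);
        out.putInt(VARIABLE_SIZE_FLAG);
        out.putInt(list.size());
        for (byte[] bytes : encoded) {
            if (bytes == null) {
                out.putInt(NULL_ENTRY_VALUE);
            } else {
                out.putInt(bytes.length);
                out.put(bytes);
            }
        }
        return out.array();
    }
}
```

Under these assumptions, a constant-size list pays one extra int per null entry but nothing per non-null entry, whereas a variable-size list pays one int per entry regardless of whether it is null.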

Compatibility, Deprecation, and Migration Plan

...