Status

Current state: Under discussion

Discussion thread: https://sematext.com/opensee/m/Kafka/uyzND1VU1Ou1y0Lbh?subj=+DISCUSS+KIP+466+Add+support+for+List+lt+T+gt+serialization+and+deserialization


Motivation

I believe there are many use cases where a List serde could be used.

Examples:

  • https://stackoverflow.com/questions/41427174/aggregate-java-objects-in-a-list-with-kafka-streams-dsl-windows
  • https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api

For instance, values grouped by key can be aggregated into a list so that subsequent operations can be performed on the collection.

Public Interfaces

  • New class org.apache.kafka.common.serialization.ListSerializer, which implements the Serializer<List<T>> interface
  • New class org.apache.kafka.common.serialization.ListDeserializer, which implements the Deserializer<List<T>> interface
  • New subclass ListSerde<T> in org.apache.kafka.common.serialization.Serdes, which creates a new serde based on the ListSerializer and ListDeserializer classes
  • New method public static <T> Serde<List<T>> ListSerde() in the org.apache.kafka.common.serialization.Serdes class (infers the list implementation and inner serde from the config file)
  • New method public static <T> Serde<List<T>> ListSerde(Class listClass, Serde<T> innerSerde) in the org.apache.kafka.common.serialization.Serdes class

Proposed Changes

This KIP proposes adding new ListSerializer and ListDeserializer classes, as well as support for them in the Serdes class. This will allow a List<T> serde to be used directly from Consumers, Producers and Streams.

Serialization and deserialization of List<T> will be done by repeatedly calling, once per entry, the serializer/deserializer provided by the serde of the generic type T. For example, to create a serde for a list of Strings, the serializer/deserializer of StringSerde will be used to serialize/deserialize each entry in `List<String>`.
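The per-entry delegation described above can be sketched without any Kafka dependency. This is only an illustration, not the proposed implementation: the class ListDelegationSketch is hypothetical, java.util.function.Function stands in for Kafka's Serializer/Deserializer, and the length prefix written before every entry is an assumption borrowed from Case 2 of the Serialization Strategy section.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for ListSerializer/ListDeserializer; not Kafka code.
final class ListDelegationSketch {

    // Calls the inner serializer once per entry and concatenates the results.
    static <T> byte[] serialize(List<T> data, Function<T, byte[]> inner) {
        List<byte[]> encoded = new ArrayList<>();
        int total = Integer.BYTES; // leading int: number of entries
        for (T entry : data) {
            byte[] bytes = inner.apply(entry);
            encoded.add(bytes);
            total += Integer.BYTES + bytes.length; // entry size + entry bytes
        }
        ByteBuffer buf = ByteBuffer.allocate(total);
        buf.putInt(data.size());
        for (byte[] bytes : encoded) {
            buf.putInt(bytes.length);
            buf.put(bytes);
        }
        return buf.array();
    }

    // Calls the inner deserializer once per entry read from the payload.
    static <T> List<T> deserialize(byte[] payload, Function<byte[], T> inner) {
        ByteBuffer buf = ByteBuffer.wrap(payload);
        int size = buf.getInt();
        List<T> result = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            byte[] bytes = new byte[buf.getInt()];
            buf.get(bytes);
            result.add(inner.apply(bytes));
        }
        return result;
    }

    // Round trip for List<String>, using UTF-8 as the assumed inner encoding.
    static List<String> roundTripStrings(List<String> input) {
        byte[] payload = serialize(input, s -> s.getBytes(StandardCharsets.UTF_8));
        return deserialize(payload, bytes -> new String(bytes, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        System.out.println(roundTripStrings(Arrays.asList("a", "bc"))); // prints [a, bc]
    }
}
```

Using a different inner function pair (for example, one that encodes Integers) changes only the per-entry bytes; the list-level logic stays the same, which is the point of delegating to the inner serde.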

Proposed Configurations

A list serde is an unusual type of serde because two things need to be considered: the implementation of List (e.g. ArrayList, LinkedList, etc.) and the type of the enclosed elements.

First, as usual, we need to specify our list serde using:

default.key/value.serde = org.apache.kafka.common.serialization.Serdes$ListSerde

Then we need to cover those two aspects, for which I am proposing two extra properties:

default.key/value.list.serde.impl = java.util.ArrayList

default.key/value.list.serde.element = org.apache.kafka.common.serialization.Serdes$IntegerSerde

Properties default.key/value.list.* will be ignored unless default.key/value.serde is set to org.apache.kafka.common.serialization.Serdes$ListSerde.
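As a rough illustration, the properties above could be set and interpreted as follows for the key side. This is a sketch under assumptions: it reads the "key/value" shorthand as expanding to default.key.* and default.value.*, the class ListSerdeConfigSketch and its methods are hypothetical, and plain java.util.Properties with reflection is used in place of any Kafka configuration class.

```java
import java.util.List;
import java.util.Properties;

// Hypothetical helper; property names follow this proposal, expanded for the key side.
final class ListSerdeConfigSketch {

    static final String LIST_SERDE = "org.apache.kafka.common.serialization.Serdes$ListSerde";

    // Builds the three key-side properties from the proposal.
    static Properties keyListSerdeConfig() {
        Properties props = new Properties();
        props.setProperty("default.key.serde", LIST_SERDE);
        props.setProperty("default.key.list.serde.impl", "java.util.ArrayList");
        props.setProperty("default.key.list.serde.element",
                "org.apache.kafka.common.serialization.Serdes$IntegerSerde");
        return props;
    }

    // The list.* properties only matter when the serde itself is ListSerde.
    static boolean listPropertiesApply(Properties props, String side) {
        return LIST_SERDE.equals(props.getProperty("default." + side + ".serde"));
    }

    // Instantiates the configured List implementation via its no-arg constructor.
    @SuppressWarnings("unchecked")
    static List<Object> newConfiguredList(Properties props, String side) {
        String impl = props.getProperty("default." + side + ".list.serde.impl");
        try {
            return (List<Object>) Class.forName(impl).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate list class " + impl, e);
        }
    }

    public static void main(String[] args) {
        Properties props = keyListSerdeConfig();
        System.out.println(listPropertiesApply(props, "key"));            // prints true
        System.out.println(newConfiguredList(props, "key").getClass());   // prints class java.util.ArrayList
    }
}
```

With no default.value.serde set, listPropertiesApply(props, "value") returns false, which mirrors the rule that the list.* properties are ignored in that case.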

Serialization Strategy

For performance reasons, the following serialization strategy is used. Depending on the type of the inner serde (the list's element type), serialization is performed in one of two ways:

  1. If the inner serde has one of the following serializers (LongSerializer.class, IntegerSerializer.class, ShortSerializer.class, FloatSerializer.class, DoubleSerializer.class), then the final payload will not contain each element's size, since the size of the listed types is fixed (8 bytes, 4 bytes, 2 bytes, etc.)
  2. If the inner serde does not have one of the serializers listed above, then the size of each element will be encoded in the final payload (see below)
                                                                        
                Case 1                             Case 2               
                                                                        
         +------------------+               +------------------+        
         |                  |               |                  |        
     Int |   Size of list   |           Int |   Size of list   |        
         |                  |               |                  |        
         |------------------|               |------------------|        
         |                  |               |                  |        
         |     Entry 1      |           Int |  Size of entry 1 |        
         |                  |               |                  |        
         |------------------|               |------------------|        
         |                  |               |                  |        
         |     Entry 2      |               |     Entry 1      |        
         |                  |               |                  |        
         |------------------|               |------------------|        
         |                  |               |                  |        
         |                  |           Int |  Size of entry 2 |        
         |                  |               |                  |        
         |                  |               |------------------|        
         |                  |               |                  |        
         |                  |               |     Entry 2      |        
         |       ...        |               |                  |        
         |                  |               |------------------|        
         |                  |               |                  |        
         |                  |               |                  |        
         |                  |               |       ...        |        
         |                  |               |                  |        
         |                  |               |                  |        
         +------------------+               +------------------+        
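The two layouts in the diagram can be sketched as follows. This is a minimal illustration, not the proposed implementation: the class ListLayoutSketch is hypothetical, Case 1 is shown for 4-byte integers only, Case 2 is shown for UTF-8 strings, and big-endian ByteBuffer encoding is an assumption.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of the two payload layouts; not Kafka code.
final class ListLayoutSketch {

    // Case 1: fixed-size entries (4-byte ints), so no per-entry size is written.
    static byte[] serializeInts(List<Integer> data) {
        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + data.size() * Integer.BYTES);
        buf.putInt(data.size()); // size of list
        for (int value : data) {
            buf.putInt(value);   // entry
        }
        return buf.array();
    }

    // Case 1 decoding: the entry width is known, so entries are read back to back.
    static List<Integer> deserializeInts(byte[] payload) {
        ByteBuffer buf = ByteBuffer.wrap(payload);
        int size = buf.getInt();
        List<Integer> result = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            result.add(buf.getInt());
        }
        return result;
    }

    // Case 2: variable-size entries, so each entry is preceded by its size.
    static byte[] serializeStrings(List<String> data) {
        byte[][] encoded = new byte[data.size()][];
        int total = Integer.BYTES;
        for (int i = 0; i < data.size(); i++) {
            encoded[i] = data.get(i).getBytes(StandardCharsets.UTF_8);
            total += Integer.BYTES + encoded[i].length;
        }
        ByteBuffer buf = ByteBuffer.allocate(total);
        buf.putInt(data.size());       // size of list
        for (byte[] entry : encoded) {
            buf.putInt(entry.length);  // size of entry
            buf.put(entry);            // entry
        }
        return buf.array();
    }

    public static void main(String[] args) {
        // 4 (list size) + 3 * 4 (entries) = 16 bytes
        System.out.println(serializeInts(Arrays.asList(1, 2, 3)).length);
        // 4 (list size) + (4 + 1) + (4 + 2) = 15 bytes
        System.out.println(serializeStrings(Arrays.asList("a", "bc")).length);
    }
}
```

For a list of n integers, Case 1 takes 4 + 4n bytes, whereas writing a size before every entry would take 4 + 8n bytes, which is the saving the strategy is after.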


Compatibility, Deprecation, and Migration Plan

Does not apply

Rejected Alternatives

Not known
