Status

Current state: "Under Discussion"

...

JIRA: here [Change the link from KAFKA-1 to your own ticket]

Motivation

In Scala it is common practice to define or derive all typeclass instances in one place and then use them everywhere else.

...

Such a solution does the job, but it requires some boilerplate and makes the program's logic less clear.

Public Interfaces

I want to use different Serdes for keys and for values. This means changing more or less every public function signature in the Scala wrapper library that takes a Serde as a parameter.
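To illustrate what such a signature change could look like, here is a minimal sketch that compiles without Kafka on the classpath. Serde and Consumed are simplified stand-ins for the real Kafka types, and consumedNew, keySerdeFor and valueSerdeFor are hypothetical names used only for this example. It tags instances by mixing in the marker trait rather than by casting.

```scala
object TaggedSerdeSketch {
  // Hypothetical stand-in for org.apache.kafka.common.serialization.Serde.
  trait Serde[T] { def role: String }

  // Marker traits that distinguish key serdes from value serdes at the type level.
  trait IsKeySerde
  trait IsValueSerde
  type KeySerde[T] = Serde[T] with IsKeySerde
  type ValueSerde[T] = Serde[T] with IsValueSerde

  // Hypothetical stand-in for Consumed: it only records which serdes were picked.
  final case class Consumed[K, V](keySerde: Serde[K], valueSerde: Serde[V])

  // Shape of a changed signature: the two implicit parameters now have different types.
  def consumedNew[K, V](implicit keySerde: KeySerde[K], valueSerde: ValueSerde[V]): Consumed[K, V] =
    Consumed[K, V](keySerde, valueSerde)

  // One key instance and one value instance for every T (assumed for the sketch).
  implicit def keySerdeFor[T]: KeySerde[T] =
    new Serde[T] with IsKeySerde { val role = "key" }
  implicit def valueSerdeFor[T]: ValueSerde[T] =
    new Serde[T] with IsValueSerde { val role = "value" }

  // Key and value have the same type, yet each side gets its own instance.
  val sameType: Consumed[String, String] = consumedNew[String, String]
}
```

With a single Serde[T] parameter type, the key and the value of a Consumed[String, String] would necessarily resolve to the same implicit instance. With the tagged types they are resolved independently.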

Proposed Changes

I think it will be enough to apply the same trick that I used for 'labeling' models, but to Serdes.

Solution (Scala)
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.{Consumed, KStream}
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde

import scala.collection.JavaConverters._

object Solution {

  trait IsKeySerde
  type KeySerde[T] = Serde[T] with IsKeySerde

  trait IsValueSerde
  type ValueSerde[T] = Serde[T] with IsValueSerde

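  // Example of a changed implicit conversion from org.apache.kafka.streams.scala.ImplicitConversions.
  // The serdes are passed explicitly, so the call does not depend on which other implicit Serde instances are in scope.
  implicit def consumedFromSerdeNew[K, V](implicit keySerde: KeySerde[K], valueSerde: ValueSerde[V]): Consumed[K, V] =
    Consumed.`with`[K, V](keySerde, valueSerde)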
  
}

object OldBehavior {
  import Solution._
  implicit def asKeySerde[T](serde: Serde[T]): KeySerde[T] = serde.asInstanceOf[KeySerde[T]]
  implicit def asValueSerde[T](serde: Serde[T]): ValueSerde[T] = serde.asInstanceOf[ValueSerde[T]]
}

object SerdesSupport {
  import Solution._
  import OldBehavior._

  // I don't want to go too deep into implementation details,
  // so let's define a dummy type class called AlmostSerde and assume that,
  // given an instance of that type class for type T and a GenericAvroSerde, I can create a Serde instance for T.
  class AlmostSerde[T]()
  implicit def deriveAlmostSerde[T]: AlmostSerde[T] = ??? // assume that we can derive AlmostSerde instances for all T
  private def createSerde[T](as: AlmostSerde[T], gas: GenericAvroSerde): Serde[T] = ???

  implicit def specificSerdeForKey[T >: Null](implicit as: AlmostSerde[T]): KeySerde[T] = {
    val genericSerde = new GenericAvroSerde()
    val configuration = Map[String, Any]().asJava
    val isSerdeForKey = true
    genericSerde.configure(configuration, isSerdeForKey)
    createSerde(as, genericSerde)
  }

  implicit def specificSerdeForValue[T >: Null](implicit as: AlmostSerde[T]): ValueSerde[T] = {
    val genericSerde = new GenericAvroSerde()
    val configuration = Map[String, Any]().asJava
    val isSerdeForKey = false
    genericSerde.configure(configuration, isSerdeForKey)
    createSerde(as, genericSerde)
  }

}

object TestApp extends App {
  case class A(x: Int)
  case class B(y: String)

  import Solution._
  import SerdesSupport._

  val builder = new StreamsBuilder()

  val streamAtoB: KStream[A, B] = builder.stream("topic")
  val streamBtoA: KStream[B, A] = streamAtoB.map((a, b) => b -> a)
}


Compatibility, Deprecation, and Migration Plan

In theory, no changes are needed on the library users' side. We just need to put the implicit rules from the OldBehavior object in the right place, so that they have the lowest priority during implicit resolution.
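One possible way to get that priority is the usual pattern of putting fallback implicits in a parent trait. The sketch below is only an illustration under simplifying assumptions: PlainSerde, AlmostSerde and KeySerde are plain case classes standing in for the real types (they are not the tagged types from the proposal), and LowPriorityImplicits, Implicits and Demo are hypothetical names.

```scala
object LowPrioritySketch {
  // Simplified stand-ins, so the sketch compiles without Kafka on the classpath.
  final case class PlainSerde[T](name: String)   // plays the role of a plain Serde[T]
  final case class AlmostSerde[T](name: String)  // plays the role of AlmostSerde[T]
  final case class KeySerde[T](origin: String)   // plays the role of KeySerde[T]

  // Fallback rule (the "old behavior"): any plain serde can act as a key serde.
  // Defining it in a parent trait gives it lower priority than the rules in Implicits.
  trait LowPriorityImplicits {
    implicit def keySerdeFromPlain[T](implicit serde: PlainSerde[T]): KeySerde[T] =
      KeySerde[T]("plain:" + serde.name)
  }

  // Preferred rule: derive the key serde whenever an AlmostSerde is available.
  object Implicits extends LowPriorityImplicits {
    implicit def keySerdeFromAlmost[T](implicit almost: AlmostSerde[T]): KeySerde[T] =
      KeySerde[T]("derived:" + almost.name)
  }

  object Demo {
    import Implicits._

    implicit val plainInt: PlainSerde[Int] = PlainSerde[Int]("int")
    implicit val plainString: PlainSerde[String] = PlainSerde[String]("string")
    implicit val almostString: AlmostSerde[String] = AlmostSerde[String]("string")

    // Only the fallback applies for Int, because there is no AlmostSerde[Int].
    val forInt: KeySerde[Int] = implicitly[KeySerde[Int]]
    // Both rules apply for String; the rule defined in the derived object wins.
    val forString: KeySerde[String] = implicitly[KeySerde[String]]
  }
}
```

Existing code that only provides a plain serde keeps compiling through the fallback, while code that can derive a dedicated key serde picks the more specific rule without an ambiguity error.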

Rejected Alternatives

Honestly, I have nothing to write here yet. I am open to suggestions.