Status

Current state: Subsumed by KIP-365 (previously "Under Discussion")

Discussion thread: here

Github PR: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Serdes in the Scala wrapper are usually passed in implicit so this is a proposal to make the same on Materialized, Serialized, Consumed and Produced constructor functions.

Before the Kafka Streams Scala API was released, I proposed solving the serdes issue for functions that take a Materialized (count, reduce and aggregate) by making the Materialized parameter implicit and providing a default implicit in ImplicitConversions with the serdes filled in.

Given the following definitions:
def count()(implicit materialized: Materialized[K, Long, ByteArrayKeyValueStore])
def reduce(reducer: (V, V) => V)(implicit materialized: Materialized[K, V, ByteArrayKeyValueStore])
def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR)(implicit materialized: Materialized[K, VR, ByteArrayKeyValueStore])

We can call them without Materialized:
groupedStream.count()

Or with an explicit Materialized:
groupedStream.count()(Materialized.as("store-name"))

This handles the case where no Materialized is given, since the serdes are set implicitly and runtime errors are avoided, while still letting callers pass their own Materialized explicitly.
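
The approach described above can be sketched without the Kafka dependency. This is a minimal illustration, not the wrapper's actual code: Serde, Materialized, GroupedStream and the ImplicitMaterializedSketch object are hypothetical stand-ins, and the store type parameter is dropped for brevity.

```scala
object ImplicitMaterializedSketch {
  // Hypothetical stand-ins for the Kafka types; not the real library classes.
  trait Serde[T] { def name: String }

  final case class Materialized[K, V](
      storeName: Option[String],
      keySerde: Option[Serde[K]],
      valueSerde: Option[Serde[V]]
  )

  object Materialized {
    // Same shape as Materialized.as(...): a store name and no serdes.
    def as[K, V](storeName: String): Materialized[K, V] =
      Materialized(Some(storeName), None, None)
  }

  implicit val stringSerde: Serde[String] = new Serde[String] { val name = "string" }
  implicit val longSerde: Serde[Long] = new Serde[Long] { val name = "long" }

  // Default implicit Materialized, built from whatever serdes are in scope.
  implicit def materializedFromSerde[K, V](implicit k: Serde[K], v: Serde[V]): Materialized[K, V] =
    Materialized(None, Some(k), Some(v))

  final class GroupedStream[K, V] {
    // Returns the Materialized it resolved so the resolution is observable.
    def count()(implicit materialized: Materialized[K, Long]): Materialized[K, Long] =
      materialized
  }
}
```

After `import ImplicitMaterializedSketch._`, calling `new GroupedStream[String, String].count()` resolves the default with both serdes filled in, while `count()(Materialized.as("store-name"))` uses the caller's instance, which in this sketch carries only the store name.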


The issues with this solution are the following:

  • Making Materialized implicit suggests that a Materialized is a context object, which it is not.
  • Users of the API will, by default, not pay attention to implicit parameters and will assume that they are provided.
  • Users of the API might declare their own Materialized in an implicit val, since the parameter is implicit, and then be caught out when other functions that also take an implicit Materialized pick it up unintentionally.
  • This is a long-term view, but in Scala 3 passing implicits explicitly looks like:
    groupedStream.count().explicitly(Materialized.as("store-name"))
    So it is clear that implicits should not be used as a defaulting mechanism.

All of that applies to Consumed and Produced too.
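
The third issue in the list above can be reproduced with a small sketch. The types here are hypothetical stand-ins rather than the real Kafka classes, and the store name is invented for illustration.

```scala
object ImplicitPitfallSketch {
  // Hypothetical stand-in for the Kafka type; not the real library class.
  final case class Materialized[K, V](storeName: String)

  final class GroupedStream[K, V] {
    // Both methods take their Materialized implicitly.
    def count()(implicit m: Materialized[K, Long]): String =
      s"count -> ${m.storeName}"

    def reduce(reducer: (V, V) => V)(implicit m: Materialized[K, V]): String =
      s"reduce -> ${m.storeName}"
  }

  object UserCode {
    // Meant only for count(), but visible to every implicit
    // Materialized[String, Long] parameter in this scope.
    implicit val countsStore: Materialized[String, Long] = Materialized("counts-store")

    val grouped = new GroupedStream[String, Long]
    val counted = grouped.count()
    // reduce silently picks up the same implicit value.
    val reduced = grouped.reduce((a: Long, b: Long) => a + b)
  }
}
```

Here both `counted` and `reduced` refer to the store named counts-store, although the user only intended it for `count()`.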

Public Interfaces

org.apache.kafka.streams.scala.kstream.KGroupedStream.count()
org.apache.kafka.streams.scala.kstream.KGroupedStream.reduce()
org.apache.kafka.streams.scala.kstream.KGroupedStream.aggregate()
org.apache.kafka.streams.scala.kstream.KGroupedTable.count()
org.apache.kafka.streams.scala.kstream.KGroupedTable.reduce()
org.apache.kafka.streams.scala.kstream.KGroupedTable.aggregate()
org.apache.kafka.streams.scala.ImplicitConversions.materializedFromSerde()
org.apache.kafka.streams.scala.StreamsBuilder.stream()
org.apache.kafka.streams.scala.StreamsBuilder.table()
org.apache.kafka.streams.scala.StreamsBuilder.globalTable()

Proposed Changes

def count()(implicit keySerde: Serde[K], valueSerde: Serde[V])
def count(materialized: Materialized[K, Long, ByteArrayKeyValueStore])

def reduce(reducer: (V, V) => V)(implicit keySerde: Serde[K], valueSerde: Serde[V])
def reduce(reducer: (V, V) => V, materialized: Materialized[K, V, ByteArrayKeyValueStore])

def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR)(implicit keySerde: Serde[K], valueSerde: Serde[V])
def aggregate[VR](materialized: Materialized[K, VR, ByteArrayKeyValueStore], initializer: => VR)(aggregator: (K, V, VR) => VR)

def stream[K, V](topic: String)(implicit keySerde: Serde[K], valueSerde: Serde[V])
def stream[K, V](topic: String, consumed: Consumed[K, V])

def table[K, V](topic: String)(implicit keySerde: Serde[K], valueSerde: Serde[V])
def table[K, V](topic: String, consumed: Consumed[K, V])

def globalTable[K, V](topic: String)(implicit keySerde: Serde[K], valueSerde: Serde[V])
def globalTable[K, V](topic: String, consumed: Consumed[K, V])

def through(topic: String)(implicit keySerde: Serde[K], valueSerde: Serde[V])
def through(topic: String, produced: Produced[K, V])

def to(topic: String)(implicit keySerde: Serde[K], valueSerde: Serde[V])
def to(topic: String, produced: Produced[K, V])

def to(extractor: TopicNameExtractor[K, V])(implicit keySerde: Serde[K], valueSerde: Serde[V])
def to(extractor: TopicNameExtractor[K, V], produced: Produced[K, V])

This way, implicit Serdes are required only when the Materialized (or Consumed, or Produced) is not given explicitly.

Add the Scala versions of Materialized, Serialized, Consumed and Produced.
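
The overload pairs listed in this section can be illustrated with a reduced sketch for count. The types are hypothetical stand-ins, not the real Kafka classes, and each method returns a description string only so that the chosen overload is visible.

```scala
object OverloadSketch {
  // Hypothetical stand-ins for the Kafka types; not the real library classes.
  trait Serde[T] { def name: String }
  final case class Materialized[K, V](storeName: String)

  implicit val stringSerde: Serde[String] = new Serde[String] { val name = "string" }
  implicit val longSerde: Serde[Long] = new Serde[Long] { val name = "long" }

  final class GroupedStream[K, V] {
    // No Materialized given: the serdes must be available implicitly.
    def count()(implicit keySerde: Serde[K], valueSerde: Serde[V]): String =
      s"count with implicit serdes (${keySerde.name}, ${valueSerde.name})"

    // Materialized given explicitly: no implicit serdes are required.
    def count(materialized: Materialized[K, Long]): String =
      s"count with explicit store ${materialized.storeName}"
  }
}
```

With `import OverloadSketch._`, `new GroupedStream[String, Long].count()` needs both serdes in implicit scope, whereas `count(Materialized[String, Long]("store-name"))` compiles even for key and value types that have no implicit Serde. The Materialized is an ordinary argument in this shape, so an implicit value declared for one call cannot leak into another.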

Compatibility, Deprecation, and Migration Plan

TBD

Rejected Alternatives

NA