...

  • Making Materialized implicit suggests that a Materialized is a context object, which it is not.
  • By default, users of the API tend not to think about implicit parameters and simply assume that they are provided.
  • Because the parameter is implicit, a user of the API might declare their own Materialized in an implicit val and then be caught out when other, unintended functions that also take a Materialized implicitly pick it up.
  • This is a long shot, but in Scala 3, passing implicits explicitly looks like:
    groupedStream.count().explicitly(Materialized.as("store-name"))
    So it's clear tha

All of that applies to Consumed and Produced too.
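The third point above can be illustrated with a minimal sketch. It does not use the real Kafka Streams classes: `Materialized` here is a stand-in case class, and `count` and `otherCount` are hypothetical operators that simply return the name of the store they end up using. The assumption is only that both take the same `Materialized` type as an implicit parameter.

```scala
object ImplicitLeakDemo {
  // Stand-in for illustration only; NOT the real Kafka Streams Materialized.
  final case class Materialized[K, V](storeName: String)

  // Two hypothetical operators that both take a Materialized implicitly
  // and report which store name they were given.
  def count[K]()(implicit m: Materialized[K, Long]): String = m.storeName
  def otherCount[K]()(implicit m: Materialized[K, Long]): String = m.storeName

  // Declared with only the first call in mind.
  implicit val wordCounts: Materialized[String, Long] = Materialized("word-counts")

  val first: String = count[String]()       // intended use
  val second: String = otherCount[String]() // silently picks up the same store
}
```

Both calls compile without any mention of the store, and both resolve to "word-counts", even though the implicit val was only meant for the first one.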

...

org.apache.kafka.streams.scala.kstream.KGroupedStream.count()
org.apache.kafka.streams.scala.kstream.KGroupedStream.reduce()
org.apache.kafka.streams.scala.kstream.KGroupedStream.aggregate()
org.apache.kafka.streams.scala.kstream.KGroupedTable.count()
org.apache.kafka.streams.scala.kstream.KGroupedTable.reduce()
org.apache.kafka.streams.scala.kstream.KGroupedTable.aggregate()
org.apache.kafka.streams.scala.ImplicitConversions.materializedFromSerde()
org.apache.kafka.streams.scala.StreamsBuilder.stream()
org.apache.kafka.streams.scala.StreamsBuilder.table()
org.apache.kafka.streams.scala.StreamsBuilder.globalTable()

Proposed Changes

def count()(implicit keySerde: Serde[K], valueSerde: Serde[V])
def count(implicit materialized: Materialized[K, Long, ByteArrayKeyValueStore])

def reduce(reducer: (V, V) => V)(implicit keySerde: Serde[K], valueSerde: Serde[V])
def reduce(reducer: (V, V) => V, materialized: Materialized[K, V, ByteArrayKeyValueStore])

def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR)(implicit keySerde: Serde[K], valueSerde: Serde[V])
def aggregate[VR](materialized: Materialized[K, VR, ByteArrayKeyValueStore], initializer: => VR)(aggregator: (K, V, VR) => VR)

def stream[K, V](topic: String)(implicit keySerde: Serde[K], valueSerde: Serde[V])
def stream[K, V](topic: String, consumed: Consumed[K, V])

def table[K, V](topic: String)(implicit keySerde: Serde[K], valueSerde: Serde[V])
def table[K, V](topic: String, consumed: Consumed[K, V])

def globalTable[K, V](topic: String)(implicit keySerde: Serde[K], valueSerde: Serde[V])
def globalTable[K, V](topic: String, consumed: Consumed[K, V])

def through(topic: String)(implicit keySerde: Serde[K], valueSerde: Serde[V])
def through(topic: String, produced: Produced[K, V])

def to(topic: String)(implicit keySerde: Serde[K], valueSerde: Serde[V])
def to(topic: String, produced: Produced[K, V])

def to(extractor: TopicNameExtractor[K, V])(implicit keySerde: Serde[K], valueSerde: Serde[V])
def to(extractor: TopicNameExtractor[K, V], produced: Produced[K, V])

This way, only the Serdes are required when the Materialized (or Consumed, or Produced) is not given explicitly.
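As a rough sketch of how such a pair of overloads behaves at the call site, the following uses stand-in types rather than the real Kafka Streams ones. `Serde`, `Materialized`, and `GroupedStream` are simplified placeholders, and the `String` return values exist only to show which overload was chosen; none of this is the actual implementation.

```scala
object ExplicitMaterializedSketch {
  // Stand-ins for illustration only; NOT the real Kafka Streams types.
  final case class Serde[T](name: String)
  final case class Materialized[K, V](storeName: String)

  final class GroupedStream[K, V] {
    // Serde-only overload: nothing but the implicit serdes is needed.
    def count()(implicit keySerde: Serde[K], valueSerde: Serde[V]): String =
      s"default store (key=${keySerde.name}, value=${valueSerde.name})"

    // Explicit overload: the caller passes the Materialized as an ordinary argument.
    def count(materialized: Materialized[K, Long]): String =
      s"store ${materialized.storeName}"
  }

  implicit val stringSerde: Serde[String] = Serde("string")
  implicit val intSerde: Serde[Int] = Serde("int")

  val grouped = new GroupedStream[String, Int]

  // Resolves to the serde-only overload.
  val viaSerdes: String = grouped.count()

  // Resolves to the explicit overload; the store choice is visible at the call site.
  val viaMaterialized: String =
    grouped.count(Materialized[String, Long]("word-counts"))
}
```

With this shape, a store name can only enter a call by being written at that call, while the serdes keep their role as implicitly supplied context.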

Compatibility, Deprecation, and Migration Plan

...