...

The library wraps each Java abstraction of Kafka Streams in a Scala wrapper object and relies on implicit conversions between the two. Every Scala abstraction has the same name as the Java abstraction it wraps but resides in a different package. For example, the Scala class org.apache.kafka.streams.scala.StreamsBuilder wraps org.apache.kafka.streams.StreamsBuilder, and org.apache.kafka.streams.scala.kstream.KStream wraps org.apache.kafka.streams.kstream.KStream.
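The wrapper-plus-implicit-conversion pattern described above can be illustrated with a minimal sketch. It does not use the real Kafka Streams classes: JavaStyleStream, ScalaStyleStream, WrapperConversions and WrapperSketch are hypothetical names introduced only for this illustration, and the actual library may structure its wrappers differently.

```scala
import java.util.function.{Function => JFunction}
import scala.language.implicitConversions

// Hypothetical stand-in for a Java-style abstraction (NOT the real Kafka Streams class).
class JavaStyleStream[V](val values: List[V]) {
  def mapValues[VR](mapper: JFunction[V, VR]): JavaStyleStream[VR] =
    new JavaStyleStream[VR](values.map(v => mapper.apply(v)))
}

// Hypothetical Scala wrapper: same method name, but it accepts a plain Scala function
// and delegates to the wrapped Java-style object.
class ScalaStyleStream[V](val inner: JavaStyleStream[V]) {
  def mapValues[VR](f: V => VR): ScalaStyleStream[VR] = {
    val mapper = new JFunction[V, VR] { override def apply(v: V): VR = f(v) }
    new ScalaStyleStream[VR](inner.mapValues(mapper))
  }
}

object WrapperConversions {
  // Implicit conversion from the Java-style type to its Scala wrapper.
  implicit def wrapStream[V](inner: JavaStyleStream[V]): ScalaStyleStream[V] =
    new ScalaStyleStream[V](inner)
}

object WrapperSketch {
  import WrapperConversions._

  def wordLengths(words: List[String]): List[Int] = {
    // The type ascription triggers the implicit conversion to the wrapper.
    val wrapped: ScalaStyleStream[String] = new JavaStyleStream[String](words)
    wrapped.mapValues(_.length).inner.values
  }
}
```

Calling WrapperSketch.wordLengths(List("kafka", "streams")) goes through the wrapper and back to the underlying values. Keeping the wrapped object reachable through a field such as inner is one possible design choice that lets callers drop down to the Java-style API when needed.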

...

Word Count (Scala)
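package org.apache.kafka.streams.scala

import java.util.regex.Pattern

import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.scala.kstream._
// brings in scope all necessary implicit serdes
import DefaultSerdes._

// inputTopic, outputTopic and streamsConfiguration are assumed to be defined elsewhere
val builder = new StreamsBuilder
val textLines = builder.stream[String, String](inputTopic)

// splits on runs of non-word characters, with Unicode-aware character classes
val pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS)

val wordCounts: KTable[String, Long] =
  textLines.flatMapValues(v => pattern.split(v.toLowerCase))
    .groupBy((_, word) => word) // re-key each record by the word itself
    .count()
wordCounts.toStream.to(outputTopic)

val streams = new KafkaStreams(builder.build, streamsConfiguration)

streams.start()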
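
To make the per-record logic of the Word Count example easier to check, here is an in-memory analogue that applies the same tokenize, group and count steps to a plain Scala collection. It is only a sketch: WordCountSketch and its wordCounts method are hypothetical names, nothing here touches Kafka, and a real topology updates its counts continuously instead of returning a single result.

```scala
import java.util.regex.Pattern

object WordCountSketch {
  // Same pattern as the Word Count example: runs of non-word characters, Unicode-aware.
  private val pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS)

  def wordCounts(lines: Seq[String]): Map[String, Long] =
    lines
      .flatMap(line => pattern.split(line.toLowerCase).toSeq) // analogue of flatMapValues
      .groupBy(identity)                                      // analogue of groupBy on the word
      .map { case (word, occurrences) => word -> occurrences.size.toLong } // analogue of count
}
```

For instance, WordCountSketch.wordCounts(Seq("Hello world", "hello Kafka")) counts "hello" twice and the other two words once each.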

...