...
The library wraps each Java abstraction of Kafka Streams in a Scala wrapper object and uses implicit conversions to move between the two. Each Scala abstraction has the same name as its Java counterpart but resides in a different package of the library. For example, the Scala class org.apache.kafka.streams.scala.StreamsBuilder is a wrapper around org.apache.kafka.streams.StreamsBuilder, and org.apache.kafka.streams.scala.kstream.KStream is a wrapper around org.apache.kafka.streams.kstream.KStream.
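The wrapper-plus-implicit-conversion pattern described above can be sketched without any Kafka dependency. The names below (`JavaStream`, `ScalaStream`, `wrapJavaStream`, `WrapperSketch`) are hypothetical stand-ins chosen for illustration; they are not classes from the library, and the real wrappers may differ in detail.

```scala
import scala.language.implicitConversions
import java.util.{ArrayList, Arrays, List => JList}
import java.util.function.{Function => JFunction}

object WrapperSketch {

  // Hypothetical stand-in for a Java abstraction (not a Kafka class).
  class JavaStream[V](val values: JList[V]) {
    def mapValues[R](mapper: JFunction[V, R]): JavaStream[R] = {
      val out = new ArrayList[R]()
      val it = values.iterator()
      while (it.hasNext) out.add(mapper.apply(it.next()))
      new JavaStream[R](out)
    }
  }

  // Scala wrapper: holds the Java object and exposes a Scala-friendly API.
  class ScalaStream[V](val inner: JavaStream[V]) {
    def mapValues[R](f: V => R): ScalaStream[R] = {
      // Adapt the Scala function to the Java functional interface.
      val mapper = new JFunction[V, R] { override def apply(v: V): R = f(v) }
      // The Java result is converted back to the wrapper by wrapJavaStream.
      inner.mapValues[R](mapper)
    }
  }

  // Implicit conversion from the Java abstraction to its Scala wrapper.
  implicit def wrapJavaStream[V](inner: JavaStream[V]): ScalaStream[V] =
    new ScalaStream[V](inner)

  def main(args: Array[String]): Unit = {
    val javaStream = new JavaStream[String](Arrays.asList("a", "bb", "ccc"))
    // The expected type triggers the implicit conversion.
    val scalaStream: ScalaStream[String] = javaStream
    val lengths: ScalaStream[Int] = scalaStream.mapValues(_.length)
    println(lengths.inner.values) // prints [1, 2, 3]
  }
}
```

In this sketch the wrapper never reimplements behavior: every call is delegated to the wrapped Java object, and the implicit conversion only re-wraps the result so that callers stay in the Scala API.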
...
    package org.apache.kafka.streams.scala

    import java.util.regex.Pattern

    import org.apache.kafka.streams.KafkaStreams
    import org.apache.kafka.streams.scala.kstream._

    // brings in scope all necessary implicit serdes
    import DefaultSerdes._

    // inputTopic, outputTopic and streamsConfiguration are assumed to be defined elsewhere
    val builder = new StreamsBuilder
    val textLines = builder.stream[String, String](inputTopic)

    // split each line into words on runs of non-word characters
    val pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS)

    val wordCounts: KTable[String, Long] =
      textLines.flatMapValues(v => pattern.split(v.toLowerCase))
        .groupBy((k, v) => v)
        .count()

    wordCounts.toStream.to(outputTopic)

    val streams = new KafkaStreams(builder.build, streamsConfiguration)
    streams.start()
...