...

Code Block
languagescala
val kafka_streams_scala_version = "0.1.2"
libraryDependencies ++= Seq("com.lightbend" %% "kafka-streams-scala" % kafka_streams_scala_version)

Maven

Code Block
languagexml
<dependency>
 <groupId>com.lightbend</groupId>
 <artifactId>kafka-streams-scala_2.12</artifactId>
 <version>0.1.2</version>
</dependency>

...

Code Block
languagegroovy
compile 'com.lightbend:kafka-streams-scala_2.12:0.1.2'

Dependencies

kafka-streams-scala only depends on the Scala standard library and Kafka Streams 1.0.0.

Sample Usage

The library works by wrapping the original Java abstractions of Kafka Streams within Scala wrapper objects and then using implicit conversions between them. Each Scala abstraction is named after the corresponding Java abstraction with an S appended, e.g. StreamsBuilderS is a wrapper around StreamsBuilder and KStreamS is a wrapper around KStream.
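
The wrapping pattern can be illustrated with a minimal, self-contained sketch. The names here (JStream, StreamS, Conversions) are purely illustrative stand-ins, not the library's actual code: JStream plays the role of a Java abstraction with a Java functional interface, StreamS is its Scala wrapper, and implicit conversions move between the two.

```scala
import scala.language.implicitConversions

// A stand-in for a Java-style abstraction that takes a Java functional interface.
class JStream[V](val values: List[V]) {
  def mapValues[VR](mapper: java.util.function.Function[V, VR]): JStream[VR] =
    new JStream(values.map(v => mapper.apply(v)))
}

// The Scala wrapper: exposes an idiomatic API taking plain Scala functions
// and delegates to the wrapped Java-style instance.
class StreamS[V](val inner: JStream[V]) {
  def mapValues[VR](f: V => VR): StreamS[VR] =
    new StreamS(inner.mapValues(new java.util.function.Function[V, VR] {
      def apply(v: V): VR = f(v)
    }))
}

// Implicit conversions let the two APIs interoperate transparently.
object Conversions {
  implicit def wrap[V](j: JStream[V]): StreamS[V] = new StreamS(j)
  implicit def unwrap[V](s: StreamS[V]): JStream[V] = s.inner
}
```

With the conversions in scope, a Java-style instance can be used through the Scala API without any explicit wrapping at the call site.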

Here's an example of the classic Word Count program that uses the Scala builder StreamsBuilderS (a wrapper around StreamsBuilder) to build an instance of KStreamS (a wrapper around KStream) through the wrapped API builder.stream. We then reify the stream to a table and get a KTableS, which, again, is a wrapper around KTable.

The net result is that the following code is structured just like its Java API counterpart, but written from Scala with far fewer type annotations than using the Java API directly from Scala would require. The difference in type annotation usage becomes more obvious with more complicated examples. The library also comes with a test suite of examples that demonstrate these capabilities.

 

Code Block
languagescala
titleWord Count
import java.util.regex.Pattern

import DefaultSerdes._

val builder = new StreamsBuilderS
val textLines = builder.stream[String, String](inputTopic)

val pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS)

val wordCounts: KTableS[String, Long] =
  textLines.flatMapValues(v => pattern.split(v.toLowerCase))
    .groupBy((k, v) => v)
    .count()

wordCounts.toStream.to(outputTopic)

val streams = new KafkaStreams(builder.build, streamsConfiguration)
streams.start()

In the above code snippet, we don't have to provide any serdes, or any Serialized, Produced, Consumed or Joined instances, explicitly. Nor do they depend on serdes specified in the configuration - in fact, the Scala APIs ignore all serdes specified in the config. Instead, all serdes and all Serialized, Produced, Consumed and Joined instances are resolved through implicit serdes, as discussed later in this document. This complete independence from configuration-based serdes is what makes the library type-safe: any missing instance of a serde, Serialized, Produced, Consumed or Joined is flagged as a compile-time error.
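
The resolution mechanism can be sketched in plain Scala. In this simplified, illustrative model (these are stand-in types, not the real Kafka Streams classes), an implicit Serialized is derived automatically from whatever implicit Serdes are in scope, so an API that demands a Serialized compiles as long as the key and value serdes are available - and fails to compile otherwise.

```scala
// Simplified stand-ins for the real types, for illustration only.
trait Serde[A] { def name: String }

case class Serialized[K, V](keySerde: Serde[K], valueSerde: Serde[V])

object Serialized {
  // Materialize a Serialized from whatever Serdes are implicitly in scope.
  // The companion object puts this derivation in the implicit search path.
  implicit def derived[K, V](implicit k: Serde[K], v: Serde[V]): Serialized[K, V] =
    Serialized(k, v)
}

// A module of implicit serdes for primitives, mirroring DefaultSerdes.
object DefaultSerdesSketch {
  implicit val stringSerde: Serde[String] = new Serde[String] { val name = "String" }
  implicit val longSerde: Serde[Long]     = new Serde[Long]   { val name = "Long" }
}

object Api {
  // Requires a Serialized implicitly, the way groupBy does in the library.
  def groupBy[K, V](implicit serialized: Serialized[K, V]): String =
    s"grouping with ${serialized.keySerde.name}/${serialized.valueSerde.name}"
}
```

Calling Api.groupBy[String, Long] after importing DefaultSerdesSketch._ succeeds with no explicit arguments; removing either implicit serde from scope turns the call into a compile-time error rather than a runtime failure.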

Type Inference and Composition

Here's a sample code fragment using the Scala wrapper library. Compare it with the Scala code for the same example written directly against the Java API, available in Confluent's repository.

 
Code Block
languagescala
titleBetter type inference
// Compute the total per region by summing the individual click counts per region.
val clicksPerRegion: KTableS[String, Long] =
 userClicksStream
   // Join the stream against the table
   .leftJoin(userRegionsTable,
     (clicks: Long, region: String) =>
       (if (region == null) "UNKNOWN" else region, clicks))

   // Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
   .map((_, regionWithClicks) => regionWithClicks)

   // Compute the total per region by summing the individual click counts per region.
   .groupByKey
   .reduce(_ + _)

Implicit Serdes

A common complaint from Scala users of the Java API has been the repetitive passing of serdes in API invocations. Many of the APIs take serdes through abstractions like Serialized, Consumed, Produced or Joined, and the user has to supply them every time through the with function of these classes.

The library uses the power of Scala implicits to alleviate this concern. As a user you can provide implicit serdes, or implicit values of Serialized, Joined, Consumed or Produced, once, and make your code less verbose. In fact, you can just have the implicit serdes in scope and the library will make the instances of Serialized, Produced, Consumed or Joined available automatically.

The library also bundles implicit serdes for the commonly used primitive types in a Scala module - so just import the module vals and have all of those serdes in scope. A similar strategy of modular implicits can be adopted for any user-defined serdes as well.

Here's an example:

Code Block
languagescala
titleImplicit Serdes
// DefaultSerdes brings into scope implicit serdes (mostly for primitives)
// that will set up all Serialized, Produced, Consumed and Joined instances.
// So all APIs below that accept Serialized, Produced, Consumed or Joined will
// get these instances automatically
import DefaultSerdes._

val builder = new StreamsBuilderS()

val userClicksStream: KStreamS[String, Long] = builder.stream(userClicksTopic)

val userRegionsTable: KTableS[String, String] = builder.table(userRegionsTopic)

// The following code fragment does not have a single instance of Serialized,
// Produced, Consumed or Joined supplied explicitly.
// All of them are taken care of by the implicit serdes imported by DefaultSerdes
val clicksPerRegion: KTableS[String, Long] =
  userClicksStream
    .leftJoin(userRegionsTable,
      (clicks: Long, region: String) =>
        (if (region == null) "UNKNOWN" else region, clicks))
    .map((_, regionWithClicks) => regionWithClicks)
    .groupByKey
    .reduce(_ + _)

clicksPerRegion.toStream.to(outputTopic)

Compare this code with a version that does not use implicit serdes and you will see the difference in verbosity. Moreover, the library does not depend on serdes supplied through the configuration, which would introduce type unsafety. Instead it expects implicit serdes to be available in scope for all the Scala APIs; any missing serde is reported as a compiler error, which makes the library much more type-safe than the corresponding Java one.
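
The modular-implicits strategy for user-defined serdes mentioned above can be sketched as follows. This is a hypothetical, self-contained example: User, UserDefinedSerde, MySerdes and Topic are all illustrative names, with a toy string-based serde standing in for a real serializer/deserializer pair.

```scala
case class User(id: Long, name: String)

// A minimal stand-in for a serde: just turns a value into text.
trait UserDefinedSerde[A] { def serialize(a: A): String }

// Bundle user-defined implicit serdes in one module, mirroring DefaultSerdes:
// importing MySerdes._ brings all of them into scope at once.
object MySerdes {
  implicit val userSerde: UserDefinedSerde[User] = new UserDefinedSerde[User] {
    def serialize(u: User): String = s"${u.id}:${u.name}"
  }
}

object Topic {
  // Needs a serde implicitly; a missing serde is a compile-time error,
  // not a runtime ClassCastException from a misconfigured serde.
  def publish[A](a: A)(implicit s: UserDefinedSerde[A]): String = s.serialize(a)
}
```

After import MySerdes._, Topic.publish(User(1, "alice")) resolves its serde implicitly; publishing a type with no serde in scope fails to compile.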

Compatibility, Deprecation, and Migration Plan

...