

Status

Current state"Under Discussion"

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]

JIRA: here [Change the link from KAFKA-1 to your own ticket]

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This document outlines a proposal for developing a Scala library as a wrapper over the existing Java APIs for Kafka Streams.

Kafka Streams currently offers Java APIs based on the Builder design pattern, which allows users to incrementally build the target functionality using lower-level compositional fluent APIs. The problems with using these APIs from Scala code are threefold (the sketch following this list illustrates them):

  1. Additional type annotations - The Java APIs use Java generics in a way that is not fully compatible with the type inference of the Scala compiler. Hence the user has to add type annotations to the Scala code, which feels rather non-idiomatic in Scala.

  2. Verbosity - In some cases the Java APIs appear too verbose compared to idiomatic Scala.

  3. Lack of type safety - Some of the Java APIs subvert compile-time type safety, because the serdes defined as part of the configuration are not type-checked at compile time. Hence a missing serde shows up only as a runtime error.
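
To make these points concrete, here is a rough sketch (not taken from any existing codebase; topic names are placeholders) of a small fragment written against the Java DSL of Kafka Streams 1.0.0 directly from Scala:

Java API from Scala (illustrative sketch)
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{Consumed, StreamsBuilder}
import org.apache.kafka.streams.kstream.{KStream, Produced}

val builder = new StreamsBuilder

// Explicit type parameters and an explicit Consumed instance carrying the serdes.
// Note that `with` has to be back-quoted because it is a Scala keyword.
val userClicks: KStream[String, java.lang.Long] =
  builder.stream[String, java.lang.Long]("user-clicks",
    Consumed.`with`(Serdes.String, Serdes.Long))

// If Produced is omitted, the serdes configured in StreamsConfig are used
// instead, and any mismatch surfaces only at runtime.
userClicks.to("user-clicks-copy", Produced.`with`(Serdes.String, Serdes.Long))

With the wrapper library proposed below, the serdes come from implicits and the type parameters are inferred, as the later examples show.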

Proposed Changes

Summary

The suggested Scala library is a wrapper over the existing Java APIs for the Kafka Streams DSL and addresses the above three concerns. It does not attempt to provide the idiomatic Scala APIs one would implement in a library developed from scratch in Scala. The intention is to make the Java APIs more usable from Scala through better type inference, enhanced expressiveness, and less boilerplate.

The library wraps Java Stream DSL APIs in Scala thereby providing:

  1. Much better type inference in Scala

  2. Less boilerplate in application code

  3. The usual builder-style composition that developers get with the original Java API

  4. Implicit serializers and de-serializers leading to better abstraction and less verbosity

  5. Better type safety during compile time

Together, these points result in improved overall developer productivity.

This document introduces the Kafka Streams Scala library.

In addition, we received a proposal for an alternate implementation of the same functionality using a type class based approach in Scala. This is the PR currently open in our repository and is based on a fork of our implementation. There has been a lot of discussion on the pros and cons of both approaches.

Quick Start

kafka-streams-scala is published and cross-built for Scala 2.11 and 2.12.

SBT

Add the following to your SBT build:

val kafka_streams_scala_version = "0.1.2"

libraryDependencies ++= Seq("com.lightbend" %% "kafka-streams-scala" % kafka_streams_scala_version)

Maven

<dependency>
 <groupId>com.lightbend</groupId>
 <artifactId>kafka-streams-scala_2.12</artifactId>
 <version>0.1.2</version>
</dependency>

Remember to fully qualify the artifactId to match the version of Scala you’re using.

Gradle

compile 'com.lightbend:kafka-streams-scala_2.12:0.1.2'

Dependencies

kafka-streams-scala only depends on the Scala standard library and Kafka Streams 1.0.0.

Sample Usage

The library works by wrapping the original Java abstractions of Kafka Streams within Scala wrapper objects and then using implicit conversions between them. All the Scala abstractions are named by appending an S to the name of the corresponding Java abstraction, e.g. StreamsBuilderS is a wrapper around StreamsBuilder and KStreamS is a wrapper around KStream.
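
As a rough illustration of this pattern, here is a simplified sketch (not the library's actual source) in which the wrapper class holds the underlying Java instance and implicit conversions move between the two representations:

Wrapping pattern (simplified sketch)
import org.apache.kafka.streams.kstream.{KStream, ValueMapper}

// Simplified sketch of the wrapping approach; the real KStreamS exposes the
// full DSL surface, this shows only one method.
class KStreamS[K, V](val inner: KStream[K, V]) {
  def mapValues[VR](mapper: V => VR): KStreamS[K, VR] =
    new KStreamS(inner.mapValues[VR](new ValueMapper[V, VR] {
      override def apply(v: V): VR = mapper(v)
    }))
}

object ImplicitConversions {
  import scala.language.implicitConversions

  // Wrap a Java KStream whenever a KStreamS is expected, and unwrap in the
  // other direction, so the two APIs can be mixed freely.
  implicit def wrapKStream[K, V](inner: KStream[K, V]): KStreamS[K, V] =
    new KStreamS(inner)

  implicit def unwrapKStream[K, V](stream: KStreamS[K, V]): KStream[K, V] =
    stream.inner
}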

Here's an example of the classic Word Count program that uses the Scala builder StreamsBuilderS (a wrapper around StreamsBuilder) and then builds an instance of KStreamS (a wrapper around KStream) using the wrapped API builder.stream. We then reify the stream into a table and get a KTableS, which, again, is a wrapper around KTable.

The net result is that the following code is structured just like using the Java API, but from Scala and with far fewer type annotations compared to using the Java API directly from Scala. The difference in type annotation usage will be more obvious when we use a more complicated example. The library comes with a test suite of a few examples that demonstrate these capabilities.

 

Word Count
import java.util.regex.Pattern
import org.apache.kafka.streams.KafkaStreams

// DefaultSerdes brings implicit serdes for the primitive types into scope
import DefaultSerdes._

val builder = new StreamsBuilderS
val textLines = builder.stream[String, String](inputTopic)

val pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS)

// Split each line into lowercase words, group by word, and count occurrences
val wordCounts: KTableS[String, Long] =
  textLines.flatMapValues(v => pattern.split(v.toLowerCase))
    .groupBy((k, v) => v)
    .count()

wordCounts.toStream.to(outputTopic)

val streams = new KafkaStreams(builder.build, streamsConfiguration)
streams.start()

In the above code snippet, we don't have to provide any serdes, Serialized, Produced, Consumed or Joined explicitly. Nor do they depend on any serdes specified in the config - in fact, all serdes specified in the config are ignored by the Scala APIs. All serdes and all instances of Serialized, Produced, Consumed or Joined are handled through implicit serdes, as discussed later in the document. This complete independence from configuration-based serdes is what makes the library fully type-safe: any missing instance of a serde, Serialized, Produced, Consumed or Joined is flagged as a compile-time error.
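
For example (a hypothetical fragment; inputTopic is a placeholder), leaving out the serde import means the implicit lookup fails and the code is rejected at compile time rather than failing at runtime:

Missing serde (hypothetical)
// Note: `import DefaultSerdes._` is deliberately omitted here.
val builder = new StreamsBuilderS

// With no implicit Serde[String] in scope, the following line should not
// compile (the compiler reports a missing implicit), instead of failing at
// runtime with a missing or mismatched serde:
// val textLines = builder.stream[String, String](inputTopic)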

 
Type Inference and Composition
 

Here's a sample code fragment using the Scala wrapper library. Compare it with the Scala code for the same logic written against the Java API directly, in Confluent's repository.

 
Better type inference
// Compute the total per region by summing the individual click counts per region.
val clicksPerRegion: KTableS[String, Long] =
  userClicksStream

    // Join the stream against the table
    .leftJoin(userRegionsTable,
      (clicks: Long, region: String) =>
        (if (region == null) "UNKNOWN" else region, clicks))

    // Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
    .map((_, regionWithClicks) => regionWithClicks)

    // Compute the total per region by summing the individual click counts per region
    .groupByKey
    .reduce(_ + _)

Implicit Serdes

One of the common complaints of Scala users about the Java API has been the repetitive usage of serdes in API invocations. Many of the APIs need to take the serdes through abstractions like Serialized, Consumed, Produced or Joined, and the user has to supply them every time through the with function of these classes.

The library uses the power of Scala implicits to alleviate this concern. As a user you can provide implicit serdes or implicit values of Serialized, Joined, Consumed or Produced once and make your code less verbose. In fact, you can just have the implicit serdes in scope and the library will make the instances of Serialized, Produced, Consumed or Joined available automatically.
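
A sketch of how such derivation can work (illustrative only, not necessarily the library's exact code): given implicit serdes for the key and value types, the Serialized, Produced and Consumed instances can themselves be implicit, built from the serdes in scope (Joined can be derived the same way from three serdes).

Deriving Serialized / Produced / Consumed (sketch)
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.Consumed
import org.apache.kafka.streams.kstream.{Produced, Serialized}

object DerivedInstances {
  // Whenever implicit serdes for K and V are in scope, these defs make the
  // corresponding Serialized/Produced/Consumed instances available implicitly,
  // so the user never has to call `with` by hand.
  implicit def serialized[K, V](implicit keySerde: Serde[K],
                                valueSerde: Serde[V]): Serialized[K, V] =
    Serialized.`with`(keySerde, valueSerde)

  implicit def produced[K, V](implicit keySerde: Serde[K],
                              valueSerde: Serde[V]): Produced[K, V] =
    Produced.`with`(keySerde, valueSerde)

  implicit def consumed[K, V](implicit keySerde: Serde[K],
                              valueSerde: Serde[V]): Consumed[K, V] =
    Consumed.`with`(keySerde, valueSerde)
}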

The library also bundles implicit serdes for the commonly used primitive types in a Scala module - so just import the module vals and have all those serdes in scope. A similar strategy of modular implicits can be adopted for any user-defined serdes as well (see the sketch at the end of this section).

Here's an example:

Implicit Serdes
// DefaultSerdes brings into scope implicit serdes (mostly for primitives)
// that will set up all Serialized, Produced, Consumed and Joined instances.
// So all APIs below that accept Serialized, Produced, Consumed or Joined will
// get these instances automatically
import DefaultSerdes._

val builder = new StreamsBuilderS()

val userClicksStream: KStreamS[String, Long] = builder.stream(userClicksTopic)

val userRegionsTable: KTableS[String, String] = builder.table(userRegionsTopic)

// The following code fragment does not have a single instance of Serialized,
// Produced, Consumed or Joined supplied explicitly.
// All of them are taken care of by the implicit serdes imported by DefaultSerdes
val clicksPerRegion: KTableS[String, Long] =
  userClicksStream
    .leftJoin(userRegionsTable,
      (clicks: Long, region: String) =>
        (if (region == null) "UNKNOWN" else region, clicks))
    .map((_, regionWithClicks) => regionWithClicks)
    .groupByKey
    .reduce(_ + _)

clicksPerRegion.toStream.to(outputTopic)

Compare this code with a version that does not use implicit serdes and you will see the difference in verbosity. Also, the library does not depend on serdes supplied through the configuration, which would open up a whole class of type-unsafety issues. Instead, the library expects implicit serdes to be available in scope for all the Scala APIs. For any missing serde it emits a compiler error, which makes the library much more type-safe than the corresponding Java one.
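
To extend the same approach to user-defined types, implicit serdes for your own types can be packaged in a module and imported just like DefaultSerdes. The following is a hypothetical sketch; the UserClick case class, the string encoding, and the UserDefinedSerdes object name are all made up for illustration.

User-defined implicit serdes (hypothetical sketch)
import java.nio.charset.StandardCharsets
import java.util
import org.apache.kafka.common.serialization.{Deserializer, Serde, Serdes, Serializer}

// A made-up application type used only for illustration.
case class UserClick(userId: String, clicks: Long)

object UserDefinedSerdes {
  // Encode as a simple "userId:clicks" string; a real application would use
  // JSON, Avro, etc.
  implicit val userClickSerde: Serde[UserClick] = Serdes.serdeFrom(
    new Serializer[UserClick] {
      override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
      override def serialize(topic: String, data: UserClick): Array[Byte] =
        s"${data.userId}:${data.clicks}".getBytes(StandardCharsets.UTF_8)
      override def close(): Unit = ()
    },
    new Deserializer[UserClick] {
      override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
      override def deserialize(topic: String, data: Array[Byte]): UserClick = {
        val Array(id, n) = new String(data, StandardCharsets.UTF_8).split(":")
        UserClick(id, n.toLong)
      }
      override def close(): Unit = ()
    }
  )
}

// Importing the module brings the implicit serde into scope, exactly like
// DefaultSerdes does for the primitive types:
// import UserDefinedSerdes._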


 




Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
