Status

Current state: Vote Passed

Discussion thread: here

JIRA: KAFKA-4720

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

KStream looks a lot like java.util.stream.Stream: it includes methods such as filter and map, terminal operations such as to and foreach, and so on. One conspicuously absent method is peek.
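For comparison, java.util.stream.Stream already has peek, and its Javadoc describes it as existing mainly to support debugging, where you want to see elements as they flow past a point in a pipeline. The self-contained sketch below (the class and method names are illustrative only) shows that contract: the action observes each element that reaches it, and the element continues downstream unchanged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Illustrative demo of java.util.stream.Stream.peek; class name is hypothetical.
class StreamPeekDemo {

    // Every element that survives the filter is recorded in `seen` by peek,
    // then passed on, unmodified, to the map step.
    static List<String> run(List<String> seen) {
        return Stream.of("apple", "banana", "avocado", "cherry")
                .filter(w -> w.startsWith("a"))
                .peek(seen::add)             // side effect only; the element is not changed
                .map(String::toUpperCase)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        List<String> result = run(seen);
        System.out.println("peeked: " + seen);   // peeked: [apple, avocado]
        System.out.println("result: " + result); // result: [APPLE, AVOCADO]
    }
}
```

Note that peek sees only what reaches it: "banana" and "cherry" are dropped by the filter before peek runs.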

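Most stream operations are expected to be pure functions. peek, by its nature, is not: it exists to perform side effects. Mutating external state is usually discouraged, but allowing it in one explicit place makes a number of diagnostic activities, such as logging records or updating metrics, much easier.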
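Kafka Streams callbacks see both key and value (the ForeachAction<K, V> accepted by KStream.foreach has the shape apply(key, value)), so a KStream peek would naturally take the same kind of two-argument action. The sketch below uses a hypothetical, in-memory KeyValueStream class, invented for illustration and not part of Kafka, to show the intended contract: filter is pure, while peek runs its action for each record and forwards the records untouched.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;

// Hypothetical in-memory stand-in for a keyed stream; this is NOT the Kafka Streams API.
class KeyValueStream<K, V> {
    private final List<Map.Entry<K, V>> records;

    KeyValueStream(List<Map.Entry<K, V>> records) {
        this.records = new ArrayList<>(records);
    }

    // Pure operation: returns a new stream containing only the matching records.
    KeyValueStream<K, V> filter(BiPredicate<K, V> predicate) {
        List<Map.Entry<K, V>> kept = new ArrayList<>();
        for (Map.Entry<K, V> r : records) {
            if (predicate.test(r.getKey(), r.getValue())) {
                kept.add(r);
            }
        }
        return new KeyValueStream<>(kept);
    }

    // peek-style operation: hands each key and value to a side-effecting action,
    // then forwards the very same records downstream without modifying them.
    KeyValueStream<K, V> peek(BiConsumer<? super K, ? super V> action) {
        for (Map.Entry<K, V> r : records) {
            action.accept(r.getKey(), r.getValue());
        }
        return this;
    }

    List<Map.Entry<K, V>> toList() {
        return new ArrayList<>(records);
    }

    public static void main(String[] args) {
        int[] processed = {0}; // stands in for a metrics counter
        List<Map.Entry<String, String>> out = new KeyValueStream<>(List.of(
                        Map.entry("apple", "x"), Map.entry("banana", "y"), Map.entry("avocado", "z")))
                .filter((w, v) -> w.startsWith("a"))
                .peek((w, v) -> processed[0]++)
                .peek((w, v) -> System.out.println(w + " -> " + v))
                .toList();
        System.out.println(processed[0] + " processed, " + out.size() + " forwarded");
    }
}
```

Unlike Kafka Streams, which handles records one at a time as they arrive, this sketch applies each step eagerly to the whole list; only the pass-through contract of peek is meant to carry over.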

...

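```java
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

// Keys are the words themselves; someWordsWeWantToCount() and metrics stand in for application code.
KStream<String, String> words = someWordsWeWantToCount(); // since this is what basically every Kafka application does, right? :)
KTable<String, Long> wordCounts = words.filter((w, v) -> w.startsWith("a"))
        .peek((w, v) -> metrics.wordsStartingWithAProcessed.increment()) // side effect: bump a metric
        .peek((w, v) -> System.out.println(w))                           // side effect: log the word
        .groupByKey()
        .count();
```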

 

In this example, we increment a metric counter (accepting that, after a failure, some records may be counted twice because work is retried) and print every word that passes the filter to System.out. This shows how the stream is being processed rather than what it contains, which is especially valuable when experimenting with stateful processors or diagnosing what went wrong.
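The double counting comes from at-least-once processing: a side effect inside peek runs as soon as a record is processed, but progress only becomes durable at a commit, so work redone after a failure runs the side effect again. The toy model below makes that concrete. It is hypothetical and uses no Kafka classes; "commit every N records" is a simplification, since Kafka Streams commits on a configurable interval (commit.interval.ms).

```java
// Hypothetical toy model of at-least-once processing; no Kafka classes are used.
class DoubleCountSketch {

    // Processes offsets 0..recordCount-1, committing progress every
    // `commitInterval` records. The first time `crashAfter` records have been
    // processed, it simulates a crash: processing restarts from the last commit,
    // so uncommitted records pass through peek (and its counter) again.
    static int simulate(int recordCount, int commitInterval, int crashAfter) {
        int committed = 0;      // offset to resume from after a restart
        int position = 0;       // next offset to process
        int peekCalls = 0;      // what a counter inside peek would report
        boolean crashed = false;
        while (position < recordCount) {
            peekCalls++;        // the side effect fires as soon as the record is processed
            position++;
            if (position % commitInterval == 0) {
                committed = position;   // progress is durable only at commit points
            }
            if (!crashed && position == crashAfter) {
                crashed = true;
                position = committed;   // uncommitted work is redone
            }
        }
        return peekCalls;
    }

    public static void main(String[] args) {
        // 10 records, commit every 4, crash after the 7th: offsets 4-6 are processed twice.
        System.out.println(simulate(10, 4, 7) + " peek calls for 10 records"); // 13 peek calls for 10 records
    }
}
```

A crash that lands exactly on a commit point (for example simulate(10, 4, 8)) redoes nothing, which is why the overcount varies with timing rather than being a fixed error.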

...