Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Status

Current state: UNDER DISCUSSIONPartially Accepted. Superseded by SEP-13.

Discussion threadhttp://mail-archives.apache.org/mod_mbox/samza-dev/201704.mbox/%3CCAFvExu0UptjFJnHtbbVsHteM3gfsZKHMPq%2BVrLjWsPmmjPunzw%40mail.gmail.com%3E

...

 

Code Block
/**
* Example code to implement window-based counter with a re-partition stage
*/
public class PageViewCounterExample implements StreamApplication {

 @Override public void init(StreamGraph graph, Config config) {
   MessageStream<PageViewEvent> pageViewEvents = graph.createInputStream(“myinput”);
   MessageStream<MyStreamOutput> pageViewPerMemberCounters = graph.createOutputStream(“myoutput”);

   pageViewEvents.
       partitionBy(m -> m.getMessage().memberId).
       window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(m -> m.getMessage().memberId, Duration.ofSeconds(10), (m, c) -> c + 1).
           setEarlyTrigger(Triggers.repeat(Triggers.count(5))).
           setAccumulationMode(AccumulationMode.DISCARDING)).
       map(MyStreamOutput::new).
       sendTo(pageViewPerMemberCounters);

 }

 public static void main(String[] args) throws Exception {
   CommandLine cmdLine = new CommandLine();
   Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
   ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config);
   localRunner.run(new PageViewCounterExample());
 }
}

 

Table 1: Repartition counter example using new fluent API

...