Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Status

Current state: Partially Accepted. Superseded by SEP-13. (Previously: UNDER DISCUSSION.)

Discussion thread: http://mail-archives.apache.org/mod_mbox/samza-dev/201704.mbox/%3CCAFvExu0UptjFJnHtbbVsHteM3gfsZKHMPq%2BVrLjWsPmmjPunzw%40mail.gmail.com%3E

...

 

Code Block
/**
* Example code to implement window-based counter with a re-partition stage
*/
public class PageViewCounterExample implements StreamApplication {

 @Override public void init(StreamGraph graph, Config config) {
   MessageStream<PageViewEvent> pageViewEvents = graph.createInputStream(“myinput”);
   MessageStream<MyStreamOutput> pageViewPerMemberCounters = graph.createOutputStream(“myoutput”);

   pageViewEvents.
       partitionBy(m -> m.getMessage().memberId).
       window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(m -> m.getMessage().memberId, Duration.ofSeconds(10), (m, c) -> c + 1).
           setEarlyTrigger(Triggers.repeat(Triggers.count(5))).
           setAccumulationMode(AccumulationMode.DISCARDING)).
       map(MyStreamOutput::new).
       sendTo(pageViewPerMemberCounters);

 }

 public static void main(String[] args) throws Exception {
   CommandLine cmdLine = new CommandLine();
   Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
   ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config);
   localRunner.run(new PageViewCounterExample());
 }
}

 

Table 1: Repartition counter example using new fluent API

...

Code Block
/**
 * Interface to be implemented by any environment that supports physical input/output streams
 * and local stores.
 *
 * <p>Each method resolves a logical identifier (a stream ID, a system name or a store ID) to
 * the corresponding physical object supplied by the environment.
 */
public interface RuntimeEnvironment {

  /**
   * Method to get the physical {@link org.apache.samza.system.StreamSpec} to describe the stream with ID {@code streamId}
   *
   * @param streamId the ID of the stream to look up
   * @return the {@link org.apache.samza.system.StreamSpec} describing that stream
   */
  StreamSpec getStreamSpec(String streamId);

  /**
   * Method to get the physical {@link org.apache.samza.system.SystemFactory} with the name {@code system}
   *
   * @param system the name of the system to look up
   * @return the {@link org.apache.samza.system.SystemFactory} registered under that name
   */
  SystemFactory getSystemFactory(String system);

  /**
   * Method to get the physical {@link org.apache.samza.storage.StorageEngineFactory} for
   * {@link org.apache.samza.storage.kv.KeyValueStore} with the name {@code storeId}
   *
   * @param storeId the name of the key-value store to look up
   * @return the {@link org.apache.samza.storage.StorageEngineFactory} for that store
   */
  StorageEngineFactory getStorageEngineFactory(String storeId);

}

 

TestRuntimeEnvironment (implementation class)

 

Code Block
/**
 * {@link RuntimeEnvironment} implementation that additionally accepts Java collection based
 * streams and KV-stores. Method bodies are elided ({@code {...}}) in this design sketch.
 */
public class TestRuntimeEnvironment implements RuntimeEnvironment {
  @Override
  public StreamSpec getStreamSpec(String streamId) {...}

  @Override
  public SystemFactory getSystemFactory(String system) {...}

  @Override
  public StorageEngineFactory getStorageEngineFactory(String storeId) {...}

  /**
   * Additional method to add a Java collection based input/output stream.
   *
   * @param streamId the ID of the stream being added
   * @param messages the queue of key/value entries backing the stream
   * @param <K> the key type of the entries
   * @param <V> the value type of the entries
   */
  // Type parameters must follow the modifiers and precede the return type.
  public <K, V> void addStream(String streamId, Queue<Entry<K, V>> messages) {...}

  /**
   * Additional method to add a Java collection based KV-store.
   *
   * @param storeId the ID of the store being added
   * @param storeEntries the map holding the store's entries
   * @param <K> the key type of the store
   * @param <V> the value type of the store
   */
  public <K, V> void addStore(String storeId, Map<K, V> storeEntries) {...}

}

...