Status
Current state: Partially Accepted. Superseded by SEP-13.
Discussion thread: http://mail-archives.apache.org/mod_mbox/samza-dev/201704.mbox/%3CCAFvExu0UptjFJnHtbbVsHteM3gfsZKHMPq%2BVrLjWsPmmjPunzw%40mail.gmail.com%3E
...
```java
/**
 * Example code to implement a window-based counter with a re-partition stage
 */
public class PageViewCounterExample implements StreamApplication {

  @Override
  public void init(StreamGraph graph, Config config) {
    MessageStream<PageViewEvent> pageViewEvents = graph.createInputStream("myinput");
    MessageStream<MyStreamOutput> pageViewPerMemberCounters = graph.createOutputStream("myoutput");

    pageViewEvents
        // Re-partition by member ID so that all events of one member reach the same task
        .partitionBy(m -> m.getMessage().memberId)
        // Count events per member in 10-second tumbling windows
        .window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(
                m -> m.getMessage().memberId, Duration.ofSeconds(10), (m, c) -> c + 1)
            .setEarlyTrigger(Triggers.repeat(Triggers.count(5)))
            .setAccumulationMode(AccumulationMode.DISCARDING))
        .map(MyStreamOutput::new)
        .sendTo(pageViewPerMemberCounters);
  }

  public static void main(String[] args) throws Exception {
    CommandLine cmdLine = new CommandLine();
    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
    ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config);
    localRunner.run(new PageViewCounterExample());
  }
}
```
Table 1: Repartition counter example using new fluent API
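To make the windowing step in the example more concrete, the following is a minimal plain-Java sketch of per-key counting in tumbling windows. It is not Samza code and not how the proposed API is implemented: `TumblingWindowCounter`, `add`, and `count` are hypothetical names, the caller supplies timestamps explicitly (an assumption made to keep the sketch deterministic), and early triggers and accumulation modes are left out entirely.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration only; not part of the Samza API. */
final class TumblingWindowCounter {
  private final long windowMillis;
  // window start (epoch millis) -> (key -> number of messages seen in that window)
  private final Map<Long, Map<String, Integer>> counts = new HashMap<>();

  TumblingWindowCounter(Duration windowSize) {
    this.windowMillis = windowSize.toMillis();
  }

  /** Assigns the message to the fixed-size window containing its timestamp and increments the key's count. */
  void add(String key, long timestampMillis) {
    long windowStart = Math.floorDiv(timestampMillis, windowMillis) * windowMillis;
    counts.computeIfAbsent(windowStart, w -> new HashMap<>()).merge(key, 1, Integer::sum);
  }

  /** Returns the count for a key in the window starting at windowStart, or 0 if nothing was recorded. */
  int count(String key, long windowStart) {
    return counts
        .getOrDefault(windowStart, Collections.<String, Integer>emptyMap())
        .getOrDefault(key, 0);
  }
}
```

With a 10-second window, messages stamped 1000 ms and 9999 ms fall into the window starting at 0, while a message stamped 10000 ms opens the next window. Windows never overlap, which is what distinguishes a tumbling window from a sliding one.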
...
```java
/**
 * Interface to be implemented by any environment that supports physical input/output streams and local stores
 */
public interface RuntimeEnvironment {

  /**
   * Method to get the physical {@link org.apache.samza.system.StreamSpec} to describe the stream with ID {@code streamId}
   */
  StreamSpec getStreamSpec(String streamId);

  /**
   * Method to get the physical {@link org.apache.samza.system.SystemFactory} with the name {@code system}
   */
  SystemFactory getSystemFactory(String system);

  /**
   * Method to get the physical {@link org.apache.samza.storage.StorageEngineFactory} for {@link org.apache.samza.storage.kv.KeyValueStore}
   * with the name {@code storeId}
   */
  StorageEngineFactory getStorageEngineFactory(String storeId);
}
```
TestRuntimeEnvironment (implementation class)
```java
public class TestRuntimeEnvironment implements RuntimeEnvironment {

  public StreamSpec getStreamSpec(String streamId) {...}

  public SystemFactory getSystemFactory(String system) {...}

  public StorageEngineFactory getStorageEngineFactory(String storeId) {...}

  /**
   * Additional method to add Java collection based input/output streams
   */
  public <K, V> void addStream(String streamId, Queue<Entry<K, V>> messages) {...}

  /**
   * Additional method to add a Java collection based KV-store
   */
  public <K, V> void addStore(String storeId, Map<K, V> storeEntries) {...}
}
```
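The idea behind `TestRuntimeEnvironment`, backing streams and stores with ordinary Java collections, can be illustrated without any Samza dependency. The following is a minimal sketch under stated assumptions: `InMemoryTestEnvironment`, `getStream`, and `getStore` are hypothetical names that do not appear in the proposal, and the class deliberately does not implement `RuntimeEnvironment`, since the bodies of those methods are elided above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;

/** Hypothetical stand-in showing collection-backed streams and stores; not part of Samza. */
final class InMemoryTestEnvironment {
  // Registered streams and stores, looked up by their IDs.
  private final Map<String, Queue<?>> streams = new HashMap<>();
  private final Map<String, Map<?, ?>> stores = new HashMap<>();

  /** Registers a queue of key/value messages as the stream with the given ID. */
  <K, V> void addStream(String streamId, Queue<Entry<K, V>> messages) {
    streams.put(streamId, messages);
  }

  /** Registers a map as the key-value store with the given ID. */
  <K, V> void addStore(String storeId, Map<K, V> storeEntries) {
    stores.put(storeId, storeEntries);
  }

  /** Hypothetical accessor: returns the queue registered for streamId. The caller picks K and V. */
  @SuppressWarnings("unchecked")
  <K, V> Queue<Entry<K, V>> getStream(String streamId) {
    Queue<?> queue = streams.get(streamId);
    if (queue == null) {
      throw new IllegalArgumentException("Unknown stream: " + streamId);
    }
    return (Queue<Entry<K, V>>) queue;
  }

  /** Hypothetical accessor: returns the map registered for storeId. The caller picks K and V. */
  @SuppressWarnings("unchecked")
  <K, V> Map<K, V> getStore(String storeId) {
    Map<?, ?> store = stores.get(storeId);
    if (store == null) {
      throw new IllegalArgumentException("Unknown store: " + storeId);
    }
    return (Map<K, V>) store;
  }
}
```

A test would typically fill an `ArrayDeque` with `AbstractMap.SimpleEntry` messages, register it under the input stream ID, run the application, and then inspect the queue registered under the output stream ID. The unchecked casts are a trade-off of this sketch: type safety relies on the test reading each stream or store with the same types it registered.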
...