THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block |
---|
public class TestPageViewCountExample { private PageViewCounterExample userApp = new PageViewCounterExample(); Queue<Entry<String, PageViewCounterExample.PageViewEvent>> inputMessages = new LinkedBlockingQueue<>(); Queue<Entry<String, PageViewCounterExample.PageViewCount>> outgoingMessages = new LinkedBlockingQueue<>(); TestApplicationRunner testRunner; @Before public void setup() { // preparation for config, input, output, and local store Config config = mock(Config.class); testRunner = new TestApplicationRunner(new AppConfig(config)); testRunner.addStream("pageViewEventStream", inputMessages); testRunner.addStream("pageViewEventPerMemberStream", outgoingMessages); inputMessages.offer(new Entry<>("my-page-id", new PageViewCounterExample.PageViewEvent("my-page-id", "my-member-id", 1234556L))); testRunner.start(userApp); } @After public void shutdown() { testRunner.stop(userApp); } @Test public void test() throws InterruptedException { while(true) { Entry<String, PageViewCounterExample.PageViewCount> countEvent = outgoingMessages.poll(); // run the user app with bounded input stream this.testRunner.run(userApp); // validate the test results Entry<String, PageViewCounterExample.PageViewCount> countEvent = outgoingMessages.poll(); String wndKey = getWindowKey("my-member-id", 1234556L); assertTrue(countEvent.getKey().equals(wndKey)); assertTrue(countEvent.getValue().count == 1); break; } } private String getWindowKey(String s, long timestamp) { return String.format("%s-%d", s, (timestamp / 10000) * 10000); } } |
...