THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** * Simple Test case using a collection as an input for a low level application. It demonstrates set up and comparison of a test with * minimal(eg. here none) configs and it reads an input stream of integers and multiplies each integer with 10 */ HashMap<String,String> configs = new HashMap<String,String>(); class MyStreamTaskFactory implements StreamTaskFactory { @Override public StreamTask createInstance() { return (envelope, collector, coordinator) -> { return envelope * 10; }; } } } StreamDescriptor.Input input = StreamDescriptor.<String, String>input("input") .withKeySerde(new StringSerde("UTF-8")) .withMsgSerde(new StringSerde("UTF-8")); StreamDescriptor.Output output = StreamDescriptor.<String, String>output("output") .withKeySerde(new StringSerde("UTF-8")) .withMsgSerde(new StringSerde("UTF-8")); // InMemorySystem maintains system and stream config InMemoryTestSystem testingSystem = InMemoryTestSystem.create("samza-test") .addInput(input, {1,2,3,4,5}) .addOutput(output); StreamTaskApplication app = StreamApplications.createStreamTaskApp(config, new MyStreamTaskFactory()); app .addInputs(input) .addOutputs(output) .runAsTest // JOB Specific Config configs.put(JobConfig.JOB_NAME(), JOB_NAME); // Default Single Container configs configs.putIfAbsent(JobConfig.PROCESSOR_ID(), "1"); configs.putIfAbsent(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName()); configs.putIfAbsent(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName()); configs.putIfAbsent(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName()); StreamTaskApplication app = StreamApplications.createStreamTaskApp(new MapConfig(configs), new MyStreamTaskFactory()); app.addInputs(Collections.singletonList(input)).addOutputs(Collections.singletonList(output)).run(); app.waitForFinish(); // Assertions on the outputs Assert.assertThat(testingSystem.getStreamState(output), IsIterableContainingInOrder.contains({10,20,30,40,50}))); |
...