THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/**
 * Simple test case using a collection as the input for a low-level (StreamTask) application.
 * It demonstrates setting up a test and comparing its output with minimal (here: none) configs:
 * the task reads an input stream of integers and multiplies each integer by 10.
 */
class MyStreamTaskFactory implements StreamTaskFactory {
  /**
   * Creates the task under test.
   *
   * @return a task that sends every incoming integer message, multiplied by 10, to the
   *         "output" stream of the "samza-test" system
   */
  @Override
  public StreamTask createInstance() {
    return (envelope, collector, coordinator) -> {
      // getMessage() is untyped; the input stream below is declared with an integer message serde.
      Integer value = (Integer) envelope.getMessage();
      collector.send(new org.apache.samza.system.OutgoingMessageEnvelope(
          new org.apache.samza.system.SystemStream("samza-test", "output"), value * 10));
    };
  }
}

// Input stream: String keys, Integer messages (the test data below are integers).
StreamDescriptor.Input input = StreamDescriptor.<String, Integer>input("input")
    .withKeySerde(new StringSerde("UTF-8"))
    .withMsgSerde(new org.apache.samza.serializers.IntegerSerde());

// Output stream: same key/message types as the input.
StreamDescriptor.Output output = StreamDescriptor.<String, Integer>output("output")
    .withKeySerde(new StringSerde("UTF-8"))
    .withMsgSerde(new org.apache.samza.serializers.IntegerSerde());

// In-memory system that is fed the input collection and captures the output stream.
InMemoryTestSystem testingSystem = InMemoryTestSystem.create("samza-test")
    .addInput(input, java.util.Arrays.asList(1, 2, 3, 4, 5))
    .addOutput("output");

// NOTE(review): `config` is not declared in this snippet; it is assumed to be defined by the
// surrounding test (the example needs no specific configs).
StreamTaskApplication app =
    StreamApplications.createStreamTaskApp(config, new MyStreamTaskFactory());

app
    .addInputs(input)
    .addOutputs(output)
    .runAsTest();

// Assertions on the outputs: every input value must appear multiplied by 10, in input order.
Assert.assertThat(testingSystem.getStreamState(output),
    IsIterableContainingInOrder.contains(10, 20, 30, 40, 50));
...