Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
language: java
theme: Eclipse
title: Simple StreamTask with StreamDescriptors
/**
* Simple test case that uses a collection as the input to a low-level application. It demonstrates how to set up a test with
* minimal configs (here, none) and compare its output: the task reads an input stream of integers and multiplies each integer by 10.
*/
 
class MyStreamTaskFactory implements StreamTaskFactory {
    @Override
    public StreamTask createInstance() {
      return (envelope, collector, coordinator) -> {
        return;
      };
    }
  }
}

// Describe the input and output streams. Both use UTF-8 String serdes for key and message.
// NOTE(review): the test data below are integers while the descriptors declare
// <String, String> with StringSerde — confirm the intended key/message types.
StreamDescriptor.Input input = StreamDescriptor.<String, String>input("input")
    .withKeySerde(new StringSerde("UTF-8"))
    .withMsgSerde(new StringSerde("UTF-8"));

StreamDescriptor.Output output = StreamDescriptor.<String, String>output("output")
    .withKeySerde(new StringSerde("UTF-8"))
    .withMsgSerde(new StringSerde("UTF-8"));

// In-memory system named "samza-test": seed the input stream with a collection
// of integers and register the output stream by its stream id.
// A bare {1,2,3,4,5} initializer is not a valid Java expression in argument
// position, so the messages are passed as an explicit collection.
InMemoryTestSystem testingSystem = InMemoryTestSystem.create("samza-test")
    .addInput(input, java.util.Arrays.asList(1, 2, 3, 4, 5))
    .addOutput("output");

// NOTE(review): `config` is not defined anywhere in this snippet; the description
// says the test needs no configs — presumably an empty config object is supplied
// by the surrounding test. Confirm.
StreamTaskApplication app = StreamApplications.createStreamTaskApp(config, new MyStreamTaskFactory());

// Wire the descriptors into the application and run it in test mode.
app
  .addInputs(input)
  .addOutputs(output)
  .runAsTest();

// Assertions on the outputs: every input integer multiplied by 10, in order.
// contains(...) takes its expected items as varargs.
Assert.assertThat(testingSystem.getStreamState(output), IsIterableContainingInOrder.contains(10, 20, 30, 40, 50));

...