/**
* Simple test case using a collection as the input for a low-level application. It demonstrates the setup and output comparison of a test with
* minimal (e.g. here, none) configs: it reads an input stream of integers and multiplies each integer by 10.
*/
// Define the task under test: every incoming integer is scaled by ten and forwarded
// to the "output" stream of the "test-samza" system.
StreamTask myTask = new StreamTask() {
    /**
     * Casts the incoming message to an Integer and sends ten times its value
     * to the test-samza/output stream through the supplied collector.
     */
    @Override
    public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
        final Integer incoming = (Integer) envelope.getMessage();
        final SystemStream destination = new SystemStream("test-samza", "output");
        final int scaled = incoming * 10;
        collector.send(new OutgoingMessageEnvelope(destination, scaled));
    }
};
// Initialize and run the test framework.
// The chain below creates a test for myTask on the "test-samza" system, adds an "input"
// stream built from the values 1,2,3,4 and an empty "output" stream, then calls run().
// NOTE(review): the `systemName:` / `streamName:` labels and the bare `{1,2,3,4}` literal are
// not valid Java — presumably IDE parameter hints / design-sketch shorthand; confirm before compiling.
// NOTE(review): `config` and `mode` are not defined in this snippet and must come from elsewhere.
TestTask
.create(systemName: "test-samza", myTask, config, mode.SINGLE_CONTAINER)
// presumably sizes the thread pool of the job container — TODO confirm against the framework docs
.setJobContainerThreadPoolSize(4)
// input stream named "input"; these are the integers the task's process() will receive
.addInputStream(CollectionStream.of(streamName: "input", {1,2,3,4}))
// output stream named "output", matching the SystemStream the task sends to
.addOutputStream(CollectionStream.empty(streamName: "output"))
.run();
// Assertions on the outputs
TaskAssert.that("test-samza", "Output").containsInAnyOrder({10,30,20,40}); |