Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
themeEclipse
titleSimple StreamTask with StreamDescriptors
/**
* Simple test case that uses a collection as the input for a low-level application. It demonstrates the setup and output
* comparison of a test with minimal (e.g. here, none) configs: it reads an input stream of integers and multiplies each integer by 10.
*/

// Mutable job configuration for the test; it starts out empty.
HashMap<String, String> configs = new HashMap<>();
 
class MyStreamTaskFactory implements StreamTaskFactory {
    @Override
    public StreamTask createInstance() {
      return (envelope, collector, coordinator) -> {
        return envelope * 10;
      };
    }
  }
}

// Descriptor for the stream named "input", with UTF-8 string serdes for both key and message.
// NOTE(review): the test data fed into this stream are integers while the descriptor is typed
// <String, String> — confirm the intended message type and serde.
StreamDescriptor.Input input = StreamDescriptor.<String, String>input("input")
    .withKeySerde(new StringSerde("UTF-8"))
    .withMsgSerde(new StringSerde("UTF-8"));

// Descriptor for the stream named "output", configured with the same UTF-8 string serdes.
StreamDescriptor.Output output = StreamDescriptor.<String, String>output("output")
    .withKeySerde(new StringSerde("UTF-8"))
    .withMsgSerde(new StringSerde("UTF-8"));

// InMemorySystem maintains system and stream config
InMemoryTestSystem testingSystem = InMemoryTestSystem.create("samza-test")
    .addInput(input, {1,2,3,4,5})
    .addOutput(output);

// Job-specific config. JOB_NAME is not defined in this snippet; it is assumed to be a constant
// supplied by the enclosing test — TODO confirm.
configs.put(JobConfig.JOB_NAME(), JOB_NAME);

// Default single-container configs; putIfAbsent keeps any value that was set earlier.
configs.putIfAbsent(JobConfig.PROCESSOR_ID(), "1");
configs.putIfAbsent(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName());
configs.putIfAbsent(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
configs.putIfAbsent(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());

// Create the application exactly once, after the configs are fully populated, then attach the
// input and output streams, start it, and wait for it to finish.
StreamTaskApplication app = StreamApplications.createStreamTaskApp(new MapConfig(configs), new MyStreamTaskFactory());
app.addInputs(Collections.singletonList(input)).addOutputs(Collections.singletonList(output)).run();
app.waitForFinish();
 
// Assert that the output stream holds every input multiplied by 10, in the original order.
Assert.assertThat(testingSystem.getStreamState(output), IsIterableContainingInOrder.contains(10, 20, 30, 40, 50));

...