Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
themeEclipse
titleSimple StreamTask with StreamDescriptors
/**
* Simple test case that uses a collection as the input for a low-level application. It demonstrates how to set up a test with
* minimal (here, no) configs and verify its output: the task reads an input stream of integers and multiplies each integer by 10.
*/
 
class MyStreamTaskFactory implements StreamTaskFactory {
    @Override
    public StreamTask createInstance() {
      return (envelope, collector, coordinator) -> {
        return;
      };
    }
  }
}

// Describe the input and output streams; both use UTF-8 string serdes for key and message.
// NOTE(review): the serdes are String-typed while the test data below are integers — confirm the intended types.
StreamDescriptor.Input input = StreamDescriptor.<String, String>input("input")
    .withKeySerde(new StringSerde("UTF-8"))
    .withMsgSerde(new StringSerde("UTF-8"));

StreamDescriptor.Output output = StreamDescriptor.<String, String>output("output")
    .withKeySerde(new StringSerde("UTF-8"))
    .withMsgSerde(new StringSerde("UTF-8"));

// In-memory system pre-loaded with the input collection and an output stream to capture results.
InMemoryTestSystem testingSystem = InMemoryTestSystem.create("samza-test")
    .addInput(input, java.util.Arrays.asList(1, 2, 3, 4, 5))
    .addOutput("output");

// NOTE(review): `config` is not defined in this snippet — presumably supplied by the enclosing test; confirm.
StreamTaskApplication app = StreamApplications.createStreamTaskApp(config, new MyStreamTaskFactory());

// Wire the streams into the application and run it in test mode.
app
    .addInputs(input)
    .addOutputs(output)
    .runAsTest();

// Assertions on the outputs
Assert.assertThat(testingSystem.getStreamState(output), IsIterableContainingInOrder.contains(10, 20, 30, 40, 50));



Code Block
languagejava
themeEclipse
titleSimple AsyncStreamTask Test
/**
* Simple test case that uses a collection as the input for a low-level application running in async mode.
*/

// 
/**
 * Async task that issues a REST GET for every incoming message and, once the call completes,
 * emits the incoming integer multiplied by 10 to the "output" stream of the "test-samza" system.
 */
public class AsyncRestTask implements AsyncStreamTask, InitableTask, ClosableTask {
  // Destination stream; built once instead of on every message.
  private static final SystemStream OUTPUT_STREAM = new SystemStream("test-samza", "output");

  private Client client;
  private WebTarget target;

  /**
   * Creates the web client and the request target used by {@link #processAsync}.
   */
  @Override
  public void init(Config config, TaskContext taskContext) throws Exception {
    // Your initialization of web client code goes here
    client = ClientBuilder.newClient();
    target = client.target("http://example.com/resource/").path("hello");
  }

  /**
   * Fires an asynchronous GET and resolves {@code callback} from the invocation callback:
   * {@code complete()} after the multiplied value has been sent, {@code failure(...)} otherwise.
   */
  @Override
  public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator, final TaskCallback callback) {
    target.request().async().get(new InvocationCallback<Response>() {
      @Override
      public void completed(Response response) {
        try {
          Integer value = (Integer) envelope.getMessage();
          collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, value * 10));
        } catch (RuntimeException e) {
          // A bad message (null or non-Integer) must still resolve the callback,
          // otherwise the task only fails later via the callback timeout.
          callback.failure(e);
          return;
        } finally {
          // The response entity is never read, so release the underlying connection explicitly.
          response.close();
        }
        callback.complete();
      }

      @Override
      public void failed(Throwable throwable) {
        System.out.println("Invocation failed.");
        callback.failure(throwable);
      }
    });
  }

  /**
   * Closes the web client if {@link #init} created one.
   */
  @Override
  public void close() throws Exception {
    if (client != null) {
      client.close();
    }
  }
}
 

// Initialize and run the test framework
// Configure the runner for the async task, feed it the input collection,
// register an empty output stream to capture results, and run.
TestRunner
    .of(new AsyncRestTask())
    .setTaskCallBackTimeoutMS(200)
    .setTaskMaxConcurrency(4)
    .addInputStream(CollectionStream.of("test-samza.Input", java.util.Arrays.asList(1, 2, 3, 4)))
    .addOutputStream(CollectionStream.empty("test-samza.output"))
    .run();

// Assertions on the outputs; order is not checked because messages are processed concurrently.
StreamAssert.that("test-samza.output").containsInAnyOrder(java.util.Arrays.asList(10, 20, 30, 40));

...