Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
themeEclipse
titleSimple StreamTask Test
/**
* SampleSimple Test case using a collection as an input for a low level application. It demonstrates set up and comparison of a test with 
* minimal(eg. here none) configs and it reads an input stream of integers and multiplies 
* each integer with 10
*/
 
// Create a StreamTask
StreamTask myTask = new StreamTask() {
  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
    Integer obj = (Integer) envelope.getMessage();
    collector.send(new OutgoingMessageEnvelope(new SystemStream("test-samza","output"), obj*10));
  }
};
 
// Initialize and run the test framework
TestTask
    .create(systemName: "test-samza", myTask, config, mode.SINGLE_CONTAINER)
    .setJobContainerThreadPoolSize(4)
    .addInputStream(CollectionStream.of(streamName: "input", {1,2,3,4}))
    .addOutputStream(CollectionStream.empty(streamName: "output"))
    .run();
 
// Assertions on the outputs
TaskAssert.that("test-samza", "Output").containsInAnyOrder({10,30,20,40});
Code Block
languagejava
themeEclipse
titleSimple AsyncStreamTask Test
/**
* Simple Test case using a collection as an input for a low level application in the async mode
*/

// 
public class AsyncRestTask implements AsyncStreamTask, InitableTask, ClosableTask {
  private Client client;
  private WebTarget target; 

  @Override
  public void init(Config config, TaskContext taskContext) throws Exception {
    // Your initialization of web client code goes here
    client = ClientBuilder.newClient();
    target = client.target("http://example.com/resource/").path("hello");
  }

  @Override
  public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator, final TaskCallback callback) {
    target.request().async().get(new InvocationCallback<Response>() {
      @Override
      public void completed(Response response) {
        // process your message
        callback.complete();
      }

      @Override
      public void failed(Throwable throwable) {
        System.out.println("Invocation failed.");
        callback.failure(throwable);
      }
    });
  }

  @Override
  public void close() throws Exception {
    client.close();
  }
}
 

// Initialize and run the test framework
TestTask
    .create("test-samza", new AsyncRestTask(), config, Mode.SINGLE_CONTAINER)
	.setTaskCallBackTimeoutMS(200)
	.setTaskMaxConcurrency(4)
	.addInputStream(CollectionStream.of("PageView", pageviews))
	.addOutputStream(CollectionStream.empty("Output"))
	.run();
 
// Assertions on the outputs
TaskAssert.that("test-samza", "Output").containsInAnyOrder(pageviews);
Code Block
languagejava
themeEclipse
titleSimple High Level Api Test
 
// Initialize and run the test framework
TestApplication
	.create(config, mode.SINGLE_CONTAINER);
    .getInputStream(CollectionStream.of(streamName: "input", {1,2,3}))
    .map(s -> "processed " + s)
    .run();
 
// Assertions on the outputs
StreamAssert.that(stream).containsInAnyOrder(Arrays.asList("processed 1", "processed 2", "processed 4"));

...