/**
* Simple test case that feeds an in-memory collection as the input to a low-level
* (task API) application running in async mode.
*/
//
/**
 * Async stream task that issues a non-blocking HTTP GET for every incoming message and, once
 * the call returns, emits the message value multiplied by ten.
 *
 * <p>The incoming message is cast to {@link Integer}. Every invocation ends with exactly one
 * call on the supplied {@code TaskCallback}: {@code complete()} on success, or
 * {@code failure(Throwable)} if the HTTP call fails or handling the response throws.
 */
public class MyAsyncStreamTask implements AsyncStreamTask, InitableTask, ClosableTask {
  private Client client;
  private WebTarget target;

  /**
   * Creates the web client and the target resource that {@link #processAsync} queries.
   *
   * @param config      job configuration (unused here)
   * @param taskContext task context (unused here)
   * @throws Exception if the client cannot be created
   */
  @Override
  public void init(Config config, TaskContext taskContext) throws Exception {
    // Your initialization of web client code goes here
    client = ClientBuilder.newClient();
    target = client.target("http://example.com/resource/").path("hello");
  }

  /**
   * Fires an asynchronous GET against the target and handles the outcome in a callback.
   *
   * @param envelope    incoming message; its payload is cast to {@code Integer}
   * @param collector   receives the outgoing envelope carrying {@code payload * 10}
   * @param coordinator task coordinator (unused here)
   * @param callback    signalled with {@code complete()} or {@code failure(Throwable)}
   */
  @Override
  public void processAsync(final IncomingMessageEnvelope envelope, final MessageCollector collector,
      TaskCoordinator coordinator, final TaskCallback callback) {
    target.request().async().get(new InvocationCallback<Response>() {
      @Override
      public void completed(Response response) {
        try {
          try {
            Integer obj = (Integer) envelope.getMessage();
            // NOTE(review): the system name here is "test-samza"; confirm it matches the
            // system the consuming side reads the "output" stream from.
            collector.send(new OutgoingMessageEnvelope(new SystemStream("test-samza","output"), obj*10));
          } finally {
            // The response body is never read, so release the underlying connection explicitly.
            response.close();
          }
        } catch (RuntimeException e) {
          // Without this, an exception (bad cast, null payload, send failure) would escape
          // the HTTP client's thread and the callback would never be signalled.
          callback.failure(e);
          return;
        }
        callback.complete();
      }

      @Override
      public void failed(Throwable throwable) {
        System.out.println("Invocation failed.");
        callback.failure(throwable);
      }
    });
  }

  /**
   * Releases the web client, if one was created by {@link #init}.
   *
   * @throws Exception if closing the client fails
   */
  @Override
  public void close() throws Exception {
    // init may not have run (or may have failed before assigning the client).
    if (client != null) {
      client.close();
    }
  }
}
// Input stream declared with the names "test" / "input" and seeded with the values 1..4.
// NOTE(review): `{1,2,3,4}` is not a valid Java expression in argument position; presumably
// shorthand for a collection of Integers — confirm the type `of` expects.
// NOTE(review): `system` is not defined anywhere in this snippet — it must be created beforehand.
CollectionStream<Integer> input = CollectionStream
.of("test", "input", {1,2,3,4})
.from(system);
// Initially empty output stream declared with the names "test" / "output".
// NOTE(review): MyAsyncStreamTask sends to SystemStream("test-samza","output"); confirm that
// resolves to this same stream. The declaration is also a raw type — presumably
// CollectionStream<Integer> was intended.
CollectionStream output = CollectionStream
.empty("test", "output")
.from(system);
// Run MyAsyncStreamTask with the input and output streams declared above.
TestRunner
.of(MyAsyncStreamTask.class)
.addInputStream(input)
.addOutputStream(output)
.run();
// Assertions on the outputs: the task emits each input value multiplied by 10, so
// 1,2,3,4 is expected to yield 10,20,30,40; order is not checked.
StreamAssert.that(output).containsInAnyOrder({10,20,30,40});
|