/**
* Simple Test case using a collection as an input for a low level application in the async mode
*/
//
public class AsyncRestTask implements AsyncStreamTask, InitableTask, ClosableTask {
private Client client;
private WebTarget target;
@Override
public void init(Config config, TaskContext taskContext) throws Exception {
// Your initialization of web client code goes here
client = ClientBuilder.newClient();
target = client.target("http://example.com/resource/").path("hello");
}
@Override
public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector,
TaskCoordinator coordinator, final TaskCallback callback) {
target.request().async().get(new InvocationCallback<Response>() {
@Override
public void completed(Response response) {
Integer obj = (Integer) envelope.getMessage();
collector.send(new OutgoingMessageEnvelope(new SystemStream("test-samza","output"), obj*10));
callback.complete();
}
@Override
public void failed(Throwable throwable) {
System.out.println("Invocation failed.");
callback.failure(throwable);
}
});
}
@Override
public void close() throws Exception {
client.close();
}
}
// Initialize and run the test framework
TestRunner
.of(new AsyncRestTask())
.setTaskCallBackTimeoutMS(200)
.setTaskMaxConcurrency(4)
.addInputStream(CollectionStream.of("test-samza.Input", {1,2,3,4}))
.addOutputStream(CollectionStream.empty("test-samza.Outputoutput"))
.run();
// Assertions on the outputs
StreamAssert.that("test-samza.Outputoutput").containsInAnyOrder({10,20,30,40});
|