THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/**
* Simple Test case using a collection as an input for a low level application. It demonstrates set up and comparison of a test with
* minimal(eg. here none) configs and it reads an input stream of integers and multiplies each integer with 10
*/
class MyStreamTaskFactory implements StreamTaskFactory {
@Override
public StreamTask createInstance() {
return (envelope, collector, coordinator) -> {
return;
};
}
}
}
StreamDescriptor.Input input = StreamDescriptor.<String, String>input("input")
.withKeySerde(new StringSerde("UTF-8"))
.withMsgSerde(new StringSerde("UTF-8"));
StreamDescriptor.Output output = StreamDescriptor.<String, String>output("output")
.withKeySerde(new StringSerde("UTF-8"))
.withMsgSerde(new StringSerde("UTF-8"));
InMemoryTestSystem testingSystem = InMemoryTestSystem.create("samza-test")
.addInput(input, {1,2,3,4,5})
.addOutput("output"/);
StreamTaskApplication app = StreamApplications.createStreamTaskApp(config, new MyStreamTaskFactory());
app
.addInputs(input)
.addOutputs(output)
.runAsTest();
// Assertions on the outputs
Assert.assertThat(testingSystem.getStreamState(output), IsIterableContainingInOrder.contains({10,20,30,40,50})));
|
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** * Simple Test case using a collection as an input for a low level application in the async mode */ // public class AsyncRestTask implements AsyncStreamTask, InitableTask, ClosableTask { private Client client; private WebTarget target; @Override public void init(Config config, TaskContext taskContext) throws Exception { // Your initialization of web client code goes here client = ClientBuilder.newClient(); target = client.target("http://example.com/resource/").path("hello"); } @Override public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator, final TaskCallback callback) { target.request().async().get(new InvocationCallback<Response>() { @Override public void completed(Response response) { Integer obj = (Integer) envelope.getMessage(); collector.send(new OutgoingMessageEnvelope(new SystemStream("test-samza","output"), obj*10)); callback.complete(); } @Override public void failed(Throwable throwable) { System.out.println("Invocation failed."); callback.failure(throwable); } }); } @Override public void close() throws Exception { client.close(); } } // Initialize and run the test framework TestRunner .of(new AsyncRestTask()) .setTaskCallBackTimeoutMS(200) .setTaskMaxConcurrency(4) .addInputStream(CollectionStream.of("test-samza.Input", {1,2,3,4})) .addOutputStream(CollectionStream.empty("test-samza.output")) .run(); // Assertions on the outputs StreamAssert.that("test-samza.output").containsInAnyOrder({10,20,30,40}); |
...