THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** * Simple Test case using a collection as an input for a low level application. It demonstrates set up and comparison of a test with * minimal(eg. here none) configs and it reads an input stream of integers and multiplies each integer with 10 */ // Create a StreamTask MyStreamTestTask myTask = new StreamTask() { @Override public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception { Integer obj = (Integer) envelope.getMessage(); collector.send(new OutgoingMessageEnvelope(new SystemStream("test","output"), obj*10)); } }; CollectionStream<Integer> input = CollectionStream .of("test", "input", {1,2,3,4}) .from(system); CollectionStream output = CollectionStream .empty("test", "output") .from(system); TestRunner .of(MyStreamTestTask.class) .addInputStream(input) .addOutputStream(output) .run(); // Assertions on the outputs Assert.assertThat(TestRunner.consumeStream(output), IsIterableContainingInOrder.contains({10,20,30,40}))); |
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** * Simple Test case using a collection as an input for a low level application in the async mode */ // public class MyAsyncStreamTask implements AsyncStreamTask, InitableTask, ClosableTask { private Client client; private WebTarget target; @Override public void init(Config config, TaskContext taskContext) throws Exception { // Your initialization of web client code goes here client = ClientBuilder.newClient(); target = client.target("http://example.com/resource/").path("hello"); } @Override public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator, final TaskCallback callback) { target.request().async().get(new InvocationCallback<Response>() { @Override public void completed(Response response) { Integer obj = (Integer) envelope.getMessage(); collector.send(new OutgoingMessageEnvelope(new SystemStream("test","output"), obj*10)); callback.complete(); } @Override public void failed(Throwable throwable) { System.out.println("Invocation failed."); callback.failure(throwable); } }); } @Override public void close() throws Exception { client.close(); } } CollectionStream<Integer> input = CollectionStream .of("test", "input", {1,2,3,4}) .from(system); CollectionStream output = CollectionStream .empty("test", "output") .from(system); TestRunner .of(MyAsyncStreamTask.class) .addInputStream(input) .addOutputStream(output) .run(); // Assertions on the outputs StreamAssert.that(output).containsInAnyOrder({10,20,30,40}); |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** * Simple Test case using a collection as an input for a High level application */ public class MyStreamApplication implements StreamApplication { @Override public void init(StreamGraph graph, Config config) { MessageStream<Integer> pageViews = graph.getInputStream(“test.page-views”); pageViews.map(s -> "processed " + s) .sendTo(graph.getOutputStream(“test.output”)); } } CollectionStream<Integer> input = CollectionStream .of("test", "input", {1,2,3,4}) .from(system); CollectionStream output = CollectionStream .empty("test", "output") .from(system); // Initialize and run the test framework TestRunner .of(new MyStreamApplication()); .addInputStream(input) .addOutputStream(output) .addOverrideConfig("job.default.system", "test") .run(); // Assertions on the outputs StreamAssert.that(output).contains({"processed 1", "processed 2", "processed 4"}); |
...