THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** * TestRunner provides static factory for quick setup of Samza environment for testing Low level api and High level api , users can configure input streams * they consume from and output streams they produce to, users pass in their Samza job api logic via StreamTask/AsyncStreamTask/StreamApplication */ public class TestRunner { // Static Factory to config & create runner for low level api public static TestRunner of(Class taskClass) {...} // Static Factory to config & create runner for high level api public static TestRunner of(StreamApplication app) {...} // Add/Ovveride any custom configs public TestRunner addConfigs(Map<String,String> configs) {...} public TestRunner addConfigs(Config configs) {...} public TestRunner addOverrideConfig(String key, String val){...} // Configure state for application public TestRunner addState(String storeName) {...} // Configure an input stream for samza system, that job can consume from public TestRunner addInputStream(CollectionStream stream) {...} // Configures an output stream for samza system, that job can producer to public TestRunner addOutputStream(CollectionStream stream) {...} // Consume messages from a Stream public static <T> Map<Integer, List<T>> consumeStream(CollectionStream stream, Integer timeout) {...} // Run the TestRunner public void run() {...} } |
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** * CollectionStream acts a descriptor that can be used to build an in memory input stream (single/multiple partition) of java collections */ public class CollectionStream<T> { // Create an empty stream with single partition that a Samza job can produce to public static <T> CollectionStream<T> empty(String systemName, String streamName) {...} // Create an empty stream with multiple partitions that a Samza job can produce to public static <T> CollectionStream<T> empty(String systemName, String streamName, Integer partitionCount) {...} // Create a stream of messages from input list with single partition public static <T> CollectionStream<T> of(String systemName, String streamName, Iterable<T> collection){...} // Create a stream of messages from input list with multiple partition, key of partitions map is partitionId public static <T> CollectionStream<T> of(String systemName, String streamName, Map<Integer,? extends Iterable<T>> partitions){...} // Associate this CollectionStream with a CollectionStreamSystem public CollectionStream<T> from(CollectionStreamSystem system) {...} } |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** * CollectionStreamSystem provides utilities to create and initialize an in memory input stream. */ public class CollectionStreamSystem { // Create a CollectionStreamSystem public static CollectionStreamSystem create(String name) {...} // Create an In memory stream and initialize it with messages from partitions map public <T> CollectionStreamSystem addInput(String streamName, Map<Integer, Iterable<T>> partitions) {...} // Create an empty in memory stream with as many partitions as partitionCount public <T> CollectionStreamSystem addOutput(String streamName, Map<Integer, Integer partitionCount and initialize an in memory input stream. */ public class CollectionStreamSystem { // Create a CollectionStreamSystem public static CollectionStreamSystem create(String name) {...} } |
EventStream:
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/**
 * EventStream provides utilities to build an in memory input stream of events. It helps mimic the run time
 * environment of your job, advancing time for windowing functions.
 */
public class EventStream<T> {

  /**
   * Fluent builder for an EventStream. Every step returns {@code Builder<T>} (not the raw type) so the
   * element type is kept across chained calls.
   */
  public static abstract class Builder<T> {
    // NOTE(review): addElement()/addException() take no arguments in this sketch - presumably the
    // element / exception to add is meant to be a parameter; confirm before implementing.
    public abstract Builder<T> addElement();

    public abstract Builder<T> addException();

    public abstract Builder<T> advanceTimeTo(long time);

    public abstract EventStream<T> build();
  }

  /** Create a new builder for an EventStream. */
  public static <T> Builder<T> builder() {...}
}

public class EventStreamSystem {
  /** Create an EventStreamSystem. */
  public static EventStreamSystem create(String name) {...}
}
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** * Simple Test case using a collection as an input for a low level application. It demonstrates set up and comparison of a test with * minimal(eg. here none) configs and it reads an input stream of integers and multiplies each integer with 10 */ // Create a StreamTask MyStreamTestTask myTask = new StreamTask() { @Override public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception { Integer obj = (Integer) envelope.getMessage(); collector.send(new OutgoingMessageEnvelope(new SystemStream("test","output"), obj*10)); } }; CollectionStreamSystem system = CollectionStreamSystem.create("test"); CollectionStream<Integer> input = CollectionStream .of("test", "input", {1,2,3,4}) .from(system); CollectionStream output = CollectionStream .empty("test", "output") .from(system); TestRunner .of(MyStreamTestTask.class) .addInputStream(input) .addOutputStream(output) .run(); // Assertions on the outputs Assert.assertThat(StreamUtilsTestRunner.getStreamStateconsumeStream(output), IsIterableContainingInOrder.contains({10,20,30,40}))); |
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** * Simple Test case using a collection as an input for a low level application in the async mode */ // public class MyAsyncStreamTask implements AsyncStreamTask, InitableTask, ClosableTask { private Client client; private WebTarget target; @Override public void init(Config config, TaskContext taskContext) throws Exception { // Your initialization of web client code goes here client = ClientBuilder.newClient(); target = client.target("http://example.com/resource/").path("hello"); } @Override public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator, final TaskCallback callback) { target.request().async().get(new InvocationCallback<Response>() { @Override public void completed(Response response) { Integer obj = (Integer) envelope.getMessage(); collector.send(new OutgoingMessageEnvelope(new SystemStream("test-samza","output"), obj*10)); callback.complete(); } @Override public void failed(Throwable throwable) { System.out.println("Invocation failed."); callback.failure(throwable); } }); } @Override public void close() throws Exception { client.close(); } } CollectionStreamSystem system = CollectionStreamSystem.create("test"); CollectionStream<Integer> input = CollectionStream .of("test", "input", {1,2,3,4}) .from(system); CollectionStream output = CollectionStream .empty("test", "output") .from(system); TestRunner .of(MyAsyncStreamTask.class) .addInputStream(input) .addOutputStream(output) .run(); // Assertions on the outputs StreamAssert.that(output).containsInAnyOrder({10,20,30,40}); |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** * Simple Test case using a collection as an input for a High level application */ public class MyStreamApplication implements StreamApplication { @Override public void init(StreamGraph graph, Config config) { MessageStream<Integer> pageViews = graph.getInputStream(“test-samza.page-views”); pageViews.map(s -> "processed " + s) .sendTo(graph.getOutputStream(“test-samza.output”)); } } CollectionStreamSystem system = CollectionStreamSystem.create("test"); CollectionStream<Integer> input = CollectionStream .of("test", "input", {1,2,3,4}) .from(system); CollectionStream output = CollectionStream .empty("test", "output") .from(system); // Initialize and run the test framework TestRunner .of(new MyStreamApplication()); .addInputStream(input) .addOutputStream(output) .addOverrideConfig("job.default.system", "test") .run(); // Assertions on the outputs StreamAssert.that(output).contains({"processed 1", "processed 2", "processed 4"}); |
...