THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** * TestTask provides static factory for quick setup of Samza environment for testing low level api, users can configure input streams * they consume from and output streams they produce to, users pass in their Samza job api logic via StreamTask and AsyncStreamTask */ public class TestTask { // Static Factory to config & create runner for low level api // Mode is either SingleContainer or MultiContainer public static TestTask create(StreamTask task, HashMap<StringMap<String, String> config, Mode mode) {...} public static TestTask create(AsyncStreamTask task, HashMap<StringMap<String, String> config, Mode mode) {...} // Multithreading configs for users public TestTask setTaskMaxConcurrency(Integer value) {...} public TestTask setTaskCallBackTimeoutMS(Integer value) {...} public TestTask setTaskMaxConcurrency(Integer value) {...} // Configure an input stream for samza system, that task can consume from public TestTask addInputStream(CollectionStream stream) {...} // Configures an output stream for samza system, that task can producer to public TestTask addOutputStream(CollectionStream stream) {...} // Run the app public void run() {...} } |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** * TestApplication provides static factory for quick setup of Samza environment for testing High Level Api, users can configure input * streams and then apply various operators on streams and run the application */ public class TestApplication<T> { private TestApplication( HashMap<String, String> configs, Mode mode) // Static factory to config & create runner for High level api public static TestApplication create(String systemName, HashMap<String, String> config, Mode mode) {...} // Configure any kind of input stream for samza system, and get the handle of message stream to apply operators on public <T> MessageStream<T> getInputStream(EventStream<T> stream) {...} public <T> MessageStream<T> getInputStream(CollectionStream<T> stream) {...} public <T> MessageStream<T> getInputStream(FileStream<T> stream) {...} // Run the app public void run() { }; } |
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** * CollectionStream provides utilities to build an in memory input stream of collections(list, map). It also supports initialization of * multiple partitions for an input stream */ public class CollectionStream<T> { // Create an empty stream that a Samza task can produce to public static <T> CollectionStream<T> empty(String systemStreamsteamId) {...} // Create a stream of messages from input list with single partition public static <T> CollectionStream<T> of(String systemStreamsteamId, List<T> collection){...} // Create a stream of messages from input list with multiple partition public static <T> CollectionStream<T> ofPartitions(String systemStreamsteamId, List<List<T>> collection){...} // Create a stream of messages from input list with multiple partition public static <K, V> CollectionStream<KV<K, V>> of(String systemStreamsteamId, Map<K, V> elems) {...} } |
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** * Simple Test case using a collection as an input for a low level application. It demonstrates set up and comparison of a test with * minimal(eg. here none) configs and it reads an input stream of integers and multiplies each integer with 10 */ // Create a StreamTask StreamTask myTask = new StreamTask() { @Override public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception { Integer obj = (Integer) envelope.getMessage(); collector.send(new OutgoingMessageEnvelope(new SystemStream("test-samza","output"), obj*10)); } }; // Initialize and run the test framework TestTask .create(systemName: "test-samza", .create(myTask, config, mode.SINGLE_CONTAINER) .setJobContainerThreadPoolSize(4) .addInputStream(CollectionStream.of(streamName: ""test-samza.input", {1,2,3,4})) .addOutputStream(CollectionStream.empty(streamName: ""test-samza.output")) .run(); // Assertions on the outputs TaskAssert.that("test-samza", "Output.output").containsInAnyOrder({10,30,20,40}); |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** * Simple Test case using a collection as an input for a low level application in the async mode */ // public class AsyncRestTask implements AsyncStreamTask, InitableTask, ClosableTask { private Client client; private WebTarget target; @Override public void init(Config config, TaskContext taskContext) throws Exception { // Your initialization of web client code goes here client = ClientBuilder.newClient(); target = client.target("http://example.com/resource/").path("hello"); } @Override public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator, final TaskCallback callback) { target.request().async().get(new InvocationCallback<Response>() { @Override public void completed(Response response) { // process your message callback.complete(); } @Override public void failed(Throwable throwable) { System.out.println("Invocation failed."); callback.failure(throwable); } }); } @Override public void close() throws Exception { client.close(); } } // Initialize and run the test framework TestTask .create("test-samza", new AsyncRestTask(), config, Mode.SINGLE_CONTAINER) .setTaskCallBackTimeoutMS(200) .setTaskMaxConcurrency(4) .addInputStream(CollectionStream.of("test-samza.PageView", pageviews)) .addOutputStream(CollectionStream.empty("test-samza.Output")) .run(); // Assertions on the outputs TaskAssert.that("test-samza", ".Output").containsInAnyOrder(pageviews); |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** * Simple Test case using a collection as an input for a High level application */ // Initialize and run the test framework TestApplication .create(config, mode.SINGLE_CONTAINER); .getInputStream(CollectionStream.of(streamName: ""test-samza.input", {1,2,3})) .map(s -> "processed " + s) .run(); // Assertions on the outputs StreamAssert.that(stream"test-samza.input").containsInAnyOrder(Arrays.asList("processed 1", "processed 2", "processed 4")); |
...