THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Metadata Streams | Tests | |
---|---|---|
Changelog Stream | Assertions on Changelog stream in Stateful tests | |
Checkpoint Stream | Asserts on Checkpoint Stream (Kafka or InMemory) | |
Coordinator Stream | Asserts on Coordinator stream (Kafka or InMemory) |
Public Interfaces
Two Apis for writing tests are: Low Level Test Api (TestTask) & High Level Api (TestApplication)
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** * TestTaskTestRunner provides static factory for quick setup of Samza environment for testing low Low level api and High level api , users can configure input streams * they consume from and output streams they produce to, users pass in their Samza job api logic via StreamTask and AsyncStreamTask */ public class TestTaskTestRunner { // Static Factory to config & create runner for low level api public static TestTaskTestRunner createof(StreamTask task) {...} public static TestTaskTestRunner createof(AsyncStreamTask task) {...} public static TestRunner of(StreamApplication app) {...} // Add/Ovveride any custom configs public TestTaskTestRunner addOverrideConfigs(Map<String,String> configs) {...} public TestTaskTestRunner addOverrideConfigs(String configUri) {...} public TestRunner addConfig(String key, String val){...} // Set container mode either single container or multi container public TestTaskTestRunner setContainerMode(Mode mode) {...} // Multithreading configs for users public TestTaskTestRunner setTaskMaxConcurrency(Integer value) {...} public TestTaskTestRunner setTaskCallBackTimeoutMS(Integer value) {...} public TestTaskTestRunner setJobContainerThreadPoolSize(Integer value) {...} // Configure state for application public TestTaskTestRunner addState(String storeName) {...} // Configure an input stream for samza system, that task can consume from public TestTaskTestRunner addInputStream(CollectionStream stream) {...} // Configures an output stream for samza system, that task can producer to public TestTaskTestRunner addOutputStream(CollectionStream stream) {...} // Run the app public void run() {...} } |
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** * Simple Test case using a collection as an input for a low level application. It demonstrates set up and comparison of a test with * minimal(eg. here none) configs and it reads an input stream of integers and multiplies each integer with 10 */ // Create a StreamTask StreamTask myTask = new StreamTask() { @Override public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception { Integer obj = (Integer) envelope.getMessage(); collector.send(new OutgoingMessageEnvelope(new SystemStream("test-samza","output"), obj*10)); } }; // Initialize and run the test framework TestTaskTestRunner .create(myTask) .setJobContainerThreadPoolSize(4) .addInputStream(CollectionStream.of("test-samza.input", {1,2,3,4})) .addOutputStream(CollectionStream.empty("test-samza.output")) .run(); // Assertions on the outputs StreamAssert.that("test-samza.output").contains({10,30,20,40}); |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** * Simple Test case using a collection as an input for a low level application in the async mode */ // public class AsyncRestTask implements AsyncStreamTask, InitableTask, ClosableTask { private Client client; private WebTarget target; @Override public void init(Config config, TaskContext taskContext) throws Exception { // Your initialization of web client code goes here client = ClientBuilder.newClient(); target = client.target("http://example.com/resource/").path("hello"); } @Override public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator, final TaskCallback callback) { target.request().async().get(new InvocationCallback<Response>() { @Override public void completed(Response response) { Integer obj = (Integer) envelope.getMessage(); collector.send(new OutgoingMessageEnvelope(new SystemStream("test-samza","output"), obj*10)); callback.complete(); } @Override public void failed(Throwable throwable) { System.out.println("Invocation failed."); callback.failure(throwable); } }); } @Override public void close() throws Exception { client.close(); } } // Initialize and run the test framework TestTaskTestRunner .create(new AsyncRestTask()) .setTaskCallBackTimeoutMS(200) .setTaskMaxConcurrency(4) .addInputStream(CollectionStream.of("test-samza.Input", {1,2,3,4})) .addOutputStream(CollectionStream.empty("test-samza.Output")) .run(); // Assertions on the outputs StreamAssert.that("test-samza.Output").containsInAnyOrder({10,20,30,40}); |
...