THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** * TestTask provides static factory for quick setup of Samza environment for testing low level api, users can configure input streams * they consume from and output streams they produce to, users pass in their Samza job api logic via StreamTask and AsyncStreamTask */ public class TestTask { // Static Factory to config & create runner for low level api public static TestTask create(StreamTask task) {...} public static TestTask create(AsyncStreamTask task) {...} // Add/Ovveride any custom configs public TestTask addOverrideConfigs(Map<String,String> config) {...} // Set container mode either single container or multi container public TestTask setContainerMode(Mode mode) {...} // Multithreading configs for users public TestTask setTaskMaxConcurrency(Integer value) {...} public TestTask setTaskCallBackTimeoutMS(Integer value) {...} public TestTask setTaskMaxConcurrency(Integer value) {...} // Configure state for application public TestTask addState(String storeName) {...} // Configure an input stream for samza system, that task can consume from public TestTask addInputStream(CollectionStream stream) {...} // Configures an output stream for samza system, that task can producer to public TestTask addOutputStream(CollectionStream stream) {...} // Run the app public void run() {...} } |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** * TestApplication provides static factory for quick setup of Samza environment for testing High Level Api, users can configure input * streams and then apply various operators on streams and run the application */ public class TestApplication<T> { // Static factory to config & create runner for High level api public static TestApplication create(Map<String, String> config) {...} public static TestApplication create(StreamApplication app) {...} // Add/Ovveride any custom configs public TestApplication addOverrideConfigs(Map<String,String> config) {...} // Set container mode either single container or multi container public TestApplication setContainerMode(Mode mode) {...} // Multithreading configs for users public TestApplication setTaskMaxConcurrency(Integer value) {...} public TestApplication setTaskCallBackTimeoutMS(Integer value) {...} public TestApplication setTaskMaxConcurrency(Integer value) {...} // Configure any kind of input stream for samza system, and get the handle of message stream to apply operators on public <T> MessageStream<T> getInputStream(EventStream<T> stream) {...} public <T> MessageStream<T> getInputStream(CollectionStream<T> stream) {...} public <T> MessageStream<T> getInputStream(FileStream<T> stream) {...} // Run the app public void run() {...}; } |
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/**
* Simple Test case using a collection as an input for a High level application
*/
// Initialize and run the test framework
TestApplication
.create(config);
.getInputStream(CollectionStream.of("test-samza.input", {1,2,3}))
.map(s -> "processed " + s)
.run();
// Assertions on the outputs
StreamAssert.that("test-samza.input").contains(Arrays.asList("processed 1", "processed 2", "processed 4")); |
...