...
This is the Samza job you write, using either the Low Level API or the fluent High Level API. The test framework provides APIs to set up tests for both cases, and supports both APIs in single-container and multi-container mode. Users implement StreamTask, AsyncStreamTask, or StreamApplication in the same way as they do for their Samza job. With the High Level API, users don't need a class implementing StreamApplication; they can configure message streams of any type and apply operators on them directly (see the sample example below), or pass an instance of their Samza job.
Data Validation:
...
The framework will provide complete flexibility in the use of primitive and derived data types. Serdes are required for local Kafka streams and file streams, but in-memory streams don't require any Serde configuration. Serdes will also be required if users want to maintain state. The test framework will provide APIs for initializing input streams (both single-partition and multi-partition) and for validating data on single-partition and multi-partition bounded streams.
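As a rough illustration of initializing and validating a bounded multi-partition stream, the sketch below models an in-memory stream as a map from partition id to an ordered list of messages. The class and method names (`PartitionedCollectionSketch`, `roundRobin`, `partitionContains`) are hypothetical and are not part of Samza or of the proposed framework. Round-robin placement is also only an assumption about how messages might be spread across partitions.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory bounded stream, keyed by partition id. Not a Samza class. */
final class PartitionedCollectionSketch {

  /** Single-partition stream: every message lands in partition 0. */
  static <T> Map<Integer, List<T>> singlePartition(List<T> messages) {
    Map<Integer, List<T>> partitions = new LinkedHashMap<>();
    partitions.put(0, new ArrayList<>(messages));
    return partitions;
  }

  /** Multi-partition stream: messages are assigned round-robin (an assumption for this sketch). */
  static <T> Map<Integer, List<T>> roundRobin(List<T> messages, int partitionCount) {
    if (partitionCount < 1) {
      throw new IllegalArgumentException("partitionCount must be >= 1");
    }
    Map<Integer, List<T>> partitions = new LinkedHashMap<>();
    for (int p = 0; p < partitionCount; p++) {
      partitions.put(p, new ArrayList<>());
    }
    for (int i = 0; i < messages.size(); i++) {
      partitions.get(i % partitionCount).add(messages.get(i));
    }
    return partitions;
  }

  /** Validates one partition: true only if it exists and holds exactly the expected messages, in order. */
  static <T> boolean partitionContains(Map<Integer, List<T>> stream, int partition, List<T> expected) {
    List<T> actual = stream.get(partition);
    return actual != null && actual.equals(expected);
  }
}
```

A test would build the input with `roundRobin(messages, n)` and then check each partition separately, since ordering is only meaningful within a partition.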
...
The test framework is designed so that users need little or no Samza configuration. In the future we intend to use StreamDescriptors in the test framework to set Samza configs. This would cause a small change in the way users pass their custom configs (if any) to the test framework. With StreamDescriptor and the High Level API, another wrapper can be provided over the job to run the configuration in a test environment.
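One way to picture "none or minimal configs" is a set of framework-supplied defaults that user overrides are layered on top of. The sketch below is a minimal illustration under that assumption. `TestConfigSketch` and its methods are hypothetical names, and the default values are placeholders rather than the framework's actual defaults. Only the two config keys are taken from the test matrix in this document.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper showing defaults plus user overrides. Not part of Samza. */
final class TestConfigSketch {

  /** Defaults a test harness might supply; the values here are illustrative only. */
  static Map<String, String> defaults() {
    Map<String, String> defaults = new HashMap<>();
    defaults.put("task.max.concurrency", "1");
    defaults.put("job.container.thread.pool.size", "1");
    return defaults;
  }

  /** Returns the defaults with the user's overrides applied on top; overrides win on key collisions. */
  static Map<String, String> withOverrides(Map<String, String> overrides) {
    Map<String, String> merged = new HashMap<>(defaults());
    merged.putAll(overrides);
    return Collections.unmodifiableMap(merged);
  }
}
```

A user who needs no custom behavior passes an empty map; a user testing concurrent processing overrides only `task.max.concurrency` and inherits everything else.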
Test Matrix (Plan)
Samza API | Concurrency | Partitions | Container | Expected Result
---|---|---|---|---
StreamTask | task.max.concurrency = 1, job.container.thread.pool.size = n | 1 <= p <= n | 1 / n | in-order processing
StreamTask | task.max.concurrency > 1, job.container.thread.pool.size = n | 1 <= p <= n | 1 / n | out-of-order processing
AsyncStreamTask | task.max.concurrency = 1 | 1 <= p <= n | 1 / n | in-order processing
AsyncStreamTask | task.max.concurrency = n | 1 <= p <= n | 1 / n | out-of-order processing
WindowableTask | N/A | 1 <= p <= n | 1 / n | expects n messages to be processed within a message window of time t
InitableTask | N/A | 1 <= p <= n | 1 / n | stateful testing (assertions on KV store)
ClosableTask | N/A | 1 <= p <= n | 1 / n | ?
Map / FlatMap / Filter / PartitionBy / Merge / SendTo / Sink / Join / Window | task.max.concurrency = 1, job.container.thread.pool.size = n | 1 <= p < n | 1 / n | in-order processing
Map / FlatMap / Filter / PartitionBy / Merge / SendTo / Sink / Join / Window | task.max.concurrency > 1, job.container.thread.pool.size = n | 1 <= p < n | 1 / n | out-of-order processing
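The "in-order" and "out-of-order" expectations in the matrix imply two different kinds of output assertion. When processing is in order, a test can compare the output to the expected list exactly. When it is out of order, the test can only check that the same messages appear the same number of times. The sketch below shows both comparisons in plain Java. `OrderingAssertSketch` and its method names are hypothetical and are not the framework's assertion API.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helpers for the two expectations in the test matrix. Not part of Samza. */
final class OrderingAssertSketch {

  /** In-order processing: the output must match the expected messages element by element. */
  static <T> boolean sameOrder(List<T> expected, List<T> actual) {
    return expected.equals(actual);
  }

  /** Out-of-order processing: same messages with the same multiplicities, in any order. */
  static <T> boolean sameElementsAnyOrder(List<T> expected, List<T> actual) {
    return counts(expected).equals(counts(actual));
  }

  /** Counts occurrences of each message so that duplicates are compared correctly. */
  private static <T> Map<T, Integer> counts(List<T> items) {
    Map<T, Integer> counts = new HashMap<>();
    for (T item : items) {
      counts.merge(item, 1, Integer::sum);
    }
    return counts;
  }
}
```

Comparing occurrence counts rather than sets matters here: a set comparison would treat an output with a duplicated or dropped message as equal to the expected one.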
...
```java
/**
 * TestRunner provides static factories for quick setup of a Samza environment for testing
 * the Low Level API and the High Level API. Users configure the input streams they consume
 * from and the output streams they produce to, and pass in their Samza job logic via
 * StreamTask, AsyncStreamTask or StreamApplication.
 */
public class TestRunner {
  // Static factories to configure and create a runner for the Low Level API
  public static TestRunner of(StreamTask task) {...}
  public static TestRunner of(AsyncStreamTask task) {...}

  // Static factory to configure and create a runner for the High Level API
  public static TestRunner of(StreamApplication app) {...}

  // Add/override any custom configs
  public TestRunner addOverrideConfigs(Map<String,String> configs) {...}
  public TestRunner addOverrideConfigs(String configUri) {...}
  public TestRunner addOverrideConfig(String key, String val) {...}

  // Set container mode, either single container or multi-container
  public TestRunner setContainerMode(Mode mode) {...}

  // Multithreading configs for users
  public TestRunner setTaskMaxConcurrency(Integer value) {...}
  public TestRunner setTaskCallBackTimeoutMS(Integer value) {...}
  public TestRunner setJobContainerThreadPoolSize(Integer value) {...}

  // Configure state for the application
  public TestRunner addState(String storeName) {...}

  // Configure an input stream for the Samza system that the task can consume from
  public TestRunner addInputStream(CollectionStream stream) {...}

  // Configure an output stream for the Samza system that the task can produce to
  public TestRunner addOutputStream(CollectionStream stream) {...}

  // Run the app
  public void run() {...}
}
```
...
```java
import java.util.Arrays;

import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.StreamGraph;

/**
 * Simple test case using a collection as the input for a High Level API application
 */
public class MyStreamApplication implements StreamApplication {
  @Override
  public void init(StreamGraph graph, Config config) {
    // Stream ids match the ones registered with the TestRunner below
    MessageStream<Integer> pageViews = graph.getInputStream("test-samza.input");
    pageViews.map(s -> "processed " + s)
             .sendTo(graph.getOutputStream("test-samza.output"));
  }
}

// Initialize and run the test framework
TestRunner
    .of(new MyStreamApplication())
    .addInputStream(CollectionStream.of("test-samza.input", Arrays.asList(1, 2, 3)))
    .addOutputStream(CollectionStream.empty("test-samza.output"))
    .run();

// Assertions on the outputs: each input message should appear with the "processed " prefix
StreamAssert.that("test-samza.output").contains(Arrays.asList("processed 1", "processed 2", "processed 3"));
```
Implementation and Test Plan
...