...
- The system depends on the in-memory system to spin up in-memory consumers and producers for in-memory data streams.
- Single-container mode runs on SingleContainerGrouperFactory, which means these tests are meant to run in a single-container environment.
- Multi-container mode uses Samza as a library (Samza standalone), with Zookeeper as the coordination service for setting up the test.
- To keep config management for the testing API to a minimum, basic configs are set up for users, while users can still add or override custom configs.
- Testing is always done on bounded streams, which are terminated with an EndOfStreamMessage.
- The system should consume directly from a file stream or a local Kafka stream without any additional configs. The file stream implementation is out of scope for this SEP, although the SEP suggests what its design should look like.
- Bounded message streams from Kafka (using either a latch or an EndOfStreamMessage) are beyond the scope of this SEP.
- For stateful testing, the system may either use RocksDB, which ships with Samza, or maintain state in memory.
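The bounded-stream assumption can be pictured with a small, self-contained Java sketch. `BoundedInMemoryStream` and its `END_OF_STREAM` sentinel are hypothetical stand-ins, not Samza's in-memory system or its actual `EndOfStreamMessage`. The sketch only shows why a consumer in a test needs an explicit end marker to know when it has seen all of the input.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: an in-memory stream made "bounded" by appending an
// end-of-stream marker, loosely mirroring how tests rely on EndOfStreamMessage.
final class BoundedInMemoryStream<T> {
  // Sentinel standing in for EndOfStreamMessage (not Samza's actual class).
  private static final Object END_OF_STREAM = new Object();

  private final List<Object> buffer = new ArrayList<>();

  // Producer side: append a regular message.
  void send(T message) {
    buffer.add(message);
  }

  // Producer side: mark the stream as complete.
  void sendEndOfStream() {
    buffer.add(END_OF_STREAM);
  }

  // Consumer side: read messages up to the end-of-stream marker.
  // Throws if the marker is missing, because an unbounded stream
  // gives the test no point at which it can start validating.
  @SuppressWarnings("unchecked")
  List<T> consumeUntilEndOfStream() {
    List<T> out = new ArrayList<>();
    for (Object entry : buffer) {
      if (entry == END_OF_STREAM) {
        return Collections.unmodifiableList(out);
      }
      out.add((T) entry);
    }
    throw new IllegalStateException("stream is not bounded: no end-of-stream marker");
  }
}
```

In this sketch, a stream that was never closed raises an error instead of being read indefinitely, and anything sent after the marker is ignored.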
Design
We propose a three-step process for configuring an integration test for your Samza job. The three facets are data injection, transformation, and validation.
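As a rough illustration of the three facets, the following sketch injects a list of messages, transforms them with a plain function standing in for the job logic, and validates the result. `ThreeStepTest`, `transform`, and `validate` are hypothetical names used only here; they are not part of the proposed API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of the three facets of a test: inject data, run a
// transformation, then validate the result. Not part of the proposed API.
final class ThreeStepTest {
  // Step 2: apply the job logic (here a plain function) to every injected message.
  static <I, O> List<O> transform(List<I> injected, Function<I, O> jobLogic) {
    List<O> output = new ArrayList<>();
    for (I message : injected) {
      output.add(jobLogic.apply(message));
    }
    return output;
  }

  // Step 3: validate the output against the expected messages, in order.
  static <O> void validate(List<O> actual, List<O> expected) {
    if (!actual.equals(expected)) {
      throw new AssertionError("expected " + expected + " but got " + actual);
    }
  }
}
```

Step 1 (injection) is simply the input list handed to `transform`; in the real framework it would be an input stream configured on the test runner.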
...
- Task Assert: assertion utilities for the low-level API
- Stream Assert: assertion utilities for the high-level API
StreamAssert | TaskAssert | StateAssert |
---|---|---|
contains | contains | contains |
containsInAnyOrder | containsInAnyOrder | satisfies |
inWindow | isOnTimePane | |
inFinalPane | satisfies | |
isOnTimePane | | |
satisfies | | |
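The following sketch shows one possible reading of the `contains`, `containsInAnyOrder`, and `satisfies` rows of the table, applied to messages already collected from an output stream. It assumes `contains` checks exact order and `containsInAnyOrder` compares messages as a multiset (so duplicates count); the SEP does not pin down these semantics, and `AssertSketch` is a hypothetical name. Window- and pane-related assertions are left out.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical sketch of assertion semantics on collected output messages.
// Assumption: "contains" = exact order, "containsInAnyOrder" = same multiset.
final class AssertSketch {
  static <T> void contains(List<T> actual, List<T> expected) {
    if (!actual.equals(expected)) {
      throw new AssertionError("expected (in order) " + expected + " but got " + actual);
    }
  }

  static <T> void containsInAnyOrder(List<T> actual, List<T> expected) {
    if (!counts(actual).equals(counts(expected))) {
      throw new AssertionError("expected (any order) " + expected + " but got " + actual);
    }
  }

  // "satisfies": hand the whole collection to a user-supplied check.
  static <T> void satisfies(List<T> actual, Predicate<List<T>> check) {
    if (!check.test(actual)) {
      throw new AssertionError("collection " + actual + " did not satisfy the check");
    }
  }

  // Count occurrences so that duplicate messages are compared correctly.
  private static <T> Map<T, Integer> counts(List<T> items) {
    Map<T, Integer> result = new HashMap<>();
    for (T item : items) {
      result.merge(item, 1, Integer::sum);
    }
    return result;
  }
}
```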
Data Types & Partitions:
The framework will provide complete flexibility in using primitive and derived data types. Serdes are required for local Kafka streams and file streams, but in-memory streams don't require any serde configuration. The test framework will provide APIs for initializing input streams (both single- and multi-partition) and for validating data on single-partition and multi-partition bounded streams.
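The serde requirement comes down to representation: Kafka and file streams carry bytes, whereas an in-memory stream can hand Java objects to the task directly. The sketch below defines a hypothetical `SketchSerde` interface that mirrors the `toBytes`/`fromBytes` shape of Samza's `Serde` (declared locally so it compiles without Samza on the classpath) and a UTF-8 string implementation that round-trips a message.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical local interface mirroring the toBytes/fromBytes shape of a
// Samza serde; defined here so the sketch compiles without Samza.
interface SketchSerde<T> {
  byte[] toBytes(T value);

  T fromBytes(byte[] bytes);
}

// UTF-8 string serde: the kind of serde a Kafka- or file-backed test stream
// needs, because those streams carry bytes rather than Java objects.
final class Utf8StringSerde implements SketchSerde<String> {
  @Override
  public byte[] toBytes(String value) {
    return value.getBytes(StandardCharsets.UTF_8);
  }

  @Override
  public String fromBytes(byte[] bytes) {
    return new String(bytes, StandardCharsets.UTF_8);
  }
}
```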
...
Traditionally, users must set up the config for any Samza job. For tests, the framework sets up the basic config boilerplate for users while still letting them add custom config (rarely needed). The API exposes functions to configure single-container or multi-container mode (using Zookeeper), as well as functions to configure the concurrency semantics of the Samza job.
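A minimal sketch of the boilerplate-plus-overrides idea, assuming config is a flat `Map<String, String>`. The key names (`job.name`, `job.coordinator.factory`, `job.coordinator.zk.connect`, `task.name.grouper.factory`) follow Samza's configuration reference as we understand it and should be checked against the Samza version under test; `TestConfigSketch` itself is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of "boilerplate config plus user overrides".
// Key names are assumptions based on Samza's config reference; verify them.
final class TestConfigSketch {
  static Map<String, String> baseConfig(String jobName, boolean multiContainer, String zkConnect) {
    Map<String, String> config = new HashMap<>();
    config.put("job.name", jobName);
    if (multiContainer) {
      // Samza standalone, coordinated through Zookeeper.
      config.put("job.coordinator.factory", "org.apache.samza.zk.ZkJobCoordinatorFactory");
      config.put("job.coordinator.zk.connect", zkConnect);
    } else {
      // All tasks are grouped into a single container.
      config.put("task.name.grouper.factory",
          "org.apache.samza.container.grouper.task.SingleContainerGrouperFactory");
    }
    return config;
  }

  // User-supplied entries are applied last, so they win over the defaults.
  // The base map is copied, not modified.
  static Map<String, String> withOverrides(Map<String, String> base, Map<String, String> overrides) {
    Map<String, String> merged = new HashMap<>(base);
    merged.putAll(overrides);
    return merged;
  }
}
```

Applying user entries last lets a test override any default (for example `job.name`) without touching the rest of the boilerplate.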
Stateful & Stateless Testing:
The test framework will support both stateful and stateless testing. Stateful testing can keep state either in RocksDB, which ships with Samza, or in memory.
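For the in-memory flavor of stateful testing, a task can keep its state in a plain map that plays the role of a key-value store, and the test can inspect that state once the bounded input is exhausted. `WordCountStateSketch` is a hypothetical example; in a real job the `get`/`put` calls would go to a Samza `KeyValueStore`, possibly backed by RocksDB.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a HashMap stands in for a key-value store (RocksDB in a
// real job) so the test can inspect state after processing bounded input.
final class WordCountStateSketch {
  private final Map<String, Integer> store = new HashMap<>();

  // Per-message task logic: read-modify-write against the store.
  void process(String word) {
    Integer current = store.get(word);
    store.put(word, current == null ? 1 : current + 1);
  }

  // Drive the task over a bounded input.
  void processAll(List<String> boundedInput) {
    for (String word : boundedInput) {
      process(word);
    }
  }

  // Read access for a StateAssert-style check; null means "no state for key".
  Integer get(String key) {
    return store.get(key);
  }
}
```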
Future Changes with Stream Descriptors:
...
```java
import java.util.HashMap;

import org.apache.samza.task.AsyncStreamTask;
import org.apache.samza.task.StreamTask;

/**
 * TestTask provides a static factory for quickly setting up a Samza environment to test the low-level API.
 * Users configure the input streams their task consumes from and the output streams it produces to,
 * and pass in their Samza job logic as a StreamTask or an AsyncStreamTask.
 */
public class TestTask {
  private TestTask(String systemName, StreamTask task, HashMap<String, String> config) {...}
  private TestTask(String systemName, AsyncStreamTask task, HashMap<String, String> config) {...}

  // Static factories to configure and create a runner for the low-level API
  public static TestTask create(String systemName, StreamTask task, HashMap<String, String> config) {...}
  public static TestTask create(String systemName, AsyncStreamTask task, HashMap<String, String> config) {...}

  // Multithreading configs for users
  public TestTask setTaskMaxConcurrency(Integer value) {...}
  public TestTask setTaskCallBackTimeoutMS(Integer value) {...}

  // Configures an input stream for the Samza system that the task can consume from
  public TestTask addInputStream(CollectionStream stream) {...}

  // Configures an output stream for the Samza system that the task can produce to
  public TestTask addOutputStream(CollectionStream stream) {...}

  // Runs the app
  public void run() {...}
}
```
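To make the static-factory and fluent-setter shape of the proposed `TestTask` concrete, here is a heavily simplified, self-contained stand-in. `MiniTestTask` and its `SimpleTask` callback interface are hypothetical: they replace `StreamTask`, `MessageCollector`, and `CollectionStream` with plain Java types, and the concurrency setting is only recorded, since the sketch processes messages sequentially.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical, simplified stand-in for the proposed TestTask: static factory,
// fluent setters, and message flow from named inputs to named outputs.
final class MiniTestTask {
  // Simplified task: receives a message and an "emit" callback (stream name, message).
  interface SimpleTask {
    void process(Object message, BiConsumer<String, Object> emit);
  }

  private final SimpleTask task;
  private final Map<String, List<?>> inputs = new LinkedHashMap<>();
  private final Map<String, List<Object>> outputs = new HashMap<>();
  private int maxConcurrency = 1; // recorded only; this sketch runs sequentially

  private MiniTestTask(SimpleTask task) {
    this.task = task;
  }

  static MiniTestTask create(SimpleTask task) {
    return new MiniTestTask(task);
  }

  MiniTestTask setTaskMaxConcurrency(int value) {
    this.maxConcurrency = value;
    return this;
  }

  MiniTestTask addInputStream(String stream, List<?> messages) {
    inputs.put(stream, messages);
    return this;
  }

  MiniTestTask addOutputStream(String stream) {
    outputs.put(stream, new ArrayList<>());
    return this;
  }

  // Feed every input message to the task; emitted messages land in declared outputs.
  Map<String, List<Object>> run() {
    for (List<?> messages : inputs.values()) {
      for (Object message : messages) {
        task.process(message, (stream, out) -> {
          List<Object> sink = outputs.get(stream);
          if (sink == null) {
            throw new IllegalStateException("undeclared output stream: " + stream);
          }
          sink.add(out);
        });
      }
    }
    return outputs;
  }
}
```

A test would chain `create`, `addInputStream`, `addOutputStream`, and `run()`, then assert on the returned collections; for example, a doubling task turns `[1, 2, 3]` into `[2, 4, 6]`.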
...
```java
import java.util.List;
import java.util.Map;

import org.apache.samza.operators.KV;

/**
 * CollectionStream provides utilities to build an in-memory input stream from a collection (list, map).
 * It also supports initializing multiple partitions for an input stream.
 */
public class CollectionStream<T> {
  private CollectionStream(String systemStream, List<T> collection, Integer partitionCount) {...}
  private CollectionStream(String systemStream) {...}
  // Create an empty stream that a Samza task can produce to
  public static <T> CollectionStream<T> empty(String systemStream) {...}
  // Create a single-partition stream of messages from an input list
  public static <T> CollectionStream<T> of(String systemStream, List<T> collection) {...}
  // Create a multi-partition stream of messages from a list of per-partition lists
  public static <T> CollectionStream<T> ofPartitions(String systemStream, List<List<T>> collection) {...}
  // Create a stream of key-value messages from the entries of a map
  public static <K, V> CollectionStream<KV<K, V>> of(String systemStream, Map<K, V> elems) {...}
}
```
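One way the `CollectionStream` factories could lay data out across partitions is sketched below: a plain list goes to a single partition, `ofPartitions` turns each inner list into its own partition, and map entries become key-value messages. `PartitionLayoutSketch` is hypothetical, `Map.Entry` stands in for Samza's `KV`, and placing map entries in a single partition is an assumption, since the proposal does not say.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of partition layout for the CollectionStream factories.
// Map.Entry stands in for Samza's KV; the outer list index is the partition id.
final class PartitionLayoutSketch {
  // of(stream, list): every message goes to partition 0.
  static <T> List<List<T>> singlePartition(List<T> messages) {
    List<T> copy = new ArrayList<>(messages);
    return Collections.singletonList(copy);
  }

  // ofPartitions(stream, lists): inner list i becomes partition i.
  static <T> List<List<T>> multiPartition(List<List<T>> perPartition) {
    List<List<T>> partitions = new ArrayList<>();
    for (List<T> partition : perPartition) {
      partitions.add(new ArrayList<>(partition));
    }
    return partitions;
  }

  // of(stream, map): each entry becomes one key-value message (assumed partition 0).
  static <K, V> List<List<Map.Entry<K, V>>> fromMap(Map<K, V> elems) {
    List<Map.Entry<K, V>> messages = new ArrayList<>();
    for (Map.Entry<K, V> e : elems.entrySet()) {
      messages.add(new SimpleImmutableEntry<>(e.getKey(), e.getValue()));
    }
    return singlePartition(messages);
  }
}
```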
...