...
Traditionally, users must set up config for every Samza job. For tests, the framework sets up the basic config boilerplate for users while still giving them the flexibility to add custom config (rarely needed). The API exposes functions to configure single-container or multi-container mode, and it also provides APIs to configure the concurrency semantics of the job.
Public Interfaces
There are two APIs for writing tests: the low-level test API (TestTask) and the high-level API (TestApplication).
Code Block |
---|
language | java |
---|
theme | Eclipse |
---|
title | TestTask |
---|
|
/**
 * Low-level test API: configures and runs a single {@link StreamTask} or {@link AsyncStreamTask}
 * against in-memory input and output streams.
 *
 * <p>The framework supplies the basic config boilerplate; callers may layer custom config on top.
 * Typical use: {@code TestTask.create(...).addInputStream(...).addOutputStream(...).run()}.
 */
public class TestTask {
  private TestTask(String systemName, StreamTask task, Map<String, String> config) {...}
  private TestTask(String systemName, AsyncStreamTask task, Map<String, String> config) {...}

  /**
   * Static factory that configures and creates a runner for a low-level synchronous task.
   *
   * @param systemName name of the Samza system that the test streams belong to
   * @param task the task under test
   * @param config custom config added on top of the framework's boilerplate config
   * @return a new {@code TestTask}
   */
  public static TestTask create(String systemName, StreamTask task, Map<String, String> config) {...}

  /**
   * Static factory that configures and creates a runner for a low-level asynchronous task.
   *
   * @param systemName name of the Samza system that the test streams belong to
   * @param task the asynchronous task under test
   * @param config custom config added on top of the framework's boilerplate config
   * @return a new {@code TestTask}
   */
  public static TestTask create(String systemName, AsyncStreamTask task, Map<String, String> config) {...}

  // Multithreading configs for users

  /**
   * Sets the maximum task concurrency (presumably Samza's {@code task.max.concurrency} — confirm).
   *
   * @return this {@code TestTask}, for chaining
   */
  public TestTask setTaskMaxConcurrency(Integer value) {...}

  /**
   * Sets the task callback timeout; the name suggests the value is in milliseconds — confirm.
   *
   * @return this {@code TestTask}, for chaining
   */
  public TestTask setTaskCallBackTimeoutMS(Integer value) {...}

  /**
   * Sets the job container's thread pool size (presumably Samza's
   * {@code job.container.thread.pool.size} — confirm).
   *
   * @return this {@code TestTask}, for chaining
   */
  public TestTask setJobContainerThreadPoolSize(Integer value) {...}

  /**
   * Configures an input stream for the Samza system that the task can consume from.
   *
   * @return this {@code TestTask}, for chaining
   */
  public TestTask addInputStream(CollectionStream<?> stream) {...}

  /**
   * Configures an output stream for the Samza system that the task can produce to.
   *
   * @return this {@code TestTask}, for chaining
   */
  public TestTask addOutputStream(CollectionStream<?> stream) {...}

  /** Runs the configured task against the configured streams. */
  public void run() {...}
}
|
Code Block |
---|
language | java |
---|
theme | Eclipse |
---|
title | TestApplication |
---|
|
/**
 * High-level test API: exposes input streams as {@code MessageStream}s so that high-level
 * operators can be applied to them, and then runs the application.
 *
 * <p>NOTE(review): the class-level type parameter {@code T} is not used by any member shown here;
 * it is kept only for source compatibility with existing {@code TestApplication<...>} usages.
 */
public class TestApplication<T> {
  /**
   * Configures an input stream for the Samza system that the app can apply high-order functions on.
   * Placeholder: currently returns {@code null}.
   *
   * @param <M> type of the messages in the stream
   */
  public <M> MessageStream<M> getInputStream(EventStream<M> stream) {return null;}

  /**
   * Configures an in-memory collection as an input stream.
   * Placeholder: currently returns {@code null}.
   *
   * @param <M> type of the messages in the stream
   */
  public <M> MessageStream<M> getInputStream(CollectionStream<M> stream) {return null;}

  /**
   * Configures a file-backed input stream.
   * Placeholder: currently returns {@code null}.
   *
   * @param <M> type of the messages in the stream
   */
  public <M> MessageStream<M> getInputStream(FileStream<M> stream) {return null;}

  /** Runs the app. Placeholder: currently does nothing. */
  public void run() { }
} |
Types of Input Streams
Code Block |
---|
language | java |
---|
theme | Eclipse |
---|
title | CollectionStream |
---|
|
/**
 * CollectionStream provides utilities to build an in-memory input stream from a collection
 * (list or map). It also supports initializing the stream with multiple partitions.
 *
 * @param <T> type of the messages in the stream
 */
public class CollectionStream<T> {
  /**
   * @param systemStream name of the stream
   * @param partitions messages of the stream, one inner list per partition
   * @param partitionCount number of partitions
   */
  private CollectionStream(String systemStream, List<List<T>> partitions, Integer partitionCount) {...}

  private CollectionStream(String systemStream) {...}

  /** Creates a stream with no messages, e.g. to serve as an output stream. */
  public static <T> CollectionStream<T> empty(String systemStream) {
    return new CollectionStream<>(systemStream);
  }

  /** Creates a single-partition stream containing the messages of {@code collection}. */
  public static <T> CollectionStream<T> of(String systemStream, List<T> collection) {
    // Wrap the flat list as one partition so both list factories share the same constructor;
    // a separate List<T> constructor would clash with List<List<T>> after erasure.
    return new CollectionStream<>(systemStream, java.util.Collections.singletonList(collection), 1);
  }

  /** Creates a stream with one partition per inner list of {@code collection}. */
  public static <T> CollectionStream<T> ofPartitions(String systemStream, List<List<T>> collection) {
    return new CollectionStream<>(systemStream, collection, collection.size());
  }

  /** Creates a stream whose messages are the key-value entries of {@code elems}. */
  public static <K, V> CollectionStream<KV<K, V>> of(String systemStream, Map<K, V> elems) {...}
}
|
...
Code Block |
---|
language | java |
---|
theme | Eclipse |
---|
title | FileStream |
---|
|
/**
 * FileStream describes an input stream backed by a file identified by a URI.
 *
 * @param <T> type of the messages in the stream
 */
public class FileStream<T> {
  /**
   * Static factory for a file-backed input stream.
   *
   * @param fileUri URI of the file backing the stream
   * @param <T> type of the messages in the stream
   * @return a new {@code FileStream}
   */
  public static <T> FileStream<T> of(String fileUri) {
    ...
  }
} |
...
A Sample Test:
Code Block |
---|
language | java |
---|
theme | Eclipse |
---|
title | Simple StreamTask Test |
---|
linenumbers | true |
---|
|
import java.util.Arrays;
import java.util.HashMap;

/**
 * Sample test case that uses a collection as input for a low-level application. It demonstrates
 * setting up and running a test with minimal (here, no custom) configs: the task reads an input
 * stream of integers, multiplies each integer by 10, and the test asserts on the output stream.
 */
// Create a StreamTask that multiplies every incoming integer by 10
StreamTask myTask = new StreamTask() {
  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
    Integer obj = (Integer) envelope.getMessage();
    collector.send(new OutgoingMessageEnvelope(new SystemStream("test-samza", "output"), obj * 10));
  }
};
// No custom configs are needed; the framework supplies the config boilerplate
HashMap<String, String> config = new HashMap<>();
// Initialize and run the test framework
TestTask
    .create("test-samza", myTask, config)
    .setJobContainerThreadPoolSize(4)
    .addInputStream(CollectionStream.of("input", Arrays.asList(1, 2, 3, 4)))
    .addOutputStream(CollectionStream.empty("output"))
    .run();
// Assertions on the outputs: the stream name must match the one the task sends to ("output").
// NOTE(review): TaskAssert is not specified in this doc; assumes containsInAnyOrder accepts a List — confirm.
TaskAssert.that("test-samza", "output").containsInAnyOrder(Arrays.asList(10, 20, 30, 40)); |
...