...
The test framework will support both stateful and stateless testing; stateful testing can be done either using RocksDB or by maintaining state in memory.
Future Changes with Stream Descriptors:
...
Code Block |
---|
language | java |
---|
theme | Eclipse |
---|
title | TestApplication |
---|
|
/**
 * TestApplication provides a static factory for quick setup of a Samza environment for testing the High Level API.
 * Users configure input streams, apply operators on the returned message streams, and then run the application.
 *
 * @param <T> class-level type parameter (its role is not specified in this proposal — TODO confirm)
 */
public class TestApplication<T> {
  // Static factory to configure and create a runner for the High Level API
  public static <T> TestApplication<T> create(HashMap<String, String> config, Mode mode) {...}
  // Configure state for the application
  public TestApplication<T> addState(Serde<?> serde) {...}
  // Configure any kind of input stream for the Samza system and get a handle on the message stream to apply operators on.
  // The method-level type parameter is named M so that it does not shadow the class-level T.
  public <M> MessageStream<M> getInputStream(EventStream<M> stream) {...}
  public <M> MessageStream<M> getInputStream(CollectionStream<M> stream) {...}
  public <M> MessageStream<M> getInputStream(FileStream<M> stream) {...}
  // Run the app
  public void run() {...}
} |
...
Code Block |
---|
language | java |
---|
theme | Eclipse |
---|
title | CollectionStream |
---|
|
/**
 * CollectionStream provides utilities to build an in-memory input stream from collections (list, map). It also supports
 * initialization of multiple partitions for an input stream.
 *
 * @param <T> type of the messages carried by the stream
 */
public class CollectionStream<T> {
  // Create an empty stream that a Samza task can produce to
  public static <T> CollectionStream<T> empty(String streamId) {...}
  // Create a single-partition stream of messages from the input list
  public static <T> CollectionStream<T> of(String streamId, List<T> collection) {...}
  // Create a multi-partition stream of messages from the input lists (presumably one inner list per partition — TODO confirm)
  public static <T> CollectionStream<T> ofPartitions(String streamId, List<List<T>> collection) {...}
  // Create a multi-partition stream of key-value messages from the input maps (presumably one map per partition — TODO confirm)
  // NOTE(review): List<List<T>> and List<Map<K, V>> have the same erasure, so javac will reject these two overloads
  // as a name clash; one of them will need a distinct name when this is implemented.
  public static <K, V> CollectionStream<KV<K, V>> ofPartitions(String streamId, List<Map<K, V>> collection) {...}
  // Create a stream of key-value messages from the input map
  public static <K, V> CollectionStream<KV<K, V>> of(String streamId, Map<K, V> elems) {...}
}
|
...
Code Block |
---|
language | java |
---|
theme | Eclipse |
---|
title | Simple StreamTask Test |
---|
|
/**
 * Simple test case using a collection as the input for a low level application. It demonstrates the setup of, and
 * assertions on, a test with minimal (here: none) configs: the task reads an input stream of integers and multiplies
 * each integer by 10.
 */
// Create a StreamTask
StreamTask myTask = new StreamTask() {
  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
    Integer obj = (Integer) envelope.getMessage();
    collector.send(new OutgoingMessageEnvelope(new SystemStream("test-samza", "output"), obj * 10));
  }
};
// Initialize and run the test framework
TestTask
    .create(myTask, config, Mode.SINGLE_CONTAINER)
    .setJobContainerThreadPoolSize(4)
    .addInputStream(CollectionStream.of("test-samza.input", Arrays.asList(1, 2, 3, 4)))
    .addOutputStream(CollectionStream.empty("test-samza.output"))
    .run();
// Assertions on the outputs
StreamAssert.that("test-samza.output").containsInAnyOrder(Arrays.asList(10, 30, 20, 40)); |
Code Block |
---|
language | java |
---|
theme | Eclipse |
---|
title | Simple AsyncStreamTask Test |
---|
|
/**
* Simple Test case using a collection as an input for a low level application in the async mode
*/
/**
 * Async task that issues a REST request for every incoming message. When the response arrives it sends the incoming
 * integer multiplied by 10 to SystemStream("test-samza", "output") and completes the task callback; when the request
 * fails it reports the failure through the task callback.
 */
public class AsyncRestTask implements AsyncStreamTask, InitableTask, ClosableTask {
  private Client client;
  private WebTarget target;

  @Override
  public void init(Config config, TaskContext taskContext) throws Exception {
    // Your initialization of web client code goes here
    client = ClientBuilder.newClient();
    WebTarget resource = client.target("http://example.com/resource/");
    target = resource.path("hello");
  }

  @Override
  public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator, final TaskCallback callback) {
    // Handler invoked once the asynchronous REST call finishes
    InvocationCallback<Response> onResponse = new InvocationCallback<Response>() {
      @Override
      public void completed(Response response) {
        Integer value = (Integer) envelope.getMessage();
        SystemStream outputStream = new SystemStream("test-samza", "output");
        collector.send(new OutgoingMessageEnvelope(outputStream, value * 10));
        callback.complete();
      }

      @Override
      public void failed(Throwable throwable) {
        System.out.println("Invocation failed.");
        callback.failure(throwable);
      }
    };
    target.request().async().get(onResponse);
  }

  @Override
  public void close() throws Exception {
    client.close();
  }
}
// Initialize and run the test framework
TestTask
    .create(new AsyncRestTask(), config, Mode.SINGLE_CONTAINER)
    .setTaskCallBackTimeoutMS(200)
    .setTaskMaxConcurrency(4)
    .addInputStream(CollectionStream.of("test-samza.input", Arrays.asList(1, 2, 3, 4)))
    // Stream id uses the same lowercase name as the SystemStream("test-samza", "output") that AsyncRestTask sends to
    .addOutputStream(CollectionStream.empty("test-samza.output"))
    .run();
// Assertions on the outputs
StreamAssert.that("test-samza.output").containsInAnyOrder(Arrays.asList(10, 20, 30, 40));
|
Code Block |
---|
language | java |
---|
theme | Eclipse |
---|
title | Simple High Level Api Test |
---|
|
/**
 * Simple test case using a collection as the input for a High Level API application. Each input element is mapped to
 * the string "processed " followed by the element.
 */
// Initialize and run the test framework
TestApplication
    .create(config, Mode.SINGLE_CONTAINER)
    .getInputStream(CollectionStream.of("test-samza.input", Arrays.asList(1, 2, 3)))
    .map(s -> "processed " + s)
    .run();
// Assertions on the outputs
// NOTE(review): the mapped messages are not sent to an output stream here, and the assertion targets the input
// stream id — confirm which stream the processed messages are expected on.
StreamAssert.that("test-samza.input").containsInAnyOrder(Arrays.asList("processed 1", "processed 2", "processed 3")); |
...