...
Samza provides complete flexibility in usage of different data types for input steams, this framework will also provide complete flexibility for usage of primitive and derived data types.
Public Interfaces
Code Block |
---|
language | java |
---|
theme | Eclipse |
---|
title | TestTask |
---|
|
public class TestTask {
private TestTask(String systemName, StreamTask task, HashMap<String, String> config) {...}
private TestTask(String systemName, AsyncStreamTask task, HashMap<String, String> config) {...}
// Static Factory to config & create runner for low level api
public static TestTask create(String systemName, StreamTask task, HashMap<String, String> config) {...}
public static TestTask create(String systemName, AsyncStreamTask task, HashMap<String, String> config) {...}
// Multithreading configs for users
public TestTask setTaskMaxConcurrency(Integer value) {...}
public TestTask setTaskCallBackTimeoutMS(Integer value) {...}
public TestTask setTaskMaxConcurrency(Integer value) {...}
// Configure an input stream for samza system, that task can consume from
public TestTask addInputStream(CollectionStream stream) {...}
// Configures an output stream for samza system, that task can producer to
public TestTask addOutputStream(CollectionStream stream) {...}
public void run() {...}
}
|
Code Block |
---|
language | java |
---|
theme | Eclipse |
---|
title | FileStream |
---|
|
public class FileStream<T> {
public static <T> FileStream<T> of(String fileUri) {...}
} |
Code Block |
---|
language | java |
---|
theme | Eclipse |
---|
title | CollectionStream |
---|
|
/**
* CollectionStream provides utilities to build a in memory input stream of collections(list, map). It also supports initialization of
* partitions
*/
public class CollectionStream<T> {
private CollectionStream(String systemStream, List<T> collection, Integer partitionCount) {...}
private CollectionStream(String systemStream) {...}
public static <T> CollectionStream<T> empty(String systemStream) {
return new CollectionStream<>(systemStream);
}
public static <T> CollectionStream<T> of(String systemStream, List<T> collection){
return new CollectionStream<>(systemStream, collection, 1);
}
public static <T> CollectionStream<T> ofPartitions(String systemStream, List<List<T>> collection){
return new CollectionStream<>(systemStream, collection, collection.size());
}
public static <K, V> CollectionStream<KV<K, V>> of(String systemStream, Map<K, V> elems) {...}
}
|
Code Block |
---|
language | java |
---|
theme | Eclipse |
---|
title | Simple StreamTask Test |
---|
linenumbers | true |
---|
|
/**
* Sample Test case using a collection as an input for a low level application. It demonstrates set up and comparison of a test with
* minimal(eg. here none) configs and it reads an input stream of integers and multiplies
* each integer with 10
*/
// Create a StreamTask
StreamTask myTask = new StreamTask() {
@Override
public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
Integer obj = (Integer) envelope.getMessage();
collector.send(new OutgoingMessageEnvelope(new SystemStream("test-samza","output"), obj*10));
}
};
// Initialize and run the test framework
TestTask
.create(systemName: "test-samza", myTask, config)
.setJobContainerThreadPoolSize(4)
.addInputStream(CollectionStream.of(streamName: "input", {1,2,3,4}))
.addOutputStream(CollectionStream.empty(streamName: "output"))
.run();
// Assertions on the outputs
TaskAssert.that("test-samza", "Output").containsInAnyOrder({10,30,20,40}); |
...