THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** * TestRunner provides static factory for quick setup of Samza environment for testing Low level api and High level api , users can configure input streams * they consume from and output streams they produce to, users pass in their Samza job api logic via StreamTask and AsyncStreamTask */ public class TestRunner { // Static Factory to config & create runner for low level api public static TestRunner of(StreamTask task) {...} public static TestRunner of(AsyncStreamTask task) {...} public static TestRunner of(StreamApplication app) {...} // Add/Ovveride any custom configs public TestRunner addOverrideConfigs(Map<String,String> configs) {...} public TestRunner addOverrideConfigs(String configUri) {...} public TestRunner addConfig(String key, String val){...} // Set container mode either single container or multi container public TestRunner setContainerMode(Mode mode) {...} // Multithreading configs for users public TestRunner setTaskMaxConcurrency(Integer value) {...} public TestRunner setTaskCallBackTimeoutMS(Integer value) {...} public TestRunner setJobContainerThreadPoolSize(Integer value) {...} // Configure state for application public TestRunner addState(String storeName) {...} // Configure an input stream for samza system, that task can consume from public TestRunner addInputStream(CollectionStream stream) {...} // Configures an output stream for samza system, that task can producer to public TestRunner addOutputStream(CollectionStream stream) {...} // Run the app public void run() {...} } |
Types of Input Streams
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** * TestApplicationCollectionStream provides staticutilities factoryto forbuild quickan setupin ofmemory Samzainput environmentstream for testing High Level Api. Users can either passof collections(list, map). It also supports initialization of * instancemultiple ofpartitions StreamApplciationfor oran letinput framework do the initization for them and then configure input * streams and then apply various operators on streams and run the application */ public class TestApplication<T> {stream */ public class CollectionStream<T> { // Create an empty stream that a Samza task can produce to public static <T> CollectionStream<T> empty(String steamId) {...} // StaticCreate factorya tostream configof &messages createfrom runnerinput forlist Highwith levelsingle api partition public static <T> TestApplicationCollectionStream<T> createof() {...} public static TestApplication create(StreamApplication app) String steamId, Iterable<T> collection){...} // Add/OvverideCreate anya customstream configs of messages public TestApplication addOverrideConfigs(Map<String,String> config) {...}from input list with multiple partition, key of partitions map is partitionId public static <T> TestApplicationCollectionStream<T> addOverrideConfigsof(String configUri) steamId, Map<Integer,Iterable<T>> partitions){...} // Set container mode either single container or multi container public TestApplication setContainerMode(Mode mode) {...} // Multithreading configs for users public TestApplication setTaskMaxConcurrency(Integer value) {...} public TestApplication setTaskCallBackTimeoutMS(Integer value) {...} public TestApplication setJobContainerThreadPoolSize(Integer value) {...} // Configure any kind of input stream for samza system, and get the handle of message stream to apply operators on public <T> MessageStream<T> getInputStream(EventStream<T> stream) {...} public <T> MessageStream<T> getInputStream(CollectionStream<T> stream) {...} public <T> MessageStream<T> getInputStream(FileStream<T> 
stream) {...} // Run the app public void run() {...}; } |
Types of Input Streams
}
|
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/**
 * EventStream provides utilities to build an in memory input stream of events. It helps mimic run time environment of your job,
 * advancing time for windowing functions
 */
public class EventStream<T> {

  /**
   * Step-by-step builder for an EventStream. Every step returns {@code Builder<T>} (not the raw type)
   * so the element type is kept while calls are chained.
   */
  public static abstract class Builder<T> {
    // NOTE(review): takes no argument in this sketch; presumably the element to add should be passed in. Confirm.
    public abstract Builder<T> addElement();

    // NOTE(review): takes no argument in this sketch; presumably the exception to inject should be passed in. Confirm.
    public abstract Builder<T> addException();

    /** Advances time to the given value; the unit of {@code time} is not specified here. */
    public abstract Builder<T> advanceTimeTo(long time);

    /** Builds the EventStream. */
    public abstract EventStream<T> build();
  }

  /** Returns a new builder for an EventStream. */
  public static <T> Builder<T> builder() {...}
} |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
Code Block | ||||||
| ||||||
/** * CollectionStreamFileStream provides utilities to build ana instream memoryof inputmessages streamfrom of collections(list, map). It also supports initialization of * multiple partitions for an input stream a file on disk */ public class CollectionStream<T>FileStream<T> { //public Createstatic an<T> emptyFileStream<T> stream that a Samza task can produce to public static <T> CollectionStream<T> empty(String steamId) {...} // Create a stream of messages from input list with single partition public static <T> CollectionStream<T> of(String steamId, Iterable<T> collection){...} // Create a stream of messages from input list with multiple partition, key of partitions map is partitionId public static <T> CollectionStream<T> of(String steamId, Map<Integer,Iterable<T>> partitions){...} } |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/**
 * EventStream provides utilities to build an in memory input stream of events. It helps mimic run time environment of your job,
 * advancing time for windowing functions
 */
public class EventStream<T> {

  /**
   * Step-by-step builder for an EventStream. Every step returns {@code Builder<T>} (not the raw type)
   * so the element type is kept while calls are chained.
   */
  public static abstract class Builder<T> {
    // NOTE(review): takes no argument in this sketch; presumably the element to add should be passed in. Confirm.
    public abstract Builder<T> addElement();

    // NOTE(review): takes no argument in this sketch; presumably the exception to inject should be passed in. Confirm.
    public abstract Builder<T> addException();

    /** Advances time to the given value; the unit of {@code time} is not specified here. */
    public abstract Builder<T> advanceTimeTo(long time);

    /** Builds the EventStream. */
    public abstract EventStream<T> build();
  }

  /** Returns a new builder for an EventStream. */
  public static <T> Builder<T> builder() {...}
} |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/**
* FileStream provides utilities to build a stream of messages from a file on disk
*
* @param <T> presumably the type of the messages in the stream (confirm)
*/
public class FileStream<T> {
/**
* Static factory returning a FileStream for the given file.
*
* @param fileUri URI of the file on disk to build the stream from
*/
public static <T> FileStream<T> of(String fileUri) {...}
} |
...
of(String fileUri) {...}
} |
Example Usages of the Test API:
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/**
* Simple Test case using a collection as an input for a low level application. It demonstrates set up and comparison of a test with
* minimal(eg. here none) configs and it reads an input stream of integers and multiplies each integer with 10
*/
// Create a StreamTask
StreamTask myTask = new StreamTask() {
  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
    Integer obj = (Integer) envelope.getMessage();
    collector.send(new OutgoingMessageEnvelope(new SystemStream("test-samza","output"), obj*10));
  }
};
// Initialize and run the test framework
TestRunner
    .of(myTask)
    .setJobContainerThreadPoolSize(4)
    // CollectionStream.of takes an Iterable, so the input is passed as a List
    .addInputStream(CollectionStream.of("test-samza.input", Arrays.asList(1, 2, 3, 4)))
    .addOutputStream(CollectionStream.empty("test-samza.output"))
    .run();
// Assertions on the outputs
// NOTE(review): expected values are listed out of input order; confirm whether contains() is order-sensitive
StreamAssert.that("test-samza.output").contains(Arrays.asList(10, 30, 20, 40)); |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/**
* Simple Test case using a collection as an input for a low level application in the async mode
*/
//
public class AsyncRestTask implements AsyncStreamTask, InitableTask, ClosableTask {
private Client client;
private WebTarget target;
@Override
public void init(Config config, TaskContext taskContext) throws Exception {
// Your initialization of web client code goes here
client = ClientBuilder.newClient();
target = client.target("http://example.com/resource/").path("hello");
}
@Override
public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector,
TaskCoordinator coordinator, final TaskCallback callback) {
target.request().async().get(new InvocationCallback<Response>() {
@Override
public void completed(Response response) {
|
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** * Simple Test case using a collection as an input for a low level application. It demonstrates set up and comparison of a test with * minimal(eg. here none) configs and it reads an input stream of integers and multiplies each integer with 10 */ // Create a StreamTask StreamTask myTask = new StreamTask() { @Override public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception { Integer obj = (Integer) envelope.getMessage(); collector.send(new OutgoingMessageEnvelope(new SystemStream("test-samza","output"), obj*10)); } }; // Initialize and run the test framework TestRunner .create(myTask) callback.complete(); } @Override public void failed(Throwable throwable) { .setJobContainerThreadPoolSize(4) System.addInputStream(CollectionStreamout.ofprintln("test-samza.input", {1,2,3,4}))Invocation failed."); .addOutputStream(CollectionStream.empty("test-samza.output")) callback.runfailure(throwable); // Assertions on the outputs StreamAssert.that("test-samza.output").contains({10,30,20,40}); |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/**
* Simple Test case using a collection as an input for a low level application in the async mode
*/
//
public class AsyncRestTask implements AsyncStreamTask, InitableTask, ClosableTask {
  private Client client;
  private WebTarget target;

  /** Creates the web client and resolves the REST target used by processAsync. */
  @Override
  public void init(Config config, TaskContext taskContext) throws Exception {
    // Your initialization of web client code goes here
    client = ClientBuilder.newClient();
    target = client.target("http://example.com/resource/").path("hello");
  }

  /**
   * Issues an asynchronous GET against the target. On completion the incoming integer is multiplied by 10,
   * sent to the output stream and the task callback is completed; on failure the callback is failed.
   */
  @Override
  public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator, final TaskCallback callback) {
    InvocationCallback<Response> onResponse = new InvocationCallback<Response>() {
      @Override
      public void completed(Response response) {
        Integer payload = (Integer) envelope.getMessage();
        SystemStream destination = new SystemStream("test-samza","output");
        collector.send(new OutgoingMessageEnvelope(destination, payload*10));
        callback.complete();
      }

      @Override
      public void failed(Throwable throwable) {
        System.out.println("Invocation failed.");
        callback.failure(throwable);
      }
    };
    target.request().async().get(onResponse);
  }

  /** Closes the web client created in init. */
  @Override
  public void close() throws Exception {
    client.close();
  }
}
// Initialize and run the test framework
TestRunner
    // TestRunner exposes of(...) as its static factory
    .of(new AsyncRestTask())
    .setTaskCallBackTimeoutMS(200)
    .setTaskMaxConcurrency(4)
    // CollectionStream.of takes an Iterable, so the input is passed as a List
    .addInputStream(CollectionStream.of("test-samza.Input", Arrays.asList(1, 2, 3, 4)))
    .addOutputStream(CollectionStream.empty("test-samza.Output"))
    .run();
// Assertions on the outputs
// NOTE(review): AsyncRestTask sends to SystemStream("test-samza","output") (lower-case); confirm stream ids are case-insensitive
StreamAssert.that("test-samza.Output").containsInAnyOrder(Arrays.asList(10, 20, 30, 40));
|
}
});
}
@Override
public void close() throws Exception {
client.close();
}
}
// Initialize and run the test framework
TestRunner
    .of(new AsyncRestTask())
    .setTaskCallBackTimeoutMS(200)
    .setTaskMaxConcurrency(4)
    // CollectionStream.of takes an Iterable, so the input is passed as a List
    .addInputStream(CollectionStream.of("test-samza.Input", Arrays.asList(1, 2, 3, 4)))
    .addOutputStream(CollectionStream.empty("test-samza.Output"))
    .run();
// Assertions on the outputs
// NOTE(review): AsyncRestTask sends to SystemStream("test-samza","output") (lower-case); confirm stream ids are case-insensitive
StreamAssert.that("test-samza.Output").containsInAnyOrder(Arrays.asList(10, 20, 30, 40));
|
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/**
* Simple Test case using a collection as an input for a High level application
*/
public class MyStreamApplication implements StreamApplication {
@Override
public void init(StreamGraph graph, Config config) {
MessageStream<Integer> pageViews = graph.getInputStream(“test-samza.page-views”);
pageViews.map(s -> "processed " + s)
.sendTo(graph.getOutputStream(“test-samza.Output”));
}
}
| ||||||
Code Block | ||||||
| ||||||
/** * Simple Test case using a collection as an input for a High level application */ // Initialize and run the test framework TestApplicationTestRunner .createof(new MyStreamApplication()); .getInputStreamaddInputStream(CollectionStream.of("test-samza.input", {1,2,3})) .addOutputStream(CollectionStream.mapempty(s -> "processed " + s"test-samza.Output")) .run(); // Assertions on the outputs StreamAssert.that("test-samza.input").contains(Arrays.asList("processed 1", "processed 2", "processed 4")); |
...