THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** * TestTask provides static factory for quick setup of Samza environment for testing low level api, users can configure input streams * they consume from and output streams they produce to, users pass in their Samza job api logic via StreamTask and AsyncStreamTask */ public class TestTask { // Static Factory to config & create runner for low level api // Mode is either SingleContainer or MultiContainer public static TestTask create(StreamTask task, Map<String, String> config, Mode mode) {...} public static TestTask create(AsyncStreamTask task, Map<String, String> config, Mode mode) {...} // MultithreadingAdd/Ovveride configsany forcustom usersconfigs public TestTask setTaskMaxConcurrencyaddOverrideConfigs(IntegerMap<String,String> valueconfig) {...} public TestTask setTaskCallBackTimeoutMS(Integer value) {...}// Set container mode either single container or multi container public TestTask setTaskMaxConcurrencysetContainerMode(IntegerMode valuemode) {...} // ConfigureMultithreading anconfigs input stream for samza system, that task can consume fromusers public TestTask addInputStreamsetTaskMaxConcurrency(CollectionStreamInteger streamvalue) {...} public TestTask // Configures an output stream for samza system, that task can producer tosetTaskCallBackTimeoutMS(Integer value) {...} public TestTask addOutputStreamsetTaskMaxConcurrency(CollectionStreamInteger streamvalue) {...} // Configure Runstate thefor appapplication public voidTestApplication runaddState(String storeName) {...} } | ||||||
Code Block | ||||||
| ||||||
/** * TestApplication provides static factory for quick setup of Samza environment for testing High Level Api, users can configure input * streams and then apply various operators on streams and run the application */ public class TestApplication<T> { // Configure an input stream for samza system, that task can consume from public TestTask addInputStream(CollectionStream stream) {...} // Configures an output stream for samza system, that task can producer to public TestTask addOutputStream(CollectionStream stream) {...} // Run the app public void run() {...} } |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** * TestApplication provides static factory for quick setup of Samza environment for testing High Level Api, users can configure input * streams and then apply various operators on streams and run the application */ public class TestApplication<T> { // Static factory to config & create runner for High level api public static TestApplication create(Map<String, String> config) {...} // Add/Ovveride any custom configs public TestTask addOverrideConfigs(Map<String,String> config) {...} // Set container mode either single container or multi container public TestTask setContainerMode(Mode mode) {...} // Multithreading configs for users public TestTask setTaskMaxConcurrency(Integer value Static factory to config & create runner for High level api public static TestApplication create(Map<String, String> config, Mode mode) {...} public TestTask // Configure state for applicationsetTaskCallBackTimeoutMS(Integer value) {...} public TestApplicationTestTask addStatesetTaskMaxConcurrency(Serde<?>Integer serdevalue) {...} // Configure any kind of input stream for samza system, and get the handle of message stream to apply operators on public <T> MessageStream<T> getInputStream(EventStream<T> stream) {...} public <T> MessageStream<T> getInputStream(CollectionStream<T> stream) {...} public <T> MessageStream<T> getInputStream(FileStream<T> stream) {...} // Run the app public void run() {...}; } |
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** * CollectionStream provides utilities to build an in memory input stream of collections(list, map). It also supports initialization of * multiple partitions for an input stream */ public class CollectionStream<T> { // Create an empty stream that a Samza task can produce to public static <T> CollectionStream<T> empty(String steamId) {...} // Create a stream of messages from input list with single partition public static <T> CollectionStream<T> of(String steamId, Iterable<T> collection){...} // Create a stream of messages from input list with multiple partition, key of partitions map is partitionId public static <T> CollectionStream<T> of(String steamId, Map<Integer,Iterable> partitions){...} // Create a stream of messages from input list with multiple partition (REMOVE THIS ??????) of partitions map is partitionId public static <K, V> CollectionStream<KV<K, V>><T> CollectionStream<T> of(String steamId, Map<KMap<Integer,Iterable> V> elemspartitions) {...} } |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/**
 * EventStream provides utilities to build an in memory input stream of events. It helps mimic run time environment of
 * your job, advancing time for windowing functions
 */
public class EventStream<T> {
  /** Fluent builder for an EventStream; each step returns the typed builder so calls can be chained. */
  public abstract static class Builder<T> {
    public abstract Builder<T> addElement();
    public abstract Builder<T> addException();
    public abstract Builder<T> advanceTimeTo(long time);
    public abstract EventStream<T> build();
  }

  // Entry point for building a stream
  public static <T> Builder<T> builder() {...}
} |
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/**
 * Simple test case using a collection as an input for a low level application. It demonstrates set up and comparison
 * of a test with minimal (e.g. here none) configs: it reads an input stream of integers and multiplies each integer
 * by 10.
 */
// Create a StreamTask
StreamTask myTask = new StreamTask() {
  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
    Integer obj = (Integer) envelope.getMessage();
    collector.send(new OutgoingMessageEnvelope(new SystemStream("test-samza", "output"), obj * 10));
  }
};
// Initialize and run the test framework
TestTask
    .create(myTask, config, Mode.SINGLE_CONTAINER)
    // NOTE(review): setJobContainerThreadPoolSize is not among the TestTask methods listed on this page - confirm it exists
    .setJobContainerThreadPoolSize(4)
    .addInputStream(CollectionStream.of("test-samza.input", Arrays.asList(1, 2, 3, 4)))
    .addOutputStream(CollectionStream.empty("test-samza.output"))
    .run();
// Assertions on the outputs
StreamAssert.that("test-samza.output").contains(Arrays.asList(10, 30, 20, 40)); |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/**
* Simple Test case using a collection as an input for a low level application in the async mode
*/
//
/**
 * Async task that issues one REST call per incoming message and, once the call returns, emits the incoming integer
 * multiplied by 10 to the "output" stream of the "test-samza" system.
 */
public class AsyncRestTask implements AsyncStreamTask, InitableTask, ClosableTask {
  private Client client;
  private WebTarget target;

  @Override
  public void init(Config config, TaskContext taskContext) throws Exception {
    // Your initialization of web client code goes here
    client = ClientBuilder.newClient();
    target = client.target("http://example.com/resource/").path("hello");
  }

  @Override
  public void processAsync(final IncomingMessageEnvelope envelope, final MessageCollector collector,
      TaskCoordinator coordinator, final TaskCallback callback) {
    target.request().async().get(new InvocationCallback<Response>() {
      @Override
      public void completed(Response response) {
        try {
          Integer obj = (Integer) envelope.getMessage();
          collector.send(new OutgoingMessageEnvelope(new SystemStream("test-samza", "output"), obj * 10));
        } catch (RuntimeException e) {
          // Report the error instead of leaving the callback pending until it times out
          callback.failure(e);
          return;
        } finally {
          // The response entity is never read, so release the underlying connection explicitly
          response.close();
        }
        callback.complete();
      }

      @Override
      public void failed(Throwable throwable) {
        System.out.println("Invocation failed.");
        callback.failure(throwable);
      }
    });
  }

  @Override
  public void close() throws Exception {
    // client is only assigned in init(); guard against close() being called when init() never completed
    if (client != null) {
      client.close();
    }
  }
}
// Initialize and run the test framework
// Stream ids are lowercase so they match the SystemStream("test-samza", "output") that AsyncRestTask writes to
TestTask
    .create(new AsyncRestTask(), config, Mode.SINGLE_CONTAINER)
    .setTaskCallBackTimeoutMS(200)
    .setTaskMaxConcurrency(4)
    .addInputStream(CollectionStream.of("test-samza.input", Arrays.asList(1, 2, 3, 4)))
    .addOutputStream(CollectionStream.empty("test-samza.output"))
    .run();
// Assertions on the outputs
StreamAssert.that("test-samza.output").containsInAnyOrder(Arrays.asList(10, 20, 30, 40));
|
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/**
 * Simple test case using a collection as an input for a High level application
 */
// Initialize and run the test framework
TestApplication
    .create(config, Mode.SINGLE_CONTAINER)
    .getInputStream(CollectionStream.of("test-samza.input", Arrays.asList(1, 2, 3)))
    .map(s -> "processed " + s)
    .run();
// Assertions on the outputs
// NOTE(review): this asserts on the input stream id; no output stream is configured in this example - confirm the target
StreamAssert.that("test-samza.input").contains(Arrays.asList("processed 1", "processed 2", "processed 3")); |
...