THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** * TestRunner provides static factory for quick setup of Samza environment for testing Low level api and High level api , users can configure input streams * they consume from and output streams they produce to, users pass in their Samza job api logic via StreamTask/AsyncStreamTask/StreamApplication */ public class TestRunner { // Static Factory to config & create runner for low level api public static TestRunner of(StreamTaskClass tasktaskClass) {...} // Static public static TestRunner of(AsyncStreamTask task) {...}Factory to config & create runner for high level api public static TestRunner of(StreamApplication app) {...} // Add/Ovveride any custom configs public TestRunner addOverrideConfigsaddConfigs(Map<String,String> configs) {...} public TestRunner addOverrideConfigsaddConfigs(StringConfig configUriconfigs) {...} public TestRunner addOverrideConfig(String key, String val){...} // SetConfigure containerstate mode either single container or multi containerfor application public TestRunner setContainerModeaddState(ModeString modestoreName) {...} // Multithreading configsConfigure an input stream for users samza system, that job can consume from public TestRunner setTaskMaxConcurrencyaddInputStream(IntegerCollectionStream valuestream) {...} // Configures an output stream for samza system, publicthat TestRunnerjob setTaskCallBackTimeoutMS(Integer value) {...}can producer to public TestRunner setJobContainerThreadPoolSizeaddOutputStream(IntegerCollectionStream valuestream) {...} // ConfigureRun statethe for applicationapp public TestRunnervoid addStaterun(String storeName) {...} // Configure an input stream for samza system, that job can consume from public TestRunner addInputStream(CollectionStream stream) {...} } |
Types of Input Streams
CollectionStream:
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** * CollectionStream acts a descriptor that can be used to build an in memory input stream (single/multiple partition) of collections(list, map). */ public class CollectionStream<T> { // ConfiguresCreate an outputempty stream forthat samza system, thata Samza job can producerproduce to public TestRunner addOutputStream(CollectionStream streamstatic <T> CollectionStream<T> empty(String streamName) {...} // Run the app Create a stream of messages from input list with single partition public static <T> voidCollectionStream<T> run() of(String streamName, Iterable<T> collection){...} } |
Types of Input Streams
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** * CollectionStream provides utilities to build an in memory input stream of collections(list, map). It also supports initialization of * multiple partitions for an input stream */ public class CollectionStream<T> { // Create an empty stream that a Samza job can produce to public static <T> CollectionStream<T> empty(String steamId // Create a stream of messages from input list with multiple partition, key of partitions map is partitionId public static <T> CollectionStream<T> of(String streamName, Map<Integer,? extends Iterable<T>> partitions){...} // public CollectionStream<T> from(CollectionStreamSystem system) {...} // Create a stream of messages from input list with single partition public static <T> CollectionStream<T> of(String steamId, Iterable<T> collection) } |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** * CollectionStreamSystem provides utilities to create and initialize an in memory input stream. */ public class CollectionStreamSystem { // Create a CollectionStreamSystem public static CollectionStreamSystem create(String name) {...} // Create apublic stream<T> ofCollectionStreamSystem messages from input list with multiple partition, key of partitions map is partitionId public static <T> CollectionStream<T> of(String steamIdaddInput(String streamName, Map<Integer, Iterable<T>> partitions) {...} public <T> CollectionStreamSystem addOutput(String streamName, Map<Integer, Iterable<T>> partitions) {...} } |
EventStream:
Code Block | ||||||
---|---|---|---|---|---|---|
Code Block | ||||||
| ||||||
/** * EventStream provides utilities to build an in memory input stream of events. It helps mimic run time environment of your job, * advancing time for windowing functions */ public class EventStream<T> { public static abstract class Builder<T> { public abstract Builder addElement(); public abstract Builder addException(); public abstract Builder advanceTimeTo(long time); public abstract EventStream<T> build(); } public static <T> Builder<T> builder() {...} } public class EventStreamSystem { public static EventStreamSystem create(String name) {...} } |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** * FileStream provides utilities to build a stream of messages from a file on disk */ public class FileStream<T> { public static <T> FileStream<T> of(String fileUri) {...} } |
...
public class FileStreamSystem {
public static FileStreamSystem create(String name) {...}
} |
Examples Usages of Test Api:
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/**
* |
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** * Simple Test case using a collection as an input for a low level application. It demonstrates set up and comparison of a test with * minimal(eg. here none) configs and it reads an input stream of integers and multiplies each integer with 10 */ // Create a StreamTask StreamTaskMyStreamTestTask myTask = new StreamTask() { @Override public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception { Integer obj = (Integer) envelope.getMessage(); collector.send(new OutgoingMessageEnvelope(new SystemStream("test-samza","output"), obj*10)); } }; //CollectionStreamSystem Initializesystem and run the test framework TestRunner .of(myTask) .setJobContainerThreadPoolSize(4) .addInputStream(CollectionStream.of("test-samza.= CollectionStreamSystem.create("test"); CollectionStream<Integer> input = CollectionStream .of("input", {1,2,3,4}) .from(system); CollectionStream output = .addOutputStream(CollectionStream .empty("test-samza.output")) ) .from(system); TestRunner .of(MyStreamTestTask.class) .addInputStream(input) .addOutputStream(output) .run(); // Assertions on the outputs StreamAssertAssert.that("test-samza.output")assertThat(StreamUtils.getStreamState(output), IsIterableContainingInOrder.contains({10,20,30,20,40}))); |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** * Simple Test case using a collection as an input for a low level application. Itin demonstratesthe set up and comparison of a test with * minimal(eg. here none) configs and it reads an input stream of integers and multiplies each integer with 10 */ HashMap<String,String> configs = new HashMap<String,String>(); class MyStreamTaskFactory implements StreamTaskFactory { @Override public StreamTask createInstance() { return (envelope, collector, coordinator) -> { return envelope * 10;async mode */ // public class MyAsyncStreamTask implements AsyncStreamTask, InitableTask, ClosableTask { private Client client; private WebTarget target; @Override public void init(Config config, TaskContext taskContext) throws Exception { // Your initialization of web client code goes here client = ClientBuilder.newClient(); target = client.target("http://example.com/resource/").path("hello"); } @Override public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector, }; TaskCoordinator coordinator, final TaskCallback callback) }{ } } StreamDescriptor.Input input = StreamDescriptor.<String, String>input("input") .withKeySerde(new StringSerde("UTF-8")) .withMsgSerde(new StringSerde("UTF-8")); StreamDescriptor.Output output = StreamDescriptor.<String, String>output("output") .withKeySerde(new StringSerde("UTF-8")) .withMsgSerde(new StringSerde("UTF-8")); // InMemorySystem maintains system and stream config InMemoryTestSystem testingSystem = InMemoryTestSystem.create("samza-test") .addInput(input, {1,2,3,4,5}) .addOutput(output); // JOB Specific Config configs.put(JobConfig.JOB_NAME(), JOB_NAME); // Default Single Container configs configs.putIfAbsent(JobConfig.PROCESSOR_ID(), "1"); configs.putIfAbsent(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName()); configs.putIfAbsent(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName()); configs.putIfAbsent(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName()); StreamTaskApplication app = StreamApplications.createStreamTaskApp(new MapConfig(configs), new MyStreamTaskFactory()); app.addInputs(Collections.singletonList(input)).addOutputs(Collections.singletonList(output)).run(); app.waitForFinish(); // Assertions on the outputs Assert.assertThat(testingSystem.getStreamState(output), IsIterableContainingInOrder.contains({10,20,30,40,50}))); | ||||||
Code Block | ||||||
| ||||||
/** * Simple Test case using a collection as an input for a low level application in the async mode */ // public class AsyncRestTask implements AsyncStreamTask, InitableTask, ClosableTask { private Client client; private WebTarget target; @Override public void init(Config config, TaskContext taskContext) throws Exception { // Your initialization of web client code goes here client = ClientBuilder.newClient(); target = client.target("http://example.com/resource/").path("hello"); } @Override public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator, final TaskCallback callback) { target.request().async().get(new InvocationCallback<Response>() { @Override public void completed(Response response) { Integer obj = (Integer) envelope.getMessage(); collector.send(new OutgoingMessageEnvelope(new SystemStream("test-samza","output"), obj*10)); callback.complete(); } @Override public void failed(Throwable throwable) { target.request().async().get(new InvocationCallback<Response>() { @Override public void completed(Response response) { Integer obj = (Integer) envelope.getMessage(); collector.send(new OutgoingMessageEnvelope(new SystemStream("test-samza","output"), obj*10)); callback.complete(); } @Override public void failed(Throwable throwable) { System.out.println("Invocation failed."); callback.failure(throwable); } }); } @Override public void close() throws Exception { client.close(); } } //CollectionStreamSystem Initializesystem and run the test framework TestRunner .of(new AsyncRestTask()) .setTaskCallBackTimeoutMS(200) .setTaskMaxConcurrency(4) .addInputStream(CollectionStream.of("test-samza.Input= CollectionStreamSystem.create("test"); CollectionStream<Integer> input = CollectionStream .of("input", {1,2,3,4})) .addOutputStreamfrom(system); CollectionStream output = CollectionStream .empty("test-samza.output")) output") .from(system); TestRunner .of(MyAsyncStreamTask.class) .addInputStream(input) .addOutputStream(output) .run(); // Assertions on the outputs StreamAssert.that("test-samza.output").containsInAnyOrder({10,20,30,40}); |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** * Simple Test case using a collection as an input for a High level application */ public class MyStreamApplication implements StreamApplication { @Override public void init(StreamGraph graph, Config config) { MessageStream<Integer> pageViews = graph.getInputStream(“test-samza.page-views”); pageViews.map(s -> "processed " + s) .sendTo(graph.getOutputStream(“test-samza.output”)); } } / public class MyStreamApplication implements StreamApplication { @Override public void init(StreamGraph graph, Config config) { MessageStream<Integer> pageViews = graph.getInputStream(“test-samza.page-views”); pageViews.map(s -> "processed " + s) .sendTo(graph.getOutputStream(“test-samza.output”)); } } CollectionStreamSystem system = CollectionStreamSystem.create("test"); CollectionStream<Integer> input = CollectionStream .of("input", {1,2,3,4}) .from(system); CollectionStream output = CollectionStream .empty("output") .from(system); // Initialize and run the test framework TestRunner .of(new MyStreamApplication()); .addInputStream(CollectionStream.of("test-samza.input", {1,2,3})) .addOutputStream(CollectionStream.empty("test-samza.output"))output) .addOverrideConfig("job.default.system", "test") .run(); // Assertions on the outputs StreamAssert.that("test-samza.Output"output).contains({"processed 1", "processed 2", "processed 4"}); |
...