THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** * TestRunner provides static factory for quick setup of Samza environment for testing Low level api and High level api , users can configure input streams * they consume from and output streams they produce to, users pass in their Samza job api logic via StreamTask/AsyncStreamTask/StreamApplication */ public class TestRunner { // Static Factory to config & create runner for low level api public static TestRunner of(Class taskClass) {...} // Static Factory to config & create runner for high level api public static TestRunner of(StreamApplication app) {...} // Add/Ovveride any custom configs public TestRunner addConfigs(Map<String,String> configs) {...} public TestRunner addConfigs(Config configs) {...} public TestRunner addOverrideConfig(String key, String val){...} // Configure state for application public TestRunner addState(String storeName) {...} // Configure an input stream for samza system, that job can consume from public TestRunner addInputStream(CollectionStream stream) {...} // Configures an output stream for samza system, that job can producer to public TestRunner addOutputStream(CollectionStream stream) {...} // Run the appTestRunner public void run() {...} } |
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** * CollectionStream acts a descriptor that can be used to build an in memory input stream (single/multiple partition) of collections(list, map).java collections */ public class CollectionStream<T> { // Create an empty stream with single partition that a Samza job can produce to public static <T> CollectionStream<T> empty(String streamName) {...} // Create an empty stream with multiple partitions that a Samza job can produce to public static <T> CollectionStream<T> empty(String streamName, Integer partitionCount) {...} // Create a stream of messages from input list with single partition public static <T> CollectionStream<T> of(String streamName, Iterable<T> collection){...} // Create a stream of messages from input list with multiple partition, key of partitions map is partitionId public static <T> CollectionStream<T> of(String streamName, Map<Integer,? extends Iterable<T>> partitions){...} // Associate this CollectionStream with a CollectionStreamSystem public CollectionStream<T> from(CollectionStreamSystem system) {...} } |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** * CollectionStreamSystem provides utilities to create and initialize an in memory input stream. */ public class CollectionStreamSystem { // Create a CollectionStreamSystem public static CollectionStreamSystem create(String name) {...} // Create an In memory stream and initialize it with messages from partitions map public <T> CollectionStreamSystem addInput(String streamName, Map<Integer, Iterable<T>> partitions) {...} // Create an empty in memory stream with as many partitions as partitionCount public <T> CollectionStreamSystem addOutput(String streamName, Map<Integer, Iterable<T>>Integer partitionspartitionCount) {...} } |
EventStream:
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** * EventStream provides utilities to build an in memory input stream of events. It helps mimic run time environment of your job, * advancing time for windowing functions */ public class EventStream<T> { public static abstract class Builder<T> { public abstract Builder addElement(); public abstract Builder addException(); public abstract Builder advanceTimeTo(long time); public abstract EventStream<T> build(); } public static <T> Builder<T> builder() {...} } public class EventStreamSystem { public static EventStreamSystem create(String name) {...} } |
...