Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
themeEclipse
titleTestRunner
/**
* TestRunner provides static factory for quick setup of Samza environment for testing Low level api and High level api , users can configure input streams 
* they consume from and output streams they produce to, users pass in their Samza job api logic via StreamTask/AsyncStreamTask/StreamApplication
*/ 

public class TestRunner {
  
  // Static Factory to config & create runner for low level api 
  public static TestRunner of(Class taskClass) {...}
  // Static Factory to config & create runner for high level api
  public static TestRunner of(StreamApplication app) {...}
  
  // Add/Ovveride any custom configs
  public TestRunner addConfigs(Map<String,String> configs) {...}
  public TestRunner addConfigs(Config configs) {...}
  public TestRunner addOverrideConfig(String key, String val){...}
 
  // Configure state for application
  public TestRunner addState(String storeName) {...}  
  
  // Configure an input stream for samza system, that job can consume from
  public TestRunner addInputStream(CollectionStream stream) {...}
  // Configures an output stream for samza system, that job can producer to
  public TestRunner addOutputStream(CollectionStream stream) {...}
  
  // Run the appTestRunner
  public void run() {...}
}

 

...

Code Block
languagejava
themeEclipse
titleCollectionStream
/**
* CollectionStream acts a descriptor that can be used to build an in memory input stream (single/multiple partition) of collections(list, map).java collections
*/
 
public class CollectionStream<T> { 
  // Create an empty stream with single partition that a Samza job can produce to
  public static <T> CollectionStream<T> empty(String streamName) {...}
  
  // Create an empty stream with multiple partitions that a Samza job can produce to
  public static <T> CollectionStream<T> empty(String streamName, Integer partitionCount) {...}
  
  // Create a stream of messages from input list with single partition
  public static <T> CollectionStream<T> of(String streamName, Iterable<T> collection){...}
  
  // Create a stream of messages from input list with multiple partition, key of partitions map is partitionId
  public static <T> CollectionStream<T> of(String streamName, Map<Integer,? extends Iterable<T>> partitions){...}
  
  // Associate this CollectionStream with a CollectionStreamSystem
  public CollectionStream<T> from(CollectionStreamSystem system) {...}  
}
Code Block
languagejava
themeEclipse
titleCollectionStreamSystem
/**
* CollectionStreamSystem provides utilities to create and initialize an in memory input stream.
*/
 
public class CollectionStreamSystem { 

 // Create a CollectionStreamSystem 
 public static CollectionStreamSystem create(String name) {...}  
 // Create an In memory stream and initialize it with messages from partitions map
 public <T> CollectionStreamSystem addInput(String streamName, Map<Integer, Iterable<T>> partitions) {...}
 // Create an empty in memory stream with as many partitions as partitionCount
 public <T> CollectionStreamSystem addOutput(String streamName, Map<Integer, Iterable<T>>Integer partitionspartitionCount) {...}
 
}

 

 

EventStream:

Code Block
languagejava
themeEclipse
titleEventStream
/**
* EventStream provides utilities to build an in memory input stream of events. It helps mimic run time environment of your job, 
* advancing time for windowing functions  
*/
 
public class EventStream<T> {
  public static abstract class Builder<T> {
    public abstract Builder addElement();
    public abstract Builder addException();
    public abstract Builder advanceTimeTo(long time);
    public abstract EventStream<T> build(); 
 }

 public static <T> Builder<T> builder() {...}
}
 
public class EventStreamSystem {
  public static EventStreamSystem create(String name) {...}
}

...