...

Traditionally, users have to set up the config for every Samza job themselves. For tests, we set up the basic config boilerplate on the user's behalf while still giving them the flexibility to add any custom config (rarely needed). The API exposes functions to configure single-container or multi-container mode, as well as APIs to configure the concurrency semantics of the job.
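As a rough illustration of the boilerplate-plus-override idea, the sketch below merges a set of framework defaults with user-supplied entries, letting the user's values win. The class and method names (`TestConfigSketch`, `defaultConfig`, `buildConfig`) are hypothetical, and using `job.container.count` as the single- vs multi-container switch is an assumption, not necessarily how the framework will do it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: framework defaults merged with rarely needed user overrides.
final class TestConfigSketch {

  // Defaults the test framework could fill in on the user's behalf (illustrative values only).
  static Map<String, String> defaultConfig(String jobName, int containerCount) {
    Map<String, String> config = new HashMap<>();
    config.put("job.name", jobName);
    config.put("job.container.count", String.valueOf(containerCount)); // assumed container-mode switch
    return config;
  }

  // Copies the defaults, then applies user entries so that they override matching keys.
  static Map<String, String> buildConfig(Map<String, String> defaults, Map<String, String> overrides) {
    Map<String, String> merged = new HashMap<>(defaults);
    merged.putAll(overrides);
    return merged;
  }

  public static void main(String[] args) {
    Map<String, String> overrides = new HashMap<>();
    overrides.put("job.container.count", "2"); // switch to multi-container mode
    Map<String, String> config = buildConfig(defaultConfig("test-job", 1), overrides);
    System.out.println(config.get("job.container.count")); // prints 2
  }
}
```

Copying the defaults before calling `putAll` leaves the default map untouched, so the same defaults can be reused across tests.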

Public Interfaces

There are two APIs for writing tests: the Low Level Test API (TestTask) and the High Level Test API (TestApplication).

TestTask (Java)
import java.util.HashMap;
import org.apache.samza.task.AsyncStreamTask;
import org.apache.samza.task.StreamTask;

public class TestTask {
  private TestTask(String systemName, StreamTask task, HashMap<String, String> config) {...}
  private TestTask(String systemName, AsyncStreamTask task, HashMap<String, String> config) {...}

  // Static factories that configure and create a runner for the low-level API
  public static TestTask create(String systemName, StreamTask task, HashMap<String, String> config) {...}
  public static TestTask create(String systemName, AsyncStreamTask task, HashMap<String, String> config) {...}

  // Multithreading configs for users
  public TestTask setTaskMaxConcurrency(Integer value) {...}
  public TestTask setTaskCallBackTimeoutMS(Integer value) {...}
  public TestTask setJobContainerThreadPoolSize(Integer value) {...}

  // Configures an input stream in the Samza system that the task consumes from
  public TestTask addInputStream(CollectionStream<?> stream) {...}

  // Configures an output stream in the Samza system that the task produces to
  public TestTask addOutputStream(CollectionStream<?> stream) {...}

  // Runs the task
  public void run() {...}
}
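The multithreading setters are presumably thin wrappers over standard Samza config keys. The sketch below assumes that mapping (`task.max.concurrency`, `task.callback.timeout.ms` and `job.container.thread.pool.size`) and assumes the third setter is the `setJobContainerThreadPoolSize` called in the sample test. `TestTaskConfigSketch` is a hypothetical stand-in that only records config; it does not run anything.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: fluent setters that translate into Samza config entries.
final class TestTaskConfigSketch {
  private final Map<String, String> config = new HashMap<>();

  // Assumed mapping: max number of in-flight messages per task.
  TestTaskConfigSketch setTaskMaxConcurrency(Integer value) {
    config.put("task.max.concurrency", String.valueOf(value));
    return this; // returning this enables call chaining
  }

  // Assumed mapping: timeout for an async process() callback, in milliseconds.
  TestTaskConfigSketch setTaskCallBackTimeoutMS(Integer value) {
    config.put("task.callback.timeout.ms", String.valueOf(value));
    return this;
  }

  // Assumed mapping: size of the thread pool shared by the tasks in a container.
  TestTaskConfigSketch setJobContainerThreadPoolSize(Integer value) {
    config.put("job.container.thread.pool.size", String.valueOf(value));
    return this;
  }

  // Read-only view of everything recorded so far.
  Map<String, String> build() {
    return Collections.unmodifiableMap(config);
  }

  public static void main(String[] args) {
    Map<String, String> config = new TestTaskConfigSketch()
        .setTaskMaxConcurrency(2)
        .setTaskCallBackTimeoutMS(5000)
        .setJobContainerThreadPoolSize(4)
        .build();
    System.out.println(config);
  }
}
```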

 
TestApplication (Java)
import org.apache.samza.operators.MessageStream;

public class TestApplication {

  // Configures an input stream in the Samza system and returns a MessageStream
  // that the app can apply higher-order functions (such as map and filter) to
  public <T> MessageStream<T> getInputStream(EventStream<T> stream) {...}
  public <T> MessageStream<T> getInputStream(CollectionStream<T> stream) {...}
  public <T> MessageStream<T> getInputStream(FileStream<T> stream) {...}

  // Runs the app
  public void run() {...}
}

Types of Input Streams

 

CollectionStream (Java)
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.samza.operators.KV;

/**
 * CollectionStream provides utilities to build an in-memory input stream from collections (List, Map).
 * It also supports initializing a stream with multiple partitions.
 */
public class CollectionStream<T> {
  private CollectionStream(String systemStream, List<List<T>> partitions, Integer partitionCount) {...}
  private CollectionStream(String systemStream) {...}

  // Creates an empty stream, e.g. for a task to produce output to
  public static <T> CollectionStream<T> empty(String systemStream) {
    return new CollectionStream<>(systemStream);
  }

  // Creates a single-partition stream from a list
  public static <T> CollectionStream<T> of(String systemStream, List<T> collection) {
    return new CollectionStream<>(systemStream, Collections.singletonList(collection), 1);
  }

  // Creates a multi-partition stream; each inner list holds the messages of one partition
  public static <T> CollectionStream<T> ofPartitions(String systemStream, List<List<T>> partitions) {
    return new CollectionStream<>(systemStream, partitions, partitions.size());
  }

  // Creates a stream of key-value messages from a map
  public static <K, V> CollectionStream<KV<K, V>> of(String systemStream, Map<K, V> elems) {...}
}
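To make the partition layout concrete, here is a plain-Java sketch of how the three factories could arrange messages, assuming `ofPartitions` assigns inner list i to partition i and `of` places everything in partition 0. `PartitionSketch` is hypothetical, and `Map.Entry` stands in for Samza's `KV` so the block runs without Samza on the classpath.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: partition id -> messages, for each CollectionStream factory.
final class PartitionSketch {

  // ofPartitions-style layout: inner list i becomes the messages of partition i.
  static <T> Map<Integer, List<T>> toPartitions(List<List<T>> partitions) {
    Map<Integer, List<T>> byPartition = new LinkedHashMap<>();
    for (int i = 0; i < partitions.size(); i++) {
      byPartition.put(i, new ArrayList<>(partitions.get(i)));
    }
    return byPartition;
  }

  // of(list)-style layout: the whole list lands in partition 0.
  static <T> Map<Integer, List<T>> toSinglePartition(List<T> collection) {
    return toPartitions(Collections.singletonList(collection));
  }

  // of(map)-style layout: each entry becomes one key-value message in partition 0.
  static <K, V> Map<Integer, List<Map.Entry<K, V>>> fromMap(Map<K, V> elems) {
    List<Map.Entry<K, V>> entries = new ArrayList<>(elems.entrySet());
    return toSinglePartition(entries);
  }

  public static void main(String[] args) {
    Map<Integer, List<Integer>> p = toPartitions(Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3)));
    System.out.println(p); // prints {0=[1, 2], 1=[3]}
  }
}
```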

...

FileStream (Java)
public class FileStream<T> {
  public static <T> FileStream<T> of(String fileUri) {...}
}
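`FileStream.of` leaves the file format open. One plausible reading, sketched below purely as an assumption, is one message per line of a UTF-8 text file addressed by a `file://` URI; `FileStreamSketch` and `readMessages` are hypothetical names.

```java
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: turn a file URI into a list of line messages.
final class FileStreamSketch {

  // Resolves the URI to a local path and returns one message per line (assumed format).
  static List<String> readMessages(String fileUri) throws IOException {
    Path path = Paths.get(URI.create(fileUri));
    return Files.readAllLines(path, StandardCharsets.UTF_8);
  }

  public static void main(String[] args) throws IOException {
    Path tmp = Files.createTempFile("filestream", ".txt");
    Files.write(tmp, Arrays.asList("a", "b"), StandardCharsets.UTF_8);
    System.out.println(readMessages(tmp.toUri().toString())); // prints [a, b]
    Files.deleteIfExists(tmp);
  }
}
```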

...


A Sample Test:


Simple StreamTask Test (Java)
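/**
 * Sample test case that uses a collection as the input for a low-level application. It demonstrates
 * setting up a test with minimal (here, no) custom config: the task reads an input stream of integers
 * and multiplies each integer by 10.
 */
import java.util.Arrays;
import java.util.HashMap;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

// No custom config: the framework supplies the boilerplate
HashMap<String, String> config = new HashMap<>();

// Create a StreamTask that multiplies each incoming integer by 10
StreamTask myTask = new StreamTask() {
  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
    Integer obj = (Integer) envelope.getMessage();
    collector.send(new OutgoingMessageEnvelope(new SystemStream("test-samza", "output"), obj * 10));
  }
};

// Initialize and run the test framework
TestTask
    .create("test-samza", myTask, config)
    .setJobContainerThreadPoolSize(4)
    .addInputStream(CollectionStream.of("input", Arrays.asList(1, 2, 3, 4)))
    .addOutputStream(CollectionStream.<Integer>empty("output"))
    .run();

// Assertions on the outputs
TaskAssert.that("test-samza", "output").containsInAnyOrder(Arrays.asList(10, 30, 20, 40));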
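The sample asserts on the output without regard to order, which suits tests that enable concurrency or multiple partitions, where output order is not guaranteed. The sketch below mimics the test end to end in plain Java with no Samza dependency: `run` applies a per-message function to each input (standing in for the task plus its collector), and `containsInAnyOrder` compares outputs as multisets. Both names and the `InMemoryRunSketch` class are hypothetical; the real `TaskAssert` may be implemented differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: run a per-message function and check outputs in any order.
final class InMemoryRunSketch {

  // Applies the "task" to every input message, collecting what it would send.
  static <I, O> List<O> run(List<I> input, Function<I, O> process) {
    List<O> output = new ArrayList<>();
    for (I message : input) {
      output.add(process.apply(message));
    }
    return output;
  }

  // Order-insensitive comparison that still respects duplicates (multiset equality).
  static <T> boolean containsInAnyOrder(List<T> actual, List<T> expected) {
    return counts(actual).equals(counts(expected));
  }

  // Counts occurrences of each element.
  private static <T> Map<T, Integer> counts(List<T> items) {
    Map<T, Integer> counts = new HashMap<>();
    for (T item : items) {
      counts.merge(item, 1, Integer::sum);
    }
    return counts;
  }

  public static void main(String[] args) {
    List<Integer> output = run(Arrays.asList(1, 2, 3, 4), x -> x * 10);
    System.out.println(containsInAnyOrder(output, Arrays.asList(10, 30, 20, 40))); // prints true
  }
}
```

Comparing counts rather than sets means a task that wrongly emits a message twice still fails the check.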

...